diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-04-19 13:40:48 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-04-19 13:40:48 +0300 |
commit | e46919542a6fca7274b648f6ee88f3fe76e38499 (patch) | |
tree | c78f304bfb25f460af5df013264cbf682f24c95b | |
parent | 804c961885d2205be1ed59989984cd25d628d606 (diff) | |
download | ydb-e46919542a6fca7274b648f6ee88f3fe76e38499.tar.gz |
YQ-1052 Rename common sinks & transform entities
Rename IDqSink
Rename sink actor in CA
Rename sink actor
Rename IDqSinkActorFactory
Rename IDqSinkActor
ref:fd606208853da58481dc36b78e4fddf47a76a9d0
36 files changed, 185 insertions, 185 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 02b7b98029e..afb11d79b19 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -16,11 +16,11 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, - NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkActorFactory::TPtr sinkActorFactory, + NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits); IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkActorFactory::TPtr sinkActorFactory, + NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters); diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 77ebf1dcd58..117ccb88072 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -33,9 +33,9 @@ public: } TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true) + : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true) , ComputeCtx(settings.StatsMode) { if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) { @@ -317,11 +317,11 @@ private: } // anonymous namespace IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) { return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory), - std::move(sinkActorFactory), settings, memoryLimits); + std::move(sinkFactory), settings, memoryLimits); } } // namespace NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 0966e3405e0..7395b148c37 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -72,9 +72,9 @@ public: } TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits) + : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits) , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) , Counters(counters) @@ -1130,10 +1130,10 @@ private: } // anonymous namespace IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters) { - return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), + return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits, counters); } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 8f2e4005130..4e65e14ecc0 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -119,7 +119,7 @@ void Init( }); auto sourceActorFactory = MakeIntrusive<NYql::NDq::TDqSourceFactory>(); - auto sinkActorFactory = MakeIntrusive<NYql::NDq::TDqSinkFactory>(); + auto sinkFactory = MakeIntrusive<NYql::NDq::TDqSinkFactory>(); NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory; const auto httpGateway = NYql::IHTTPGateway::Make( @@ -141,8 +141,8 @@ void Init( httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig())); RegisterClickHouseReadActorFactory(*sourceActorFactory, credentialsFactory, httpGateway); - RegisterDqPqWriteActorFactory(*sinkActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); - RegisterDQSolomonWriteActorFactory(*sinkActorFactory, credentialsFactory); + RegisterDqPqWriteActorFactory(*sinkFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); + RegisterDQSolomonWriteActorFactory(*sinkFactory, credentialsFactory); } ui64 mkqlInitialMemoryLimit = 8_GB; @@ -161,7 +161,7 @@ void Init( lwmOptions.Counters = workerManagerCounters; lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false); lwmOptions.SourceActorFactory = sourceActorFactory; - lwmOptions.SinkActorFactory = sinkActorFactory; + lwmOptions.SinkFactory = sinkFactory; lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory(); lwmOptions.MkqlInitialMemoryLimit = mkqlInitialMemoryLimit; lwmOptions.MkqlTotalMemoryLimit = mkqlTotalMemoryLimit; diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 8e9a2cfa7f5..bd44982504d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -29,10 +29,10 @@ public: static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR"; TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits, /* ownMemoryQuota = */ false) + : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits, /* ownMemoryQuota = */ false) , TaskRunnerActorFactory(taskRunnerActorFactory) , ReadyToCheckpointFlag(false) , SentStatsRequest(false) @@ -121,7 +121,7 @@ private: WaitingForStateResponse.clear(); } - const TDqSinkStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const override { + const TDqAsyncOutputBufferStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const override { Y_UNUSED(sinkInfo); return TaskRunnerStats.GetSinkStats(outputIdx); } @@ -176,31 +176,31 @@ private: return; } - Y_VERIFY(sinkInfo.SinkActor); + Y_VERIFY(sinkInfo.Sink); Y_VERIFY(sinkInfo.Actor); const ui32 allowedOvercommit = AllowedChannelsOvercommit(); - const i64 sinkActorFreeSpaceBeforeSend = sinkInfo.SinkActor->GetFreeSpace(); + const i64 sinkFreeSpaceBeforeSend = sinkInfo.Sink->GetFreeSpace(); - i64 toSend = sinkActorFreeSpaceBeforeSend + allowedOvercommit; + i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit; CA_LOG_D("About to drain sink " << outputIndex - << ". FreeSpace: " << sinkActorFreeSpaceBeforeSend + << ". FreeSpace: " << sinkFreeSpaceBeforeSend << ", allowedOvercommit: " << allowedOvercommit << ", toSend: " << toSend - //<< ", finished: " << sinkInfo.Sink->IsFinished()); + //<< ", finished: " << sinkInfo.SinkBuffer->IsFinished()); ); sinkInfo.PopStarted = true; ProcessOutputsState.Inflight ++; - sinkInfo.SinkActorFreeSpaceBeforeSend = sinkActorFreeSpaceBeforeSend; - this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvSinkPop(outputIndex, sinkActorFreeSpaceBeforeSend)); + sinkInfo.SinkFreeSpaceBeforeSend = sinkFreeSpaceBeforeSend; + this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvSinkPop(outputIndex, sinkFreeSpaceBeforeSend)); } bool DoHandleChannelsAfterFinishImpl() override { Y_VERIFY(Checkpoints); auto req = GetCheckpointRequest(); if (!req.Defined()) { - return true; // handled channels syncronously + return true; // handled channels syncronously } CA_LOG_D("DoHandleChannelsAfterFinishImpl"); this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvContinueRun(std::move(req), /* checkpointOnly = */ true)); @@ -360,7 +360,7 @@ private: } if (ReadyToCheckpointFlag) { Y_VERIFY(!ProgramState); - ProgramState = std::move(ev->Get()->ProgramState); + ProgramState = std::move(ev->Get()->ProgramState); Checkpoints->DoCheckpoint(); } ProcessOutputsImpl(status); @@ -493,11 +493,11 @@ private: ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished; auto guard = BindAllocator(); - sinkInfo.SinkActor->SendData(std::move(batch), size, std::move(checkpoint), finished); + sinkInfo.Sink->SendData(std::move(batch), size, std::move(checkpoint), finished); CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier"); CA_LOG_D("Drain sink " << outputIndex - << ". Free space decreased: " << (sinkInfo.SinkActorFreeSpaceBeforeSend - sinkInfo.SinkActor->GetFreeSpace()) + << ". Free space decreased: " << (sinkInfo.SinkFreeSpaceBeforeSend - sinkInfo.Sink->GetFreeSpace()) << ", sent data from buffer: " << dataSize); ProcessOutputsState.DataWasSent |= dataWasSent; @@ -520,7 +520,7 @@ private: return TaskRunnerStats.Get(); } - template<typename TSecond> + template<typename TSecond> TVector<ui32> GetIds(const THashMap<ui64, TSecond>& collection) { TVector<ui32> ids; std::transform(collection.begin(), collection.end(), std::back_inserter(ids), [](const auto& p) { @@ -572,12 +572,12 @@ private: IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) { return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory), - std::move(sinkActorFactory), settings, memoryLimits, taskRunnerActorFactory); + std::move(sinkFactory), settings, memoryLimits, taskRunnerActorFactory); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h index 71c918f04ca..6812598834d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h @@ -19,7 +19,7 @@ namespace NYql { namespace NDq { NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index c2f6c45e2b5..e8303b8a1bc 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -33,10 +33,10 @@ public: static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR"; TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits) + : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits) , TaskRunnerFactory(taskRunnerFactory) {} @@ -67,11 +67,11 @@ private: IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory) { return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory), - std::move(sinkActorFactory), settings, memoryLimits, taskRunnerFactory); + std::move(sinkFactory), settings, memoryLimits, taskRunnerFactory); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index d03d1e68f52..4dfc41f03a3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -257,7 +257,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& NDqProto::TDqTaskStats* protoTask, bool withProfileStats); NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h index e5c64eebc5e..ba7cd1dddc5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h @@ -99,7 +99,7 @@ public: NDqProto::TCheckpoint GetPendingCheckpoint() const; void RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId); - // Sink actor support. + // Sink support. void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint); void TryToSavePendingCheckpoint(); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 819f13b3cf8..51559b03dad 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -58,7 +58,7 @@ template<typename TDerived> class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived> , public TDqComputeActorChannels::ICallbacks , public TDqComputeActorCheckpoints::ICallbacks - , public IDqSinkActor::ICallbacks + , public IDqComputeActorAsyncOutput::ICallbacks { protected: enum EEvWakeupTag : ui64 { @@ -114,7 +114,7 @@ public: protected: TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false) : ExecuterId(executerId) , TxId(txId) @@ -123,7 +123,7 @@ protected: , MemoryLimits(memoryLimits) , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) , SourceActorFactory(std::move(sourceActorFactory)) - , SinkActorFactory(std::move(sinkActorFactory)) + , SinkFactory(std::move(sinkFactory)) , CheckpointingMode(GetTaskCheckpointingMode(Task)) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr) @@ -137,7 +137,7 @@ protected: } TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, + IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) : ExecuterId(executerId) , TxId(txId) @@ -146,7 +146,7 @@ protected: , MemoryLimits(memoryLimits) , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) , SourceActorFactory(std::move(sourceActorFactory)) - , SinkActorFactory(std::move(sinkActorFactory)) + , SinkFactory(std::move(sinkFactory)) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , MemoryQuota(InitMemoryQuota()) , Running(!Task.GetCreateSuspended()) @@ -281,7 +281,7 @@ protected: // Send checkpoints to output channels. ProcessOutputsImpl(ERunStatus::Finished); - return true; // returns true, when channels were handled syncronously + return true; // returns true, when channels were handled syncronously } void ProcessOutputsImpl(ERunStatus status) { @@ -412,7 +412,7 @@ protected: for (auto& [_, sink] : SinksMap) { if (sink.Actor) { - sink.SinkActor->PassAway(); + sink.Sink->PassAway(); } } @@ -626,7 +626,7 @@ protected: channelInfo.Channel->Push(NDqProto::TCheckpoint(checkpoint)); } for (const auto& [outputIndex, sink] : SinksMap) { - sink.Sink->Push(NDqProto::TCheckpoint(checkpoint)); + sink.SinkBuffer->Push(NDqProto::TCheckpoint(checkpoint)); } } @@ -666,8 +666,8 @@ protected: for (const NDqProto::TSinkState& sinkState : state.GetSinks()) { TSinkInfo* sink = SinksMap.FindPtr(sinkState.GetOutputIndex()); YQL_ENSURE(sink, "Failed to load state. Sink with input index " << sinkState.GetOutputIndex() << " was not found"); - YQL_ENSURE(sink->SinkActor, "Sink[" << sinkState.GetOutputIndex() << "] is not created"); - sink->SinkActor->LoadState(sinkState); + YQL_ENSURE(sink->Sink, "Sink[" << sinkState.GetOutputIndex() << "] is not created"); + sink->Sink->LoadState(sinkState); } } catch (const std::exception& e) { error = e.what(); @@ -760,13 +760,13 @@ protected: }; struct TSinkInfo { - IDqSink::TPtr Sink; - IDqSinkActor* SinkActor = nullptr; + IDqAsyncOutputBuffer::TPtr SinkBuffer; + IDqComputeActorAsyncOutput* Sink = nullptr; NActors::IActor* Actor = nullptr; bool Finished = false; // If sink is in finished state, it receives only checkpoints. TIssuesBuffer IssuesBuffer; bool PopStarted = false; - i64 SinkActorFreeSpaceBeforeSend = 0; + i64 SinkFreeSpaceBeforeSend = 0; TSinkInfo() : IssuesBuffer(IssuesBufferSize) {} }; @@ -1078,19 +1078,19 @@ private: return; } + Y_VERIFY(sinkInfo.SinkBuffer); Y_VERIFY(sinkInfo.Sink); - Y_VERIFY(sinkInfo.SinkActor); Y_VERIFY(sinkInfo.Actor); const ui32 allowedOvercommit = AllowedChannelsOvercommit(); - const i64 sinkActorFreeSpaceBeforeSend = sinkInfo.SinkActor->GetFreeSpace(); + const i64 sinkFreeSpaceBeforeSend = sinkInfo.Sink->GetFreeSpace(); - i64 toSend = sinkActorFreeSpaceBeforeSend + allowedOvercommit; + i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit; CA_LOG_D("About to drain sink " << outputIndex - << ". FreeSpace: " << sinkActorFreeSpaceBeforeSend + << ". FreeSpace: " << sinkFreeSpaceBeforeSend << ", allowedOvercommit: " << allowedOvercommit << ", toSend: " << toSend - << ", finished: " << sinkInfo.Sink->IsFinished()); + << ", finished: " << sinkInfo.SinkBuffer->IsFinished()); i64 sent = 0; while (toSend > 0 && (!sinkInfo.Finished || Checkpoints)) { @@ -1099,11 +1099,11 @@ private: break; } sent += sentChunk; - toSend = sinkInfo.SinkActor->GetFreeSpace() + allowedOvercommit; + toSend = sinkInfo.Sink->GetFreeSpace() + allowedOvercommit; } CA_LOG_D("Drain sink " << outputIndex - << ". Free space decreased: " << (sinkActorFreeSpaceBeforeSend - sinkInfo.SinkActor->GetFreeSpace()) + << ". Free space decreased: " << (sinkFreeSpaceBeforeSend - sinkInfo.Sink->GetFreeSpace()) << ", sent data from buffer: " << sent); ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished; @@ -1111,7 +1111,7 @@ private: } ui32 SendSinkDataChunk(ui64 outputIndex, TSinkInfo& sinkInfo, ui64 bytes) { - auto sink = sinkInfo.Sink; + auto sink = sinkInfo.SinkBuffer; NKikimr::NMiniKQL::TUnboxedValueVector dataBatch; NDqProto::TCheckpoint checkpoint; @@ -1137,7 +1137,7 @@ private: ResumeInputs(); } - sinkInfo.SinkActor->SendData(std::move(dataBatch), dataSize, maybeCheckpoint, sinkInfo.Finished); + sinkInfo.Sink->SendData(std::move(dataBatch), dataSize, maybeCheckpoint, sinkInfo.Finished); CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier"); return dataSize + checkpointSize; @@ -1230,13 +1230,13 @@ protected: } } for (auto& [outputIndex, sink] : SinksMap) { - if (TaskRunner) { sink.Sink = TaskRunner->GetSink(outputIndex); } - Y_VERIFY(SinkActorFactory); + if (TaskRunner) { sink.SinkBuffer = TaskRunner->GetSink(outputIndex); } + Y_VERIFY(SinkFactory); const auto& outputDesc = Task.GetOutputs(outputIndex); const ui64 i = outputIndex; // Crutch for clang - CA_LOG_D("Create sink actor for output " << i << " " << outputDesc); - std::tie(sink.SinkActor, sink.Actor) = SinkActorFactory->CreateDqSinkActor( - IDqSinkActorFactory::TArguments { + CA_LOG_D("Create sink for output " << i << " " << outputDesc); + std::tie(sink.Sink, sink.Actor) = SinkFactory->CreateDqSink( + IDqSinkFactory::TArguments { .OutputDesc = outputDesc, .OutputIndex = outputIndex, .TxId = TxId, @@ -1367,9 +1367,9 @@ private: return TaskRunner->GetStats(); } - virtual const TDqSinkStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const { + virtual const TDqAsyncOutputBufferStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const { Y_UNUSED(outputIdx); - return sinkInfo.Sink ? sinkInfo.Sink->GetStats() : nullptr; + return sinkInfo.SinkBuffer ? sinkInfo.SinkBuffer->GetStats() : nullptr; } public: @@ -1496,7 +1496,7 @@ protected: const TComputeMemoryLimits MemoryLimits; const bool CanAllocateExtraMemory = false; const IDqSourceActorFactory::TPtr SourceActorFactory; - const IDqSinkActorFactory::TPtr SinkActorFactory; + const IDqSinkFactory::TPtr SinkFactory; const NDqProto::ECheckpointingMode CheckpointingMode; TIntrusivePtr<IDqTaskRunner> TaskRunner; TDqComputeActorChannels* Channels = nullptr; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp index 5ae40ef1ab6..c8f51e185c6 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp @@ -24,13 +24,13 @@ void TDqSourceFactory::Register(const TString& type, TCreatorFunction creator) Y_VERIFY(registered); } -std::pair<IDqSinkActor*, NActors::IActor*> TDqSinkFactory::CreateDqSinkActor(IDqSinkActorFactory::TArguments&& args) const +std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqSinkFactory::CreateDqSink(IDqSinkFactory::TArguments&& args) const { const TString& type = args.OutputDesc.GetSink().GetType(); - YQL_ENSURE(!type.empty(), "Attempt to create sink actor of empty type"); + YQL_ENSURE(!type.empty(), "Attempt to create sink of empty type"); const TCreatorFunction* creatorFunc = CreatorsByType.FindPtr(type); - YQL_ENSURE(creatorFunc, "Unknown type of sink actor: \"" << type << "\""); - std::pair<IDqSinkActor*, NActors::IActor*> actor = (*creatorFunc)(std::move(args)); + YQL_ENSURE(creatorFunc, "Unknown type of sink: \"" << type << "\""); + std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args)); Y_VERIFY(actor.first); Y_VERIFY(actor.second); return actor; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h index db57796f5d4..45b4ac13cf6 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h @@ -49,23 +49,23 @@ private: }; template <class T> -concept TCastsToSinkActorPair = - std::is_convertible_v<T, std::pair<IDqSinkActor*, NActors::IActor*>>; +concept TCastsToSinkPair = + std::is_convertible_v<T, std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>>; template <class T, class TProto> -concept TSinkActorCreatorFunc = requires(T f, TProto&& settings, IDqSinkActorFactory::TArguments&& args) { - { f(std::move(settings), std::move(args)) } -> TCastsToSinkActorPair; +concept TSinkCreatorFunc = requires(T f, TProto&& settings, IDqSinkFactory::TArguments&& args) { + { f(std::move(settings), std::move(args)) } -> TCastsToSinkPair; }; -class TDqSinkFactory : public IDqSinkActorFactory { +class TDqSinkFactory : public IDqSinkFactory { public: - using TCreatorFunction = std::function<std::pair<IDqSinkActor*, NActors::IActor*>(TArguments&& args)>; + using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TArguments&& args)>; - std::pair<IDqSinkActor*, NActors::IActor*> CreateDqSinkActor(TArguments&& args) const override; + std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const override; void Register(const TString& type, TCreatorFunction creator); - template <class TProtoMsg, TSinkActorCreatorFunc<TProtoMsg> TCreatorFunc> + template <class TProtoMsg, TSinkCreatorFunc<TProtoMsg> TCreatorFunc> void Register(const TString& type, TCreatorFunc creator) { Register(type, [creator = std::move(creator), type](TArguments&& args) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h index 5c2db2ac8d8..74b4793e5a0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h @@ -20,26 +20,26 @@ class IActor; namespace NYql::NDq { -// Sink actor. +// Sink. // Must be IActor. // // Protocol: -// 1. CA starts sink actor. +// 1. CA starts sink. // 2. CA runs program and gets results. -// 3. CA calls IDqSinkActor::SendData(). +// 3. CA calls IDqComputeActorAsyncOutput::SendData(). // 4. If SendData() returns value less than 0, loop stops running until free space appears. -// 5. When free space appears, sink actor calls ICallbacks::ResumeExecution() to start processing again. +// 5. When free space appears, sink calls ICallbacks::ResumeExecution() to start processing again. // // Checkpointing: // 1. InjectCheckpoint event arrives to CA. // 2. CA saves its state and injects special checkpoint event to all outputs (TDqComputeActorCheckpoints::ICallbacks::InjectBarrierToOutputs()). -// 3. Sink actor writes all data before checkpoint. -// 4. Sink actor waits all external sink's acks for written data. -// 5. Sink actor gathers its state and passes it into callback ICallbacks::OnSinkStateSaved(state, outputIndex). +// 3. Sink writes all data before checkpoint. +// 4. Sink waits all external sink's acks for written data. +// 5. Sink gathers its state and passes it into callback ICallbacks::OnSinkStateSaved(state, outputIndex). // 6. Checkpoints actor builds state for all task node as sum of the state of CA and all its sinks and saves it. // 7. ... -// 8. When checkpoint is written into database, checkpoints actor calls IDqSinkActor::CommitState() to apply all side effects. -struct IDqSinkActor { +// 8. When checkpoint is written into database, checkpoints actor calls IDqComputeActorAsyncOutput::CommitState() to apply all side effects. +struct IDqComputeActorAsyncOutput { struct ICallbacks { // Compute actor virtual void ResumeExecution() = 0; virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0; @@ -68,26 +68,26 @@ struct IDqSinkActor { virtual void PassAway() = 0; // The same signature as IActor::PassAway() - virtual ~IDqSinkActor() = default; + virtual ~IDqComputeActorAsyncOutput() = default; }; -struct IDqSinkActorFactory : public TThrRefBase { - using TPtr = TIntrusivePtr<IDqSinkActorFactory>; +struct IDqSinkFactory : public TThrRefBase { + using TPtr = TIntrusivePtr<IDqSinkFactory>; struct TArguments { const NDqProto::TTaskOutput& OutputDesc; ui64 OutputIndex; TTxId TxId; const THashMap<TString, TString>& SecureParams; - IDqSinkActor::ICallbacks* Callback; + IDqComputeActorAsyncOutput::ICallbacks* Callback; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; }; - // Creates sink actor. + // Creates sink. // Could throw YQL errors. - // IActor* and IDqSinkActor* returned by method must point to the objects with consistent lifetime. - virtual std::pair<IDqSinkActor*, NActors::IActor*> CreateDqSinkActor(TArguments&& args) const = 0; + // IActor* and IDqComputeActorAsyncOutput* returned by method must point to the objects with consistent lifetime. + virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const = 0; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index 6b608790305..cf671332c27 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -47,7 +47,7 @@ message TDqInputChannelStats { google.protobuf.Any Extra = 100; } -message TDqSinkStats { +message TDqAsyncOutputBufferStats { // basic stats uint64 OutputIndex = 1; uint64 Chunks = 2; @@ -138,7 +138,7 @@ message TDqTaskStats { repeated TDqSourceStats Sources = 150; repeated TDqInputChannelStats InputChannels = 151; - repeated TDqSinkStats Sinks = 152; + repeated TDqAsyncOutputBufferStats Sinks = 152; repeated TDqOutputChannelStats OutputChannels = 153; google.protobuf.Any Extra = 200; diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 6a3c60d5995..508c5cf049e 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -73,7 +73,7 @@ public: private: void OnStatisticsRequest(TEvStatistics::TPtr& ev, const TActorContext& ctx) { TaskRunner->UpdateStats(); - THashMap<ui32, const TDqSinkStats*> sinkStats; + THashMap<ui32, const TDqAsyncOutputBufferStats*> sinkStats; for (const auto sinkId : ev->Get()->SinkIds) { sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats(); } @@ -410,13 +410,13 @@ private: } THolder<TEvDq::TEvAbortExecution> GetError(const NKikimr::TMemoryLimitExceededException&) { - const auto err = TStringBuilder() << "Mkql memory limit exceeded" + const auto err = TStringBuilder() << "Mkql memory limit exceeded" << ", limit: " << (MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : -1) << ", canAllocateExtraMemory: " << (MemoryQuota ? MemoryQuota->GetCanAllocateExtraMemory() : 0); LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, Sprintf("TMemoryLimitExceededException: %s", err.c_str())); TIssue issue(err); SetIssueCode(TIssuesIds::KIKIMR_PRECONDITION_FAILED, issue); - return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TVector<TIssue>{issue}); + return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TVector<TIssue>{issue}); } THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) { diff --git a/ydb/library/yql/dq/runtime/dq_sink.cpp b/ydb/library/yql/dq/runtime/dq_sink.cpp index ff86523ef48..99e04ba3f08 100644 --- a/ydb/library/yql/dq/runtime/dq_sink.cpp +++ b/ydb/library/yql/dq/runtime/dq_sink.cpp @@ -9,7 +9,7 @@ namespace NYql::NDq { namespace { -class TDqSink : public IDqSink { +class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { struct TValueDesc { std::variant<NUdf::TUnboxedValue, NDqProto::TCheckpoint> Value; ui64 EstimatedSize; @@ -31,7 +31,7 @@ class TDqSink : public IDqSink { }; public: - TDqSink(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, bool collectProfileStats) + TDqAsyncOutputBuffer(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, bool collectProfileStats) : OutputIndex(outputIndex) , MaxStoredBytes(maxStoredBytes) , OutputType(outputType) @@ -141,7 +141,7 @@ public: return OutputType; } - const TDqSinkStats* GetStats() const override { + const TDqAsyncOutputBufferStats* GetStats() const override { return &BasicStats; } @@ -181,16 +181,16 @@ private: bool Finished = false; std::deque<TValueDesc> Values; ui64 EstimatedRowBytes = 0; - TDqSinkStats BasicStats; - TDqSinkStats* ProfileStats = nullptr; + TDqAsyncOutputBufferStats BasicStats; + TDqAsyncOutputBufferStats* ProfileStats = nullptr; }; } // namespace -IDqSink::TPtr CreateDqSink(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, +IDqAsyncOutputBuffer::TPtr CreateDqAsyncOutputBuffer(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, bool collectProfileStats) { - return MakeIntrusive<TDqSink>(outputIndex, outputType, maxStoredBytes, collectProfileStats); + return MakeIntrusive<TDqAsyncOutputBuffer>(outputIndex, outputType, maxStoredBytes, collectProfileStats); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_sink.h b/ydb/library/yql/dq/runtime/dq_sink.h index 447647ae3ac..931957a7208 100644 --- a/ydb/library/yql/dq/runtime/dq_sink.h +++ b/ydb/library/yql/dq/runtime/dq_sink.h @@ -7,10 +7,10 @@ namespace NYql::NDq { -struct TDqSinkStats : TDqOutputStats { +struct TDqAsyncOutputBufferStats : TDqOutputStats { const ui64 OutputIndex; - explicit TDqSinkStats(ui64 outputIndex) + explicit TDqAsyncOutputBufferStats(ui64 outputIndex) : OutputIndex(outputIndex) {} template<typename T> @@ -24,9 +24,9 @@ struct TDqSinkStats : TDqOutputStats { } }; -class IDqSink : public IDqOutput { +class IDqAsyncOutputBuffer : public IDqOutput { public: - using TPtr = TIntrusivePtr<IDqSink>; + using TPtr = TIntrusivePtr<IDqAsyncOutputBuffer>; virtual ui64 GetOutputIndex() const = 0; @@ -37,10 +37,10 @@ public: [[nodiscard]] virtual bool Pop(NDqProto::TCheckpoint& checkpoint) = 0; - virtual const TDqSinkStats* GetStats() const = 0; + virtual const TDqAsyncOutputBufferStats* GetStats() const = 0; }; -IDqSink::TPtr CreateDqSink(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, +IDqAsyncOutputBuffer::TPtr CreateDqAsyncOutputBuffer(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, bool collectProfileStats); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index c2ba27b5dc1..67005a599b1 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -443,7 +443,7 @@ public: TVector<IDqOutput::TPtr> outputs{Reserve(std::max<ui64>(outputDesc.ChannelsSize(), 1))}; if (outputDesc.HasSink()) { - auto sink = CreateDqSink(i, ProgramParsed.OutputItemTypes[i], memoryLimits.ChannelBufferSize, + auto sink = CreateDqAsyncOutputBuffer(i, ProgramParsed.OutputItemTypes[i], memoryLimits.ChannelBufferSize, Settings.CollectProfileStats); auto [_, inserted] = Sinks.emplace(i, sink); Y_VERIFY(inserted); @@ -578,7 +578,7 @@ public: return *ptr; } - IDqSink::TPtr GetSink(ui64 outputIndex) override { + IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) override { auto ptr = Sinks.FindPtr(outputIndex); YQL_ENSURE(ptr, "task: " << TaskId << " does not have output index: " << outputIndex); return *ptr; @@ -713,7 +713,7 @@ private: THashMap<ui64, IDqInputChannel::TPtr> InputChannels; // Channel id -> Channel THashMap<ui64, IDqSource::TPtr> Sources; // Input index -> Source THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels; // Channel id -> Channel - THashMap<ui64, IDqSink::TPtr> Sinks; // Output index -> Sink + THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks; // Output index -> Sink IDqOutputConsumer::TPtr Output; NUdf::TUnboxedValue ResultStream; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index bf96d30ac5a..7db08493785 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -195,7 +195,7 @@ public: , IsDefined(true) { } - TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqSinkStats*>&& sinkStats) + TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqAsyncOutputBufferStats*>&& sinkStats) : StatsInplace() , StatsPtr(stats) , IsInplace(false) @@ -214,7 +214,7 @@ public: return IsDefined; } - const TDqSinkStats* GetSinkStats(ui32 sinkId) const { + const TDqAsyncOutputBufferStats* GetSinkStats(ui32 sinkId) const { return SinkStats.at(sinkId); } @@ -223,7 +223,7 @@ private: const TDqTaskRunnerStats* StatsPtr; bool IsInplace; bool IsDefined; - THashMap<ui32, const TDqSinkStats*> SinkStats; + THashMap<ui32, const TDqAsyncOutputBufferStats*> SinkStats; }; struct TDqTaskRunnerContext { @@ -300,7 +300,7 @@ public: virtual IDqInputChannel::TPtr GetInputChannel(ui64 channelId) = 0; virtual IDqSource::TPtr GetSource(ui64 inputIndex) = 0; virtual IDqOutputChannel::TPtr GetOutputChannel(ui64 channelId) = 0; - virtual IDqSink::TPtr GetSink(ui64 outputIndex) = 0; + virtual IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) = 0; // if memoryLimit = Nothing() then don't set memory limit, use existing one (if any) // if memoryLimit = 0 then set unlimited diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp index 3a12b53fc48..5eb9ede8c77 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp @@ -36,10 +36,10 @@ TFakeActor::~TFakeActor() { Alloc.Acquire(); } -void TFakeActor::InitSink(IDqSinkActor* dqSink, IActor* dqSinkAsActor) { +void TFakeActor::InitSink(IDqComputeActorAsyncOutput* dqSink, IActor* dqSinkAsActor) { DqSinkActorId = RegisterWithSameMailbox(dqSinkAsActor), - DqSinkActor = dqSink; - DqSinkActorAsActor = dqSinkAsActor; + DqSink = dqSink; + DqSinkAsActor = dqSinkAsActor; } void TFakeActor::InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor) { @@ -58,11 +58,11 @@ void TFakeActor::Terminate() { } if (DqSinkActorId) { - DqSinkActor->PassAway(); + DqSink->PassAway(); DqSinkActorId = std::nullopt; - DqSinkActor = nullptr; - DqSinkActorAsActor = nullptr; + DqSink = nullptr; + DqSinkAsActor = nullptr; } } @@ -98,8 +98,8 @@ TFakeCASetup::~TFakeCASetup() { void TFakeCASetup::SinkWrite(const TWriteValueProducer valueProducer, TMaybe<NDqProto::TCheckpoint> checkpoint) { Execute([&valueProducer, checkpoint](TFakeActor& actor) { auto batch = valueProducer(actor.GetHolderFactory()); - Y_ASSERT(actor.DqSinkActor); - actor.DqSinkActor->SendData(std::move(batch), 0, checkpoint, false); + Y_ASSERT(actor.DqSink); + actor.DqSink->SendData(std::move(batch), 0, checkpoint, false); }); } @@ -119,8 +119,8 @@ void TFakeCASetup::LoadSource(const NDqProto::TSourceState& state) { void TFakeCASetup::LoadSink(const NDqProto::TSinkState& state) { Execute([&state](TFakeActor& actor) { - Y_ASSERT(actor.DqSinkActor); - actor.DqSinkActor->LoadState(state); + Y_ASSERT(actor.DqSink); + actor.DqSink->LoadState(state); }); } diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index 15f82baffbb..7a3de5b51e5 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -85,7 +85,7 @@ class TFakeActor : public NActors::TActor<TFakeActor> { TFakeActor& Parent; }; - struct TSinkCallbacks : public IDqSinkActor::ICallbacks { + struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { explicit TSinkCallbacks(TFakeActor& parent) : Parent(parent) {} void ResumeExecution() override { @@ -112,7 +112,7 @@ public: TFakeActor(TSourcePromises& sourcePromises, TSinkPromises& sinkPromises); ~TFakeActor(); - void InitSink(IDqSinkActor* dqSink, IActor* dqSinkAsActor); + void InitSink(IDqComputeActorAsyncOutput* dqSink, IActor* dqSinkAsActor); void InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor); void Terminate(); @@ -121,7 +121,7 @@ public: public: IDqSourceActor* DqSourceActor = nullptr; - IDqSinkActor* DqSinkActor = nullptr; + IDqComputeActorAsyncOutput* DqSink = nullptr; private: STRICT_STFUNC(StateFunc, @@ -156,7 +156,7 @@ private: IActor* DqSourceActorAsActor = nullptr; std::optional<NActors::TActorId> DqSinkActorId; - IActor* DqSinkActorAsActor = nullptr; + IActor* DqSinkAsActor = nullptr; TSourceEvents SourceEvents; TSinkCallbacks SinkCallbacks; diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index a2ae923d16f..55faca97130 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -66,7 +66,7 @@ IActor* CreateComputeActor( operationId, std::move(task), std::move(options.SourceActorFactory), - std::move(options.SinkActorFactory), + std::move(options.SinkFactory), computeRuntimeSettings, memoryLimits, taskRunnerFactory); @@ -76,7 +76,7 @@ IActor* CreateComputeActor( operationId, std::move(task), std::move(options.SourceActorFactory), - std::move(options.SinkActorFactory), + std::move(options.SinkFactory), computeRuntimeSettings, memoryLimits, taskRunnerActorFactory); diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 8cb708b2f3f..cac1420315c 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -59,14 +59,14 @@ struct TSourceInfo { }; struct TSinkInfo { - IDqSinkActor* SinkActor = nullptr; + IDqComputeActorAsyncOutput* Sink = nullptr; NActors::IActor* Actor = nullptr; bool Finished = false; NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr; }; class TDqWorker: public TRichActor<TDqWorker> - , IDqSinkActor::ICallbacks + , IDqComputeActorAsyncOutput::ICallbacks , ITaskRunnerActor::ICallbacks { static constexpr ui32 INPUT_SIZE = 100000; @@ -77,12 +77,12 @@ public: explicit TDqWorker( const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, const IDqSourceActorFactory::TPtr& sourceActorFactory, - const IDqSinkActorFactory::TPtr& sinkActorFactory, + const IDqSinkFactory::TPtr& sinkFactory, TWorkerRuntimeData* runtimeData, const TString& traceId) : TRichActor<TDqWorker>(&TDqWorker::Handler) , SourceActorFactory(sourceActorFactory) - , SinkActorFactory(sinkActorFactory) + , SinkFactory(sinkFactory) , TaskRunnerActorFactory(taskRunnerActorFactory) , RuntimeData(runtimeData) , TraceId(traceId) @@ -119,7 +119,7 @@ public: v.SourceActor->PassAway(); } for (const auto& [_, v] : SinksMap) { - v.SinkActor->PassAway(); + v.Sink->PassAway(); } Dump(); } @@ -140,7 +140,7 @@ private: HFunc(TEvSourcePushFinished, OnSourcePushFinished); // weird to have two events for error handling, but we need to use TEvDqFailure - // between worker_actor <-> executer_actor, cause it transmits statistics in 'Metric' field + // between worker_actor <-> executer_actor, cause it transmits statistics in 'Metric' field HFunc(NDq::TEvDq::TEvAbortExecution, OnErrorFromPipe); // received from task_runner_actor HFunc(TEvDqFailure, OnError); // received from this actor itself HFunc(TEvContinueRun, OnContinueRun); @@ -306,8 +306,8 @@ private: if (output.HasSink()) { auto& sink = SinksMap[outputId]; sink.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv); - std::tie(sink.SinkActor, sink.Actor) = SinkActorFactory->CreateDqSinkActor( - IDqSinkActorFactory::TArguments { + std::tie(sink.Sink, sink.Actor) = SinkFactory->CreateDqSink( + IDqSinkFactory::TArguments { .OutputDesc = output, .OutputIndex = static_cast<ui64>(outputId), .TxId = TraceId, @@ -591,9 +591,9 @@ private: } case ERunStatus::PendingOutput: { for (auto& [index, sink] : SinksMap) { - const i64 sinkActorFreeSpaceBeforeSend = sink.SinkActor->GetFreeSpace(); - if (sinkActorFreeSpaceBeforeSend > 0 && !sink.Finished) { - Send(TaskRunnerActor, new TEvSinkPop(index, sinkActorFreeSpaceBeforeSend)); + const i64 sinkFreeSpaceBeforeSend = sink.Sink->GetFreeSpace(); + if (sinkFreeSpaceBeforeSend > 0 && !sink.Finished) { + Send(TaskRunnerActor, new TEvSinkPop(index, sinkFreeSpaceBeforeSend)); } } break; @@ -691,7 +691,7 @@ private: Run(ctx); } /*_________________________________________________________*/ - /*______________________ SinkActorEvents __________________*/ + /*______________________ Sink Events ----__________________*/ void ResumeExecution() override { Send(SelfId(), new TEvContinueRun()); } @@ -720,13 +720,13 @@ private: Y_UNUSED(checkpointSize); Y_UNUSED(checkpoint); Y_UNUSED(changed); auto& sink = SinksMap[index]; sink.Finished = finished; - sink.SinkActor->SendData(std::move(batch), size, {}, finished); + sink.Sink->SendData(std::move(batch), size, {}, finished); } /*_________________________________________________________*/ IDqSourceActorFactory::TPtr SourceActorFactory; - IDqSinkActorFactory::TPtr SinkActorFactory; + IDqSinkFactory::TPtr SinkFactory; ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; NTaskRunnerActor::ITaskRunnerActor* Actor = nullptr; TActorId TaskRunnerActor; @@ -765,14 +765,14 @@ NActors::IActor* CreateWorkerActor( const TString& traceId, const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, const IDqSourceActorFactory::TPtr& sourceActorFactory, - const IDqSinkActorFactory::TPtr& sinkActorFactory) + const IDqSinkFactory::TPtr& sinkFactory) { Y_VERIFY(taskRunnerActorFactory); return new TLogWrapReceive( new TDqWorker( taskRunnerActorFactory, sourceActorFactory, - sinkActorFactory, + sinkFactory, runtimeData, traceId), traceId); } diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h index 4d39687f5b2..a9eac60c486 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.h +++ b/ydb/library/yql/providers/dq/actors/worker_actor.h @@ -26,6 +26,6 @@ namespace NYql::NDqs { const TString& traceId, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, const NDq::IDqSourceActorFactory::TPtr& sourceActorFactory, - const NDq::IDqSinkActorFactory::TPtr& sinkActorFactory); + const NDq::IDqSinkFactory::TPtr& sinkFactory); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto index 3f570f1aa41..213b7fb38b9 100644 --- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto +++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto @@ -126,7 +126,7 @@ message TGetStatsSourceResponse { } message TSinkStatsResponse { - NDqProto.TDqSinkStats Stats = 1; + NDqProto.TDqAsyncOutputBufferStats Stats = 1; } message TPrepareRequest { diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index 1a8d3172e41..93243a29525 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -35,7 +35,7 @@ namespace { return factory; } - NDq::IDqSinkActorFactory::TPtr CreateSinkActorFactory(const NYdb::TDriver& driver) { + NDq::IDqSinkFactory::TPtr CreateSinkFactory(const NYdb::TDriver& driver) { auto factory = MakeIntrusive<NYql::NDq::TDqSinkFactory>(); RegisterDqPqWriteActorFactory(*factory, driver, nullptr); return factory; @@ -72,7 +72,7 @@ public: NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true); lwmOptions.SourceActorFactory = CreateSourceActorFactory(driver, std::move(httpGateway)); - lwmOptions.SinkActorFactory = CreateSinkActorFactory(driver); + lwmOptions.SinkFactory = CreateSinkFactory(driver); lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( [=](const NDqProto::TDqTask& task, const NDq::TLogFunc& ) diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index a2e7eda819b..2695689b47c 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -77,7 +77,7 @@ void ToProto(T* s1, const NDq::TDqOutputChannelStats* ss) } template<typename T> -void ToProto(T* s1, const NDq::TDqSinkStats* ss) +void ToProto(T* s1, const NDq::TDqAsyncOutputBufferStats* ss) { s1->SetChunks(ss->Chunks); s1->SetBytes(ss->Bytes); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp index ab8769f44ef..338ed422851 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp @@ -159,7 +159,7 @@ public: return Runner->GetSource(index); } - IDqSink::TPtr GetSink(ui64 index) override { + IDqAsyncOutputBuffer::TPtr GetSink(ui64 index) override { return Runner->GetSink(index); } diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 7af7fccc5d2..951011ce454 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1127,7 +1127,7 @@ public: return OutputType; } - const TDqSinkStats* GetStats() const override { + const TDqAsyncOutputBufferStats* GetStats() const override { try { NDqProto::TCommandHeader header; header.SetVersion(5); @@ -1181,7 +1181,7 @@ private: IInputStream& Input; IOutputStream& Output; - mutable TDqSinkStats Stats; + mutable TDqAsyncOutputBufferStats Stats; mutable NKikimr::NMiniKQL::TType* OutputType = nullptr; }; @@ -1489,7 +1489,7 @@ public: return channel; } - IDqSink::TPtr GetSink(ui64 outputIndex) override { + IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) override { auto& sink = Sinks[outputIndex]; if (!sink) { sink = new TDqSink( @@ -1565,7 +1565,7 @@ private: THashMap<ui64, IDqInputChannel::TPtr> InputChannels; THashMap<ui64, IDqSource::TPtr> Sources; THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels; - THashMap<ui64, IDqSink::TPtr> Sinks; + THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks; }; /*______________________________________________________________________________________________*/ diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h index 31a6e82784d..184a9a2d2d8 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h @@ -17,7 +17,7 @@ public: virtual void PushString(TVector<TString>&& batch, i64 space) = 0; }; -class IStringSink: public NDq::IDqSink { +class IStringSink: public NDq::IDqAsyncOutputBuffer { public: virtual ~IStringSink() = default; virtual ui64 PopString(TVector<TString>& batch, ui64 bytes) = 0; @@ -62,7 +62,7 @@ public: virtual IInputChannel::TPtr GetInputChannel(ui64 channelId) = 0; virtual IOutputChannel::TPtr GetOutputChannel(ui64 channelId) = 0; virtual NDq::IDqSource::TPtr GetSource(ui64 index) = 0; - virtual NDq::IDqSink::TPtr GetSink(ui64 index) = 0; + virtual NDq::IDqAsyncOutputBuffer::TPtr GetSink(ui64 index) = 0; virtual const THashMap<TString,TString>& GetTaskParams() const = 0; virtual const THashMap<TString,TString>& GetSecureParams() const = 0; diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index c0641dc72b7..ab0f32af1d7 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -262,7 +262,7 @@ private: traceId, Options.TaskRunnerActorFactory, Options.SourceActorFactory, - Options.SinkActorFactory)); + Options.SinkFactory)); } allocationInfo.WorkerActors.emplace_back(RegisterChild( actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index 144217305fb..26f8a6a43ff 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -23,7 +23,7 @@ namespace NYql::NDqs { TWorkerManagerCounters Counters; NTaskRunnerProxy::IProxyFactory::TPtr Factory; NDq::IDqSourceActorFactory::TPtr SourceActorFactory; - NDq::IDqSinkActorFactory::TPtr SinkActorFactory; + NDq::IDqSinkFactory::TPtr SinkFactory; TWorkerRuntimeData* RuntimeData = nullptr; TTaskRunnerInvokerFactory::TPtr TaskRunnerInvokerFactory; NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 85fbc91c01d..bfd7ad6dea9 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -88,7 +88,7 @@ struct TEvPrivate { } // namespace -class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqSinkActor { +class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqComputeActorAsyncOutput { public: TDqPqWriteActor( ui64 outputIndex, @@ -96,7 +96,7 @@ public: NPq::NProto::TDqPqTopicSink&& sinkParams, NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, - IDqSinkActor::ICallbacks* callbacks, + IDqComputeActorAsyncOutput::ICallbacks* callbacks, i64 freeSpace) : TActor<TDqPqWriteActor>(&TDqPqWriteActor::StateFunc) , OutputIndex(outputIndex) @@ -208,7 +208,7 @@ private: SubscribeOnNextEvent(); } - // IActor & IDqSinkActor + // IActor & IDqComputeActorAsyncOutput void PassAway() override { // Is called from Compute Actor if (WriteSession) { WriteSession->Close(TDuration::Zero()); @@ -371,7 +371,7 @@ private: const NPq::NProto::TDqPqTopicSink SinkParams; NYdb::TDriver Driver; std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory; - IDqSinkActor::ICallbacks* const Callbacks; + IDqComputeActorAsyncOutput::ICallbacks* const Callbacks; i64 FreeSpace = 0; NYdb::NPersQueue::TPersQueueClient PersQueueClient; @@ -388,14 +388,14 @@ private: std::queue<std::tuple<ui64, NDqProto::TCheckpoint>> DeferredCheckpoints; }; -std::pair<IDqSinkActor*, NActors::IActor*> CreateDqPqWriteActor( +std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor( NPq::NProto::TDqPqTopicSink&& settings, ui64 outputIndex, TTxId txId, const THashMap<TString, TString>& secureParams, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - IDqSinkActor::ICallbacks* callbacks, + IDqComputeActorAsyncOutput::ICallbacks* callbacks, i64 freeSpace) { const TString& tokenName = settings.GetToken().GetName(); @@ -417,7 +417,7 @@ void RegisterDqPqWriteActorFactory(TDqSinkFactory& factory, NYdb::TDriver driver factory.Register<NPq::NProto::TDqPqTopicSink>("PqSink", [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory)]( NPq::NProto::TDqPqTopicSink&& settings, - IDqSinkActorFactory::TArguments&& args) + IDqSinkFactory::TArguments&& args) { NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER)); return CreateDqPqWriteActor( diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h index 6736c8188b2..2cc0b0ee183 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h @@ -20,14 +20,14 @@ namespace NYql::NDq { constexpr i64 DqPqDefaultFreeSpace = 16_MB; -std::pair<IDqSinkActor*, NActors::IActor*> CreateDqPqWriteActor( +std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor( NPq::NProto::TDqPqTopicSink&& settings, ui64 outputIndex, TTxId txId, const THashMap<TString, TString>& secureParams, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - IDqSinkActor::ICallbacks* callbacks, + IDqComputeActorAsyncOutput::ICallbacks* callbacks, i64 freeSpace = DqPqDefaultFreeSpace); void RegisterDqPqWriteActorFactory(TDqSinkFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index 74a5176e1dc..d5bc0ab0ddc 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -75,14 +75,14 @@ struct TMetricsInflight { } // namespace -class TDqSolomonWriteActor : public NActors::TActor<TDqSolomonWriteActor>, public IDqSinkActor { +class TDqSolomonWriteActor : public NActors::TActor<TDqSolomonWriteActor>, public IDqComputeActorAsyncOutput { public: static constexpr char ActorName[] = "DQ_SOLOMON_WRITE_ACTOR"; TDqSolomonWriteActor( ui64 outputIndex, TDqSolomonWriteParams&& writeParams, - NYql::NDq::IDqSinkActor::ICallbacks* callbacks, + NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks, const NMonitoring::TDynamicCounterPtr& counters, std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider, i64 freeSpace) @@ -229,7 +229,7 @@ private: while (TryToSendNextBatch()) {} } - // IActor & IDqSinkActor + // IActor & IDqComputeActorAsyncOutput void PassAway() override { // Is called from Compute Actor for (const auto& [_, metricsInflight] : InflightBuffer) { Send(metricsInflight.HttpSenderId, new TEvents::TEvPoison()); @@ -424,7 +424,7 @@ private: const ui64 OutputIndex; const TDqSolomonWriteParams WriteParams; const TString Url; - NYql::NDq::IDqSinkActor::ICallbacks* const Callbacks; + NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* const Callbacks; TDqSolomonWriteActorMetrics Metrics; i64 FreeSpace = 0; TActorId HttpProxyId; @@ -440,11 +440,11 @@ private: ui64 Cookie = 0; }; -std::pair<NYql::NDq::IDqSinkActor*, NActors::IActor*> CreateDqSolomonWriteActor( +std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolomonWriteActor( NYql::NSo::NProto::TDqSolomonShard&& settings, ui64 outputIndex, const THashMap<TString, TString>& secureParams, - NYql::NDq::IDqSinkActor::ICallbacks* callbacks, + NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks, const NMonitoring::TDynamicCounterPtr& counters, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, i64 freeSpace) @@ -473,7 +473,7 @@ void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredService factory.Register<NSo::NProto::TDqSolomonShard>("SolomonSink", [credentialsFactory]( NYql::NSo::NProto::TDqSolomonShard&& settings, - IDqSinkActorFactory::TArguments&& args) + IDqSinkFactory::TArguments&& args) { auto counters = MakeIntrusive<NMonitoring::TDynamicCounters>(); diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h index 56c34cbb25f..12c7dfc2cd9 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h @@ -18,11 +18,11 @@ namespace NYql::NDq { constexpr i64 DqSolomonDefaultFreeSpace = 16_MB; -std::pair<NYql::NDq::IDqSinkActor*, NActors::IActor*> CreateDqSolomonWriteActor( +std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolomonWriteActor( NYql::NSo::NProto::TDqSolomonShard&& settings, ui64 outputIndex, const THashMap<TString, TString>& secureParams, - NYql::NDq::IDqSinkActor::ICallbacks* callbacks, + NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks, const NMonitoring::TDynamicCounterPtr& counters, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, i64 freeSpace = DqSolomonDefaultFreeSpace); |