aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-04-19 13:40:48 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-04-19 13:40:48 +0300
commite46919542a6fca7274b648f6ee88f3fe76e38499 (patch)
treec78f304bfb25f460af5df013264cbf682f24c95b
parent804c961885d2205be1ed59989984cd25d628d606 (diff)
downloadydb-e46919542a6fca7274b648f6ee88f3fe76e38499.tar.gz
YQ-1052 Rename common sinks & transform entities
Rename IDqSink Rename sink actor in CA Rename sink actor Rename IDqSinkActorFactory Rename IDqSinkActor ref:fd606208853da58481dc36b78e4fddf47a76a9d0
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp8
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp8
-rw-r--r--ydb/core/yq/libs/init/init.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp34
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h60
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h16
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h32
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto4
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp6
-rw-r--r--ydb/library/yql/dq/runtime/dq_sink.cpp14
-rw-r--r--ydb/library/yql/dq/runtime/dq_sink.h12
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp6
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h8
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp20
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h8
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp4
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp32
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.h2
-rw-r--r--ydb/library/yql/providers/dq/api/protos/task_command_executor.proto2
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp4
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp2
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp2
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp8
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h4
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp2
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp14
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h4
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp14
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h4
36 files changed, 185 insertions, 185 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 02b7b98029e..afb11d79b19 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -16,11 +16,11 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
- NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkActorFactory::TPtr sinkActorFactory,
+ NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits);
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkActorFactory::TPtr sinkActorFactory,
+ NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
TIntrusivePtr<TKqpCounters> counters);
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 77ebf1dcd58..117ccb88072 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -33,9 +33,9 @@ public:
}
TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
, ComputeCtx(settings.StatsMode)
{
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
@@ -317,11 +317,11 @@ private:
} // anonymous namespace
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
{
return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
- std::move(sinkActorFactory), settings, memoryLimits);
+ std::move(sinkFactory), settings, memoryLimits);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 0966e3405e0..7395b148c37 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -72,9 +72,9 @@ public:
}
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits)
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
, Counters(counters)
@@ -1130,10 +1130,10 @@ private:
} // anonymous namespace
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
{
- return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory),
+ return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory),
settings, memoryLimits, counters);
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 8f2e4005130..4e65e14ecc0 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -119,7 +119,7 @@ void Init(
});
auto sourceActorFactory = MakeIntrusive<NYql::NDq::TDqSourceFactory>();
- auto sinkActorFactory = MakeIntrusive<NYql::NDq::TDqSinkFactory>();
+ auto sinkFactory = MakeIntrusive<NYql::NDq::TDqSinkFactory>();
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory;
const auto httpGateway = NYql::IHTTPGateway::Make(
@@ -141,8 +141,8 @@ void Init(
httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig()));
RegisterClickHouseReadActorFactory(*sourceActorFactory, credentialsFactory, httpGateway);
- RegisterDqPqWriteActorFactory(*sinkActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
- RegisterDQSolomonWriteActorFactory(*sinkActorFactory, credentialsFactory);
+ RegisterDqPqWriteActorFactory(*sinkFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
+ RegisterDQSolomonWriteActorFactory(*sinkFactory, credentialsFactory);
}
ui64 mkqlInitialMemoryLimit = 8_GB;
@@ -161,7 +161,7 @@ void Init(
lwmOptions.Counters = workerManagerCounters;
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false);
lwmOptions.SourceActorFactory = sourceActorFactory;
- lwmOptions.SinkActorFactory = sinkActorFactory;
+ lwmOptions.SinkFactory = sinkFactory;
lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory();
lwmOptions.MkqlInitialMemoryLimit = mkqlInitialMemoryLimit;
lwmOptions.MkqlTotalMemoryLimit = mkqlTotalMemoryLimit;
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 8e9a2cfa7f5..bd44982504d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -29,10 +29,10 @@ public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits, /* ownMemoryQuota = */ false)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits, /* ownMemoryQuota = */ false)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, ReadyToCheckpointFlag(false)
, SentStatsRequest(false)
@@ -121,7 +121,7 @@ private:
WaitingForStateResponse.clear();
}
- const TDqSinkStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const override {
+ const TDqAsyncOutputBufferStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const override {
Y_UNUSED(sinkInfo);
return TaskRunnerStats.GetSinkStats(outputIdx);
}
@@ -176,31 +176,31 @@ private:
return;
}
- Y_VERIFY(sinkInfo.SinkActor);
+ Y_VERIFY(sinkInfo.Sink);
Y_VERIFY(sinkInfo.Actor);
const ui32 allowedOvercommit = AllowedChannelsOvercommit();
- const i64 sinkActorFreeSpaceBeforeSend = sinkInfo.SinkActor->GetFreeSpace();
+ const i64 sinkFreeSpaceBeforeSend = sinkInfo.Sink->GetFreeSpace();
- i64 toSend = sinkActorFreeSpaceBeforeSend + allowedOvercommit;
+ i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit;
CA_LOG_D("About to drain sink " << outputIndex
- << ". FreeSpace: " << sinkActorFreeSpaceBeforeSend
+ << ". FreeSpace: " << sinkFreeSpaceBeforeSend
<< ", allowedOvercommit: " << allowedOvercommit
<< ", toSend: " << toSend
- //<< ", finished: " << sinkInfo.Sink->IsFinished());
+ //<< ", finished: " << sinkInfo.SinkBuffer->IsFinished());
);
sinkInfo.PopStarted = true;
ProcessOutputsState.Inflight ++;
- sinkInfo.SinkActorFreeSpaceBeforeSend = sinkActorFreeSpaceBeforeSend;
- this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvSinkPop(outputIndex, sinkActorFreeSpaceBeforeSend));
+ sinkInfo.SinkFreeSpaceBeforeSend = sinkFreeSpaceBeforeSend;
+ this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvSinkPop(outputIndex, sinkFreeSpaceBeforeSend));
}
bool DoHandleChannelsAfterFinishImpl() override {
Y_VERIFY(Checkpoints);
auto req = GetCheckpointRequest();
if (!req.Defined()) {
- return true; // handled channels syncronously
+ return true; // handled channels syncronously
}
CA_LOG_D("DoHandleChannelsAfterFinishImpl");
this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvContinueRun(std::move(req), /* checkpointOnly = */ true));
@@ -360,7 +360,7 @@ private:
}
if (ReadyToCheckpointFlag) {
Y_VERIFY(!ProgramState);
- ProgramState = std::move(ev->Get()->ProgramState);
+ ProgramState = std::move(ev->Get()->ProgramState);
Checkpoints->DoCheckpoint();
}
ProcessOutputsImpl(status);
@@ -493,11 +493,11 @@ private:
ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished;
auto guard = BindAllocator();
- sinkInfo.SinkActor->SendData(std::move(batch), size, std::move(checkpoint), finished);
+ sinkInfo.Sink->SendData(std::move(batch), size, std::move(checkpoint), finished);
CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier");
CA_LOG_D("Drain sink " << outputIndex
- << ". Free space decreased: " << (sinkInfo.SinkActorFreeSpaceBeforeSend - sinkInfo.SinkActor->GetFreeSpace())
+ << ". Free space decreased: " << (sinkInfo.SinkFreeSpaceBeforeSend - sinkInfo.Sink->GetFreeSpace())
<< ", sent data from buffer: " << dataSize);
ProcessOutputsState.DataWasSent |= dataWasSent;
@@ -520,7 +520,7 @@ private:
return TaskRunnerStats.Get();
}
- template<typename TSecond>
+ template<typename TSecond>
TVector<ui32> GetIds(const THashMap<ui64, TSecond>& collection) {
TVector<ui32> ids;
std::transform(collection.begin(), collection.end(), std::back_inserter(ids), [](const auto& p) {
@@ -572,12 +572,12 @@ private:
IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
{
return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
- std::move(sinkActorFactory), settings, memoryLimits, taskRunnerActorFactory);
+ std::move(sinkFactory), settings, memoryLimits, taskRunnerActorFactory);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
index 71c918f04ca..6812598834d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
@@ -19,7 +19,7 @@ namespace NYql {
namespace NDq {
NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index c2f6c45e2b5..e8303b8a1bc 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -33,10 +33,10 @@ public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits)
, TaskRunnerFactory(taskRunnerFactory)
{}
@@ -67,11 +67,11 @@ private:
IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory)
{
return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
- std::move(sinkActorFactory), settings, memoryLimits, taskRunnerFactory);
+ std::move(sinkFactory), settings, memoryLimits, taskRunnerFactory);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index d03d1e68f52..4dfc41f03a3 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -257,7 +257,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
NDqProto::TDqTaskStats* protoTask, bool withProfileStats);
NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index e5c64eebc5e..ba7cd1dddc5 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -99,7 +99,7 @@ public:
NDqProto::TCheckpoint GetPendingCheckpoint() const;
void RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId);
- // Sink actor support.
+ // Sink support.
void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint);
void TryToSavePendingCheckpoint();
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 819f13b3cf8..51559b03dad 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -58,7 +58,7 @@ template<typename TDerived>
class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
, public TDqComputeActorChannels::ICallbacks
, public TDqComputeActorCheckpoints::ICallbacks
- , public IDqSinkActor::ICallbacks
+ , public IDqComputeActorAsyncOutput::ICallbacks
{
protected:
enum EEvWakeupTag : ui64 {
@@ -114,7 +114,7 @@ public:
protected:
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false)
: ExecuterId(executerId)
, TxId(txId)
@@ -123,7 +123,7 @@ protected:
, MemoryLimits(memoryLimits)
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
, SourceActorFactory(std::move(sourceActorFactory))
- , SinkActorFactory(std::move(sinkActorFactory))
+ , SinkFactory(std::move(sinkFactory))
, CheckpointingMode(GetTaskCheckpointingMode(Task))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr)
@@ -137,7 +137,7 @@ protected:
}
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
: ExecuterId(executerId)
, TxId(txId)
@@ -146,7 +146,7 @@ protected:
, MemoryLimits(memoryLimits)
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
, SourceActorFactory(std::move(sourceActorFactory))
- , SinkActorFactory(std::move(sinkActorFactory))
+ , SinkFactory(std::move(sinkFactory))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(InitMemoryQuota())
, Running(!Task.GetCreateSuspended())
@@ -281,7 +281,7 @@ protected:
// Send checkpoints to output channels.
ProcessOutputsImpl(ERunStatus::Finished);
- return true; // returns true, when channels were handled syncronously
+ return true; // returns true, when channels were handled syncronously
}
void ProcessOutputsImpl(ERunStatus status) {
@@ -412,7 +412,7 @@ protected:
for (auto& [_, sink] : SinksMap) {
if (sink.Actor) {
- sink.SinkActor->PassAway();
+ sink.Sink->PassAway();
}
}
@@ -626,7 +626,7 @@ protected:
channelInfo.Channel->Push(NDqProto::TCheckpoint(checkpoint));
}
for (const auto& [outputIndex, sink] : SinksMap) {
- sink.Sink->Push(NDqProto::TCheckpoint(checkpoint));
+ sink.SinkBuffer->Push(NDqProto::TCheckpoint(checkpoint));
}
}
@@ -666,8 +666,8 @@ protected:
for (const NDqProto::TSinkState& sinkState : state.GetSinks()) {
TSinkInfo* sink = SinksMap.FindPtr(sinkState.GetOutputIndex());
YQL_ENSURE(sink, "Failed to load state. Sink with input index " << sinkState.GetOutputIndex() << " was not found");
- YQL_ENSURE(sink->SinkActor, "Sink[" << sinkState.GetOutputIndex() << "] is not created");
- sink->SinkActor->LoadState(sinkState);
+ YQL_ENSURE(sink->Sink, "Sink[" << sinkState.GetOutputIndex() << "] is not created");
+ sink->Sink->LoadState(sinkState);
}
} catch (const std::exception& e) {
error = e.what();
@@ -760,13 +760,13 @@ protected:
};
struct TSinkInfo {
- IDqSink::TPtr Sink;
- IDqSinkActor* SinkActor = nullptr;
+ IDqAsyncOutputBuffer::TPtr SinkBuffer;
+ IDqComputeActorAsyncOutput* Sink = nullptr;
NActors::IActor* Actor = nullptr;
bool Finished = false; // If sink is in finished state, it receives only checkpoints.
TIssuesBuffer IssuesBuffer;
bool PopStarted = false;
- i64 SinkActorFreeSpaceBeforeSend = 0;
+ i64 SinkFreeSpaceBeforeSend = 0;
TSinkInfo() : IssuesBuffer(IssuesBufferSize) {}
};
@@ -1078,19 +1078,19 @@ private:
return;
}
+ Y_VERIFY(sinkInfo.SinkBuffer);
Y_VERIFY(sinkInfo.Sink);
- Y_VERIFY(sinkInfo.SinkActor);
Y_VERIFY(sinkInfo.Actor);
const ui32 allowedOvercommit = AllowedChannelsOvercommit();
- const i64 sinkActorFreeSpaceBeforeSend = sinkInfo.SinkActor->GetFreeSpace();
+ const i64 sinkFreeSpaceBeforeSend = sinkInfo.Sink->GetFreeSpace();
- i64 toSend = sinkActorFreeSpaceBeforeSend + allowedOvercommit;
+ i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit;
CA_LOG_D("About to drain sink " << outputIndex
- << ". FreeSpace: " << sinkActorFreeSpaceBeforeSend
+ << ". FreeSpace: " << sinkFreeSpaceBeforeSend
<< ", allowedOvercommit: " << allowedOvercommit
<< ", toSend: " << toSend
- << ", finished: " << sinkInfo.Sink->IsFinished());
+ << ", finished: " << sinkInfo.SinkBuffer->IsFinished());
i64 sent = 0;
while (toSend > 0 && (!sinkInfo.Finished || Checkpoints)) {
@@ -1099,11 +1099,11 @@ private:
break;
}
sent += sentChunk;
- toSend = sinkInfo.SinkActor->GetFreeSpace() + allowedOvercommit;
+ toSend = sinkInfo.Sink->GetFreeSpace() + allowedOvercommit;
}
CA_LOG_D("Drain sink " << outputIndex
- << ". Free space decreased: " << (sinkActorFreeSpaceBeforeSend - sinkInfo.SinkActor->GetFreeSpace())
+ << ". Free space decreased: " << (sinkFreeSpaceBeforeSend - sinkInfo.Sink->GetFreeSpace())
<< ", sent data from buffer: " << sent);
ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished;
@@ -1111,7 +1111,7 @@ private:
}
ui32 SendSinkDataChunk(ui64 outputIndex, TSinkInfo& sinkInfo, ui64 bytes) {
- auto sink = sinkInfo.Sink;
+ auto sink = sinkInfo.SinkBuffer;
NKikimr::NMiniKQL::TUnboxedValueVector dataBatch;
NDqProto::TCheckpoint checkpoint;
@@ -1137,7 +1137,7 @@ private:
ResumeInputs();
}
- sinkInfo.SinkActor->SendData(std::move(dataBatch), dataSize, maybeCheckpoint, sinkInfo.Finished);
+ sinkInfo.Sink->SendData(std::move(dataBatch), dataSize, maybeCheckpoint, sinkInfo.Finished);
CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier");
return dataSize + checkpointSize;
@@ -1230,13 +1230,13 @@ protected:
}
}
for (auto& [outputIndex, sink] : SinksMap) {
- if (TaskRunner) { sink.Sink = TaskRunner->GetSink(outputIndex); }
- Y_VERIFY(SinkActorFactory);
+ if (TaskRunner) { sink.SinkBuffer = TaskRunner->GetSink(outputIndex); }
+ Y_VERIFY(SinkFactory);
const auto& outputDesc = Task.GetOutputs(outputIndex);
const ui64 i = outputIndex; // Crutch for clang
- CA_LOG_D("Create sink actor for output " << i << " " << outputDesc);
- std::tie(sink.SinkActor, sink.Actor) = SinkActorFactory->CreateDqSinkActor(
- IDqSinkActorFactory::TArguments {
+ CA_LOG_D("Create sink for output " << i << " " << outputDesc);
+ std::tie(sink.Sink, sink.Actor) = SinkFactory->CreateDqSink(
+ IDqSinkFactory::TArguments {
.OutputDesc = outputDesc,
.OutputIndex = outputIndex,
.TxId = TxId,
@@ -1367,9 +1367,9 @@ private:
return TaskRunner->GetStats();
}
- virtual const TDqSinkStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const {
+ virtual const TDqAsyncOutputBufferStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const {
Y_UNUSED(outputIdx);
- return sinkInfo.Sink ? sinkInfo.Sink->GetStats() : nullptr;
+ return sinkInfo.SinkBuffer ? sinkInfo.SinkBuffer->GetStats() : nullptr;
}
public:
@@ -1496,7 +1496,7 @@ protected:
const TComputeMemoryLimits MemoryLimits;
const bool CanAllocateExtraMemory = false;
const IDqSourceActorFactory::TPtr SourceActorFactory;
- const IDqSinkActorFactory::TPtr SinkActorFactory;
+ const IDqSinkFactory::TPtr SinkFactory;
const NDqProto::ECheckpointingMode CheckpointingMode;
TIntrusivePtr<IDqTaskRunner> TaskRunner;
TDqComputeActorChannels* Channels = nullptr;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp
index 5ae40ef1ab6..c8f51e185c6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.cpp
@@ -24,13 +24,13 @@ void TDqSourceFactory::Register(const TString& type, TCreatorFunction creator)
Y_VERIFY(registered);
}
-std::pair<IDqSinkActor*, NActors::IActor*> TDqSinkFactory::CreateDqSinkActor(IDqSinkActorFactory::TArguments&& args) const
+std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqSinkFactory::CreateDqSink(IDqSinkFactory::TArguments&& args) const
{
const TString& type = args.OutputDesc.GetSink().GetType();
- YQL_ENSURE(!type.empty(), "Attempt to create sink actor of empty type");
+ YQL_ENSURE(!type.empty(), "Attempt to create sink of empty type");
const TCreatorFunction* creatorFunc = CreatorsByType.FindPtr(type);
- YQL_ENSURE(creatorFunc, "Unknown type of sink actor: \"" << type << "\"");
- std::pair<IDqSinkActor*, NActors::IActor*> actor = (*creatorFunc)(std::move(args));
+ YQL_ENSURE(creatorFunc, "Unknown type of sink: \"" << type << "\"");
+ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args));
Y_VERIFY(actor.first);
Y_VERIFY(actor.second);
return actor;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h
index db57796f5d4..45b4ac13cf6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_io_actors_factory.h
@@ -49,23 +49,23 @@ private:
};
template <class T>
-concept TCastsToSinkActorPair =
- std::is_convertible_v<T, std::pair<IDqSinkActor*, NActors::IActor*>>;
+concept TCastsToSinkPair =
+ std::is_convertible_v<T, std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>>;
template <class T, class TProto>
-concept TSinkActorCreatorFunc = requires(T f, TProto&& settings, IDqSinkActorFactory::TArguments&& args) {
- { f(std::move(settings), std::move(args)) } -> TCastsToSinkActorPair;
+concept TSinkCreatorFunc = requires(T f, TProto&& settings, IDqSinkFactory::TArguments&& args) {
+ { f(std::move(settings), std::move(args)) } -> TCastsToSinkPair;
};
-class TDqSinkFactory : public IDqSinkActorFactory {
+class TDqSinkFactory : public IDqSinkFactory {
public:
- using TCreatorFunction = std::function<std::pair<IDqSinkActor*, NActors::IActor*>(TArguments&& args)>;
+ using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TArguments&& args)>;
- std::pair<IDqSinkActor*, NActors::IActor*> CreateDqSinkActor(TArguments&& args) const override;
+ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const override;
void Register(const TString& type, TCreatorFunction creator);
- template <class TProtoMsg, TSinkActorCreatorFunc<TProtoMsg> TCreatorFunc>
+ template <class TProtoMsg, TSinkCreatorFunc<TProtoMsg> TCreatorFunc>
void Register(const TString& type, TCreatorFunc creator) {
Register(type,
[creator = std::move(creator), type](TArguments&& args)
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h
index 5c2db2ac8d8..74b4793e5a0 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h
@@ -20,26 +20,26 @@ class IActor;
namespace NYql::NDq {
-// Sink actor.
+// Sink.
// Must be IActor.
//
// Protocol:
-// 1. CA starts sink actor.
+// 1. CA starts sink.
// 2. CA runs program and gets results.
-// 3. CA calls IDqSinkActor::SendData().
+// 3. CA calls IDqComputeActorAsyncOutput::SendData().
// 4. If SendData() returns value less than 0, loop stops running until free space appears.
-// 5. When free space appears, sink actor calls ICallbacks::ResumeExecution() to start processing again.
+// 5. When free space appears, sink calls ICallbacks::ResumeExecution() to start processing again.
//
// Checkpointing:
// 1. InjectCheckpoint event arrives to CA.
// 2. CA saves its state and injects special checkpoint event to all outputs (TDqComputeActorCheckpoints::ICallbacks::InjectBarrierToOutputs()).
-// 3. Sink actor writes all data before checkpoint.
-// 4. Sink actor waits all external sink's acks for written data.
-// 5. Sink actor gathers its state and passes it into callback ICallbacks::OnSinkStateSaved(state, outputIndex).
+// 3. Sink writes all data before checkpoint.
+// 4. Sink waits all external sink's acks for written data.
+// 5. Sink gathers its state and passes it into callback ICallbacks::OnSinkStateSaved(state, outputIndex).
// 6. Checkpoints actor builds state for all task node as sum of the state of CA and all its sinks and saves it.
// 7. ...
-// 8. When checkpoint is written into database, checkpoints actor calls IDqSinkActor::CommitState() to apply all side effects.
-struct IDqSinkActor {
+// 8. When checkpoint is written into database, checkpoints actor calls IDqComputeActorAsyncOutput::CommitState() to apply all side effects.
+struct IDqComputeActorAsyncOutput {
struct ICallbacks { // Compute actor
virtual void ResumeExecution() = 0;
virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
@@ -68,26 +68,26 @@ struct IDqSinkActor {
virtual void PassAway() = 0; // The same signature as IActor::PassAway()
- virtual ~IDqSinkActor() = default;
+ virtual ~IDqComputeActorAsyncOutput() = default;
};
-struct IDqSinkActorFactory : public TThrRefBase {
- using TPtr = TIntrusivePtr<IDqSinkActorFactory>;
+struct IDqSinkFactory : public TThrRefBase {
+ using TPtr = TIntrusivePtr<IDqSinkFactory>;
struct TArguments {
const NDqProto::TTaskOutput& OutputDesc;
ui64 OutputIndex;
TTxId TxId;
const THashMap<TString, TString>& SecureParams;
- IDqSinkActor::ICallbacks* Callback;
+ IDqComputeActorAsyncOutput::ICallbacks* Callback;
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
};
- // Creates sink actor.
+ // Creates sink.
// Could throw YQL errors.
- // IActor* and IDqSinkActor* returned by method must point to the objects with consistent lifetime.
- virtual std::pair<IDqSinkActor*, NActors::IActor*> CreateDqSinkActor(TArguments&& args) const = 0;
+ // IActor* and IDqComputeActorAsyncOutput* returned by method must point to the objects with consistent lifetime.
+ virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const = 0;
};
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index 6b608790305..cf671332c27 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -47,7 +47,7 @@ message TDqInputChannelStats {
google.protobuf.Any Extra = 100;
}
-message TDqSinkStats {
+message TDqAsyncOutputBufferStats {
// basic stats
uint64 OutputIndex = 1;
uint64 Chunks = 2;
@@ -138,7 +138,7 @@ message TDqTaskStats {
repeated TDqSourceStats Sources = 150;
repeated TDqInputChannelStats InputChannels = 151;
- repeated TDqSinkStats Sinks = 152;
+ repeated TDqAsyncOutputBufferStats Sinks = 152;
repeated TDqOutputChannelStats OutputChannels = 153;
google.protobuf.Any Extra = 200;
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 6a3c60d5995..508c5cf049e 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -73,7 +73,7 @@ public:
private:
void OnStatisticsRequest(TEvStatistics::TPtr& ev, const TActorContext& ctx) {
TaskRunner->UpdateStats();
- THashMap<ui32, const TDqSinkStats*> sinkStats;
+ THashMap<ui32, const TDqAsyncOutputBufferStats*> sinkStats;
for (const auto sinkId : ev->Get()->SinkIds) {
sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats();
}
@@ -410,13 +410,13 @@ private:
}
THolder<TEvDq::TEvAbortExecution> GetError(const NKikimr::TMemoryLimitExceededException&) {
- const auto err = TStringBuilder() << "Mkql memory limit exceeded"
+ const auto err = TStringBuilder() << "Mkql memory limit exceeded"
<< ", limit: " << (MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : -1)
<< ", canAllocateExtraMemory: " << (MemoryQuota ? MemoryQuota->GetCanAllocateExtraMemory() : 0);
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, Sprintf("TMemoryLimitExceededException: %s", err.c_str()));
TIssue issue(err);
SetIssueCode(TIssuesIds::KIKIMR_PRECONDITION_FAILED, issue);
- return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TVector<TIssue>{issue});
+ return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TVector<TIssue>{issue});
}
THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) {
diff --git a/ydb/library/yql/dq/runtime/dq_sink.cpp b/ydb/library/yql/dq/runtime/dq_sink.cpp
index ff86523ef48..99e04ba3f08 100644
--- a/ydb/library/yql/dq/runtime/dq_sink.cpp
+++ b/ydb/library/yql/dq/runtime/dq_sink.cpp
@@ -9,7 +9,7 @@
namespace NYql::NDq {
namespace {
-class TDqSink : public IDqSink {
+class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
struct TValueDesc {
std::variant<NUdf::TUnboxedValue, NDqProto::TCheckpoint> Value;
ui64 EstimatedSize;
@@ -31,7 +31,7 @@ class TDqSink : public IDqSink {
};
public:
- TDqSink(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, bool collectProfileStats)
+ TDqAsyncOutputBuffer(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes, bool collectProfileStats)
: OutputIndex(outputIndex)
, MaxStoredBytes(maxStoredBytes)
, OutputType(outputType)
@@ -141,7 +141,7 @@ public:
return OutputType;
}
- const TDqSinkStats* GetStats() const override {
+ const TDqAsyncOutputBufferStats* GetStats() const override {
return &BasicStats;
}
@@ -181,16 +181,16 @@ private:
bool Finished = false;
std::deque<TValueDesc> Values;
ui64 EstimatedRowBytes = 0;
- TDqSinkStats BasicStats;
- TDqSinkStats* ProfileStats = nullptr;
+ TDqAsyncOutputBufferStats BasicStats;
+ TDqAsyncOutputBufferStats* ProfileStats = nullptr;
};
} // namespace
-IDqSink::TPtr CreateDqSink(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes,
+IDqAsyncOutputBuffer::TPtr CreateDqAsyncOutputBuffer(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes,
bool collectProfileStats)
{
- return MakeIntrusive<TDqSink>(outputIndex, outputType, maxStoredBytes, collectProfileStats);
+ return MakeIntrusive<TDqAsyncOutputBuffer>(outputIndex, outputType, maxStoredBytes, collectProfileStats);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_sink.h b/ydb/library/yql/dq/runtime/dq_sink.h
index 447647ae3ac..931957a7208 100644
--- a/ydb/library/yql/dq/runtime/dq_sink.h
+++ b/ydb/library/yql/dq/runtime/dq_sink.h
@@ -7,10 +7,10 @@
namespace NYql::NDq {
-struct TDqSinkStats : TDqOutputStats {
+struct TDqAsyncOutputBufferStats : TDqOutputStats {
const ui64 OutputIndex;
- explicit TDqSinkStats(ui64 outputIndex)
+ explicit TDqAsyncOutputBufferStats(ui64 outputIndex)
: OutputIndex(outputIndex) {}
template<typename T>
@@ -24,9 +24,9 @@ struct TDqSinkStats : TDqOutputStats {
}
};
-class IDqSink : public IDqOutput {
+class IDqAsyncOutputBuffer : public IDqOutput {
public:
- using TPtr = TIntrusivePtr<IDqSink>;
+ using TPtr = TIntrusivePtr<IDqAsyncOutputBuffer>;
virtual ui64 GetOutputIndex() const = 0;
@@ -37,10 +37,10 @@ public:
[[nodiscard]]
virtual bool Pop(NDqProto::TCheckpoint& checkpoint) = 0;
- virtual const TDqSinkStats* GetStats() const = 0;
+ virtual const TDqAsyncOutputBufferStats* GetStats() const = 0;
};
-IDqSink::TPtr CreateDqSink(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes,
+IDqAsyncOutputBuffer::TPtr CreateDqAsyncOutputBuffer(ui64 outputIndex, NKikimr::NMiniKQL::TType* outputType, ui64 maxStoredBytes,
bool collectProfileStats);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index c2ba27b5dc1..67005a599b1 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -443,7 +443,7 @@ public:
TVector<IDqOutput::TPtr> outputs{Reserve(std::max<ui64>(outputDesc.ChannelsSize(), 1))};
if (outputDesc.HasSink()) {
- auto sink = CreateDqSink(i, ProgramParsed.OutputItemTypes[i], memoryLimits.ChannelBufferSize,
+ auto sink = CreateDqAsyncOutputBuffer(i, ProgramParsed.OutputItemTypes[i], memoryLimits.ChannelBufferSize,
Settings.CollectProfileStats);
auto [_, inserted] = Sinks.emplace(i, sink);
Y_VERIFY(inserted);
@@ -578,7 +578,7 @@ public:
return *ptr;
}
- IDqSink::TPtr GetSink(ui64 outputIndex) override {
+ IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) override {
auto ptr = Sinks.FindPtr(outputIndex);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have output index: " << outputIndex);
return *ptr;
@@ -713,7 +713,7 @@ private:
THashMap<ui64, IDqInputChannel::TPtr> InputChannels; // Channel id -> Channel
THashMap<ui64, IDqSource::TPtr> Sources; // Input index -> Source
THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels; // Channel id -> Channel
- THashMap<ui64, IDqSink::TPtr> Sinks; // Output index -> Sink
+ THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks; // Output index -> Sink
IDqOutputConsumer::TPtr Output;
NUdf::TUnboxedValue ResultStream;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index bf96d30ac5a..7db08493785 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -195,7 +195,7 @@ public:
, IsDefined(true) {
}
- TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqSinkStats*>&& sinkStats)
+ TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqAsyncOutputBufferStats*>&& sinkStats)
: StatsInplace()
, StatsPtr(stats)
, IsInplace(false)
@@ -214,7 +214,7 @@ public:
return IsDefined;
}
- const TDqSinkStats* GetSinkStats(ui32 sinkId) const {
+ const TDqAsyncOutputBufferStats* GetSinkStats(ui32 sinkId) const {
return SinkStats.at(sinkId);
}
@@ -223,7 +223,7 @@ private:
const TDqTaskRunnerStats* StatsPtr;
bool IsInplace;
bool IsDefined;
- THashMap<ui32, const TDqSinkStats*> SinkStats;
+ THashMap<ui32, const TDqAsyncOutputBufferStats*> SinkStats;
};
struct TDqTaskRunnerContext {
@@ -300,7 +300,7 @@ public:
virtual IDqInputChannel::TPtr GetInputChannel(ui64 channelId) = 0;
virtual IDqSource::TPtr GetSource(ui64 inputIndex) = 0;
virtual IDqOutputChannel::TPtr GetOutputChannel(ui64 channelId) = 0;
- virtual IDqSink::TPtr GetSink(ui64 outputIndex) = 0;
+ virtual IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) = 0;
// if memoryLimit = Nothing() then don't set memory limit, use existing one (if any)
// if memoryLimit = 0 then set unlimited
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
index 3a12b53fc48..5eb9ede8c77 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
@@ -36,10 +36,10 @@ TFakeActor::~TFakeActor() {
Alloc.Acquire();
}
-void TFakeActor::InitSink(IDqSinkActor* dqSink, IActor* dqSinkAsActor) {
+void TFakeActor::InitSink(IDqComputeActorAsyncOutput* dqSink, IActor* dqSinkAsActor) {
DqSinkActorId = RegisterWithSameMailbox(dqSinkAsActor),
- DqSinkActor = dqSink;
- DqSinkActorAsActor = dqSinkAsActor;
+ DqSink = dqSink;
+ DqSinkAsActor = dqSinkAsActor;
}
void TFakeActor::InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor) {
@@ -58,11 +58,11 @@ void TFakeActor::Terminate() {
}
if (DqSinkActorId) {
- DqSinkActor->PassAway();
+ DqSink->PassAway();
DqSinkActorId = std::nullopt;
- DqSinkActor = nullptr;
- DqSinkActorAsActor = nullptr;
+ DqSink = nullptr;
+ DqSinkAsActor = nullptr;
}
}
@@ -98,8 +98,8 @@ TFakeCASetup::~TFakeCASetup() {
void TFakeCASetup::SinkWrite(const TWriteValueProducer valueProducer, TMaybe<NDqProto::TCheckpoint> checkpoint) {
Execute([&valueProducer, checkpoint](TFakeActor& actor) {
auto batch = valueProducer(actor.GetHolderFactory());
- Y_ASSERT(actor.DqSinkActor);
- actor.DqSinkActor->SendData(std::move(batch), 0, checkpoint, false);
+ Y_ASSERT(actor.DqSink);
+ actor.DqSink->SendData(std::move(batch), 0, checkpoint, false);
});
}
@@ -119,8 +119,8 @@ void TFakeCASetup::LoadSource(const NDqProto::TSourceState& state) {
void TFakeCASetup::LoadSink(const NDqProto::TSinkState& state) {
Execute([&state](TFakeActor& actor) {
- Y_ASSERT(actor.DqSinkActor);
- actor.DqSinkActor->LoadState(state);
+ Y_ASSERT(actor.DqSink);
+ actor.DqSink->LoadState(state);
});
}
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
index 15f82baffbb..7a3de5b51e5 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
@@ -85,7 +85,7 @@ class TFakeActor : public NActors::TActor<TFakeActor> {
TFakeActor& Parent;
};
- struct TSinkCallbacks : public IDqSinkActor::ICallbacks {
+ struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
explicit TSinkCallbacks(TFakeActor& parent) : Parent(parent) {}
void ResumeExecution() override {
@@ -112,7 +112,7 @@ public:
TFakeActor(TSourcePromises& sourcePromises, TSinkPromises& sinkPromises);
~TFakeActor();
- void InitSink(IDqSinkActor* dqSink, IActor* dqSinkAsActor);
+ void InitSink(IDqComputeActorAsyncOutput* dqSink, IActor* dqSinkAsActor);
void InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor);
void Terminate();
@@ -121,7 +121,7 @@ public:
public:
IDqSourceActor* DqSourceActor = nullptr;
- IDqSinkActor* DqSinkActor = nullptr;
+ IDqComputeActorAsyncOutput* DqSink = nullptr;
private:
STRICT_STFUNC(StateFunc,
@@ -156,7 +156,7 @@ private:
IActor* DqSourceActorAsActor = nullptr;
std::optional<NActors::TActorId> DqSinkActorId;
- IActor* DqSinkActorAsActor = nullptr;
+ IActor* DqSinkAsActor = nullptr;
TSourceEvents SourceEvents;
TSinkCallbacks SinkCallbacks;
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index a2ae923d16f..55faca97130 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -66,7 +66,7 @@ IActor* CreateComputeActor(
operationId,
std::move(task),
std::move(options.SourceActorFactory),
- std::move(options.SinkActorFactory),
+ std::move(options.SinkFactory),
computeRuntimeSettings,
memoryLimits,
taskRunnerFactory);
@@ -76,7 +76,7 @@ IActor* CreateComputeActor(
operationId,
std::move(task),
std::move(options.SourceActorFactory),
- std::move(options.SinkActorFactory),
+ std::move(options.SinkFactory),
computeRuntimeSettings,
memoryLimits,
taskRunnerActorFactory);
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 8cb708b2f3f..cac1420315c 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -59,14 +59,14 @@ struct TSourceInfo {
};
struct TSinkInfo {
- IDqSinkActor* SinkActor = nullptr;
+ IDqComputeActorAsyncOutput* Sink = nullptr;
NActors::IActor* Actor = nullptr;
bool Finished = false;
NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr;
};
class TDqWorker: public TRichActor<TDqWorker>
- , IDqSinkActor::ICallbacks
+ , IDqComputeActorAsyncOutput::ICallbacks
, ITaskRunnerActor::ICallbacks
{
static constexpr ui32 INPUT_SIZE = 100000;
@@ -77,12 +77,12 @@ public:
explicit TDqWorker(
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
const IDqSourceActorFactory::TPtr& sourceActorFactory,
- const IDqSinkActorFactory::TPtr& sinkActorFactory,
+ const IDqSinkFactory::TPtr& sinkFactory,
TWorkerRuntimeData* runtimeData,
const TString& traceId)
: TRichActor<TDqWorker>(&TDqWorker::Handler)
, SourceActorFactory(sourceActorFactory)
- , SinkActorFactory(sinkActorFactory)
+ , SinkFactory(sinkFactory)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, RuntimeData(runtimeData)
, TraceId(traceId)
@@ -119,7 +119,7 @@ public:
v.SourceActor->PassAway();
}
for (const auto& [_, v] : SinksMap) {
- v.SinkActor->PassAway();
+ v.Sink->PassAway();
}
Dump();
}
@@ -140,7 +140,7 @@ private:
HFunc(TEvSourcePushFinished, OnSourcePushFinished);
// weird to have two events for error handling, but we need to use TEvDqFailure
- // between worker_actor <-> executer_actor, cause it transmits statistics in 'Metric' field
+ // between worker_actor <-> executer_actor, cause it transmits statistics in 'Metric' field
HFunc(NDq::TEvDq::TEvAbortExecution, OnErrorFromPipe); // received from task_runner_actor
HFunc(TEvDqFailure, OnError); // received from this actor itself
HFunc(TEvContinueRun, OnContinueRun);
@@ -306,8 +306,8 @@ private:
if (output.HasSink()) {
auto& sink = SinksMap[outputId];
sink.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
- std::tie(sink.SinkActor, sink.Actor) = SinkActorFactory->CreateDqSinkActor(
- IDqSinkActorFactory::TArguments {
+ std::tie(sink.Sink, sink.Actor) = SinkFactory->CreateDqSink(
+ IDqSinkFactory::TArguments {
.OutputDesc = output,
.OutputIndex = static_cast<ui64>(outputId),
.TxId = TraceId,
@@ -591,9 +591,9 @@ private:
}
case ERunStatus::PendingOutput: {
for (auto& [index, sink] : SinksMap) {
- const i64 sinkActorFreeSpaceBeforeSend = sink.SinkActor->GetFreeSpace();
- if (sinkActorFreeSpaceBeforeSend > 0 && !sink.Finished) {
- Send(TaskRunnerActor, new TEvSinkPop(index, sinkActorFreeSpaceBeforeSend));
+ const i64 sinkFreeSpaceBeforeSend = sink.Sink->GetFreeSpace();
+ if (sinkFreeSpaceBeforeSend > 0 && !sink.Finished) {
+ Send(TaskRunnerActor, new TEvSinkPop(index, sinkFreeSpaceBeforeSend));
}
}
break;
@@ -691,7 +691,7 @@ private:
Run(ctx);
}
/*_________________________________________________________*/
- /*______________________ SinkActorEvents __________________*/
+ /*______________________ Sink Events ----__________________*/
void ResumeExecution() override {
Send(SelfId(), new TEvContinueRun());
}
@@ -720,13 +720,13 @@ private:
Y_UNUSED(checkpointSize); Y_UNUSED(checkpoint); Y_UNUSED(changed);
auto& sink = SinksMap[index];
sink.Finished = finished;
- sink.SinkActor->SendData(std::move(batch), size, {}, finished);
+ sink.Sink->SendData(std::move(batch), size, {}, finished);
}
/*_________________________________________________________*/
IDqSourceActorFactory::TPtr SourceActorFactory;
- IDqSinkActorFactory::TPtr SinkActorFactory;
+ IDqSinkFactory::TPtr SinkFactory;
ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
NTaskRunnerActor::ITaskRunnerActor* Actor = nullptr;
TActorId TaskRunnerActor;
@@ -765,14 +765,14 @@ NActors::IActor* CreateWorkerActor(
const TString& traceId,
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
const IDqSourceActorFactory::TPtr& sourceActorFactory,
- const IDqSinkActorFactory::TPtr& sinkActorFactory)
+ const IDqSinkFactory::TPtr& sinkFactory)
{
Y_VERIFY(taskRunnerActorFactory);
return new TLogWrapReceive(
new TDqWorker(
taskRunnerActorFactory,
sourceActorFactory,
- sinkActorFactory,
+ sinkFactory,
runtimeData,
traceId), traceId);
}
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h
index 4d39687f5b2..a9eac60c486 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.h
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.h
@@ -26,6 +26,6 @@ namespace NYql::NDqs {
const TString& traceId,
const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
const NDq::IDqSourceActorFactory::TPtr& sourceActorFactory,
- const NDq::IDqSinkActorFactory::TPtr& sinkActorFactory);
+ const NDq::IDqSinkFactory::TPtr& sinkFactory);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
index 3f570f1aa41..213b7fb38b9 100644
--- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
+++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
@@ -126,7 +126,7 @@ message TGetStatsSourceResponse {
}
message TSinkStatsResponse {
- NDqProto.TDqSinkStats Stats = 1;
+ NDqProto.TDqAsyncOutputBufferStats Stats = 1;
}
message TPrepareRequest {
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index 1a8d3172e41..93243a29525 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -35,7 +35,7 @@ namespace {
return factory;
}
- NDq::IDqSinkActorFactory::TPtr CreateSinkActorFactory(const NYdb::TDriver& driver) {
+ NDq::IDqSinkFactory::TPtr CreateSinkFactory(const NYdb::TDriver& driver) {
auto factory = MakeIntrusive<NYql::NDq::TDqSinkFactory>();
RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
return factory;
@@ -72,7 +72,7 @@ public:
NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true);
lwmOptions.SourceActorFactory = CreateSourceActorFactory(driver, std::move(httpGateway));
- lwmOptions.SinkActorFactory = CreateSinkActorFactory(driver);
+ lwmOptions.SinkFactory = CreateSinkFactory(driver);
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
[=](const NDqProto::TDqTask& task, const NDq::TLogFunc& )
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index a2e7eda819b..2695689b47c 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -77,7 +77,7 @@ void ToProto(T* s1, const NDq::TDqOutputChannelStats* ss)
}
template<typename T>
-void ToProto(T* s1, const NDq::TDqSinkStats* ss)
+void ToProto(T* s1, const NDq::TDqAsyncOutputBufferStats* ss)
{
s1->SetChunks(ss->Chunks);
s1->SetBytes(ss->Bytes);
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
index ab8769f44ef..338ed422851 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
@@ -159,7 +159,7 @@ public:
return Runner->GetSource(index);
}
- IDqSink::TPtr GetSink(ui64 index) override {
+ IDqAsyncOutputBuffer::TPtr GetSink(ui64 index) override {
return Runner->GetSink(index);
}
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 7af7fccc5d2..951011ce454 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1127,7 +1127,7 @@ public:
return OutputType;
}
- const TDqSinkStats* GetStats() const override {
+ const TDqAsyncOutputBufferStats* GetStats() const override {
try {
NDqProto::TCommandHeader header;
header.SetVersion(5);
@@ -1181,7 +1181,7 @@ private:
IInputStream& Input;
IOutputStream& Output;
- mutable TDqSinkStats Stats;
+ mutable TDqAsyncOutputBufferStats Stats;
mutable NKikimr::NMiniKQL::TType* OutputType = nullptr;
};
@@ -1489,7 +1489,7 @@ public:
return channel;
}
- IDqSink::TPtr GetSink(ui64 outputIndex) override {
+ IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) override {
auto& sink = Sinks[outputIndex];
if (!sink) {
sink = new TDqSink(
@@ -1565,7 +1565,7 @@ private:
THashMap<ui64, IDqInputChannel::TPtr> InputChannels;
THashMap<ui64, IDqSource::TPtr> Sources;
THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels;
- THashMap<ui64, IDqSink::TPtr> Sinks;
+ THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks;
};
/*______________________________________________________________________________________________*/
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
index 31a6e82784d..184a9a2d2d8 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
@@ -17,7 +17,7 @@ public:
virtual void PushString(TVector<TString>&& batch, i64 space) = 0;
};
-class IStringSink: public NDq::IDqSink {
+class IStringSink: public NDq::IDqAsyncOutputBuffer {
public:
virtual ~IStringSink() = default;
virtual ui64 PopString(TVector<TString>& batch, ui64 bytes) = 0;
@@ -62,7 +62,7 @@ public:
virtual IInputChannel::TPtr GetInputChannel(ui64 channelId) = 0;
virtual IOutputChannel::TPtr GetOutputChannel(ui64 channelId) = 0;
virtual NDq::IDqSource::TPtr GetSource(ui64 index) = 0;
- virtual NDq::IDqSink::TPtr GetSink(ui64 index) = 0;
+ virtual NDq::IDqAsyncOutputBuffer::TPtr GetSink(ui64 index) = 0;
virtual const THashMap<TString,TString>& GetTaskParams() const = 0;
virtual const THashMap<TString,TString>& GetSecureParams() const = 0;
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index c0641dc72b7..ab0f32af1d7 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -262,7 +262,7 @@ private:
traceId,
Options.TaskRunnerActorFactory,
Options.SourceActorFactory,
- Options.SinkActorFactory));
+ Options.SinkFactory));
}
allocationInfo.WorkerActors.emplace_back(RegisterChild(
actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index 144217305fb..26f8a6a43ff 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -23,7 +23,7 @@ namespace NYql::NDqs {
TWorkerManagerCounters Counters;
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
NDq::IDqSourceActorFactory::TPtr SourceActorFactory;
- NDq::IDqSinkActorFactory::TPtr SinkActorFactory;
+ NDq::IDqSinkFactory::TPtr SinkFactory;
TWorkerRuntimeData* RuntimeData = nullptr;
TTaskRunnerInvokerFactory::TPtr TaskRunnerInvokerFactory;
NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index 85fbc91c01d..bfd7ad6dea9 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -88,7 +88,7 @@ struct TEvPrivate {
} // namespace
-class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqSinkActor {
+class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqComputeActorAsyncOutput {
public:
TDqPqWriteActor(
ui64 outputIndex,
@@ -96,7 +96,7 @@ public:
NPq::NProto::TDqPqTopicSink&& sinkParams,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
- IDqSinkActor::ICallbacks* callbacks,
+ IDqComputeActorAsyncOutput::ICallbacks* callbacks,
i64 freeSpace)
: TActor<TDqPqWriteActor>(&TDqPqWriteActor::StateFunc)
, OutputIndex(outputIndex)
@@ -208,7 +208,7 @@ private:
SubscribeOnNextEvent();
}
- // IActor & IDqSinkActor
+ // IActor & IDqComputeActorAsyncOutput
void PassAway() override { // Is called from Compute Actor
if (WriteSession) {
WriteSession->Close(TDuration::Zero());
@@ -371,7 +371,7 @@ private:
const NPq::NProto::TDqPqTopicSink SinkParams;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
- IDqSinkActor::ICallbacks* const Callbacks;
+ IDqComputeActorAsyncOutput::ICallbacks* const Callbacks;
i64 FreeSpace = 0;
NYdb::NPersQueue::TPersQueueClient PersQueueClient;
@@ -388,14 +388,14 @@ private:
std::queue<std::tuple<ui64, NDqProto::TCheckpoint>> DeferredCheckpoints;
};
-std::pair<IDqSinkActor*, NActors::IActor*> CreateDqPqWriteActor(
+std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
NPq::NProto::TDqPqTopicSink&& settings,
ui64 outputIndex,
TTxId txId,
const THashMap<TString, TString>& secureParams,
NYdb::TDriver driver,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- IDqSinkActor::ICallbacks* callbacks,
+ IDqComputeActorAsyncOutput::ICallbacks* callbacks,
i64 freeSpace)
{
const TString& tokenName = settings.GetToken().GetName();
@@ -417,7 +417,7 @@ void RegisterDqPqWriteActorFactory(TDqSinkFactory& factory, NYdb::TDriver driver
factory.Register<NPq::NProto::TDqPqTopicSink>("PqSink",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory)](
NPq::NProto::TDqPqTopicSink&& settings,
- IDqSinkActorFactory::TArguments&& args)
+ IDqSinkFactory::TArguments&& args)
{
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER));
return CreateDqPqWriteActor(
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h
index 6736c8188b2..2cc0b0ee183 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h
@@ -20,14 +20,14 @@ namespace NYql::NDq {
constexpr i64 DqPqDefaultFreeSpace = 16_MB;
-std::pair<IDqSinkActor*, NActors::IActor*> CreateDqPqWriteActor(
+std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
NPq::NProto::TDqPqTopicSink&& settings,
ui64 outputIndex,
TTxId txId,
const THashMap<TString, TString>& secureParams,
NYdb::TDriver driver,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- IDqSinkActor::ICallbacks* callbacks,
+ IDqComputeActorAsyncOutput::ICallbacks* callbacks,
i64 freeSpace = DqPqDefaultFreeSpace);
void RegisterDqPqWriteActorFactory(TDqSinkFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index 74a5176e1dc..d5bc0ab0ddc 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -75,14 +75,14 @@ struct TMetricsInflight {
} // namespace
-class TDqSolomonWriteActor : public NActors::TActor<TDqSolomonWriteActor>, public IDqSinkActor {
+class TDqSolomonWriteActor : public NActors::TActor<TDqSolomonWriteActor>, public IDqComputeActorAsyncOutput {
public:
static constexpr char ActorName[] = "DQ_SOLOMON_WRITE_ACTOR";
TDqSolomonWriteActor(
ui64 outputIndex,
TDqSolomonWriteParams&& writeParams,
- NYql::NDq::IDqSinkActor::ICallbacks* callbacks,
+ NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const NMonitoring::TDynamicCounterPtr& counters,
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider,
i64 freeSpace)
@@ -229,7 +229,7 @@ private:
while (TryToSendNextBatch()) {}
}
- // IActor & IDqSinkActor
+ // IActor & IDqComputeActorAsyncOutput
void PassAway() override { // Is called from Compute Actor
for (const auto& [_, metricsInflight] : InflightBuffer) {
Send(metricsInflight.HttpSenderId, new TEvents::TEvPoison());
@@ -424,7 +424,7 @@ private:
const ui64 OutputIndex;
const TDqSolomonWriteParams WriteParams;
const TString Url;
- NYql::NDq::IDqSinkActor::ICallbacks* const Callbacks;
+ NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* const Callbacks;
TDqSolomonWriteActorMetrics Metrics;
i64 FreeSpace = 0;
TActorId HttpProxyId;
@@ -440,11 +440,11 @@ private:
ui64 Cookie = 0;
};
-std::pair<NYql::NDq::IDqSinkActor*, NActors::IActor*> CreateDqSolomonWriteActor(
+std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolomonWriteActor(
NYql::NSo::NProto::TDqSolomonShard&& settings,
ui64 outputIndex,
const THashMap<TString, TString>& secureParams,
- NYql::NDq::IDqSinkActor::ICallbacks* callbacks,
+ NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const NMonitoring::TDynamicCounterPtr& counters,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
i64 freeSpace)
@@ -473,7 +473,7 @@ void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredService
factory.Register<NSo::NProto::TDqSolomonShard>("SolomonSink",
[credentialsFactory](
NYql::NSo::NProto::TDqSolomonShard&& settings,
- IDqSinkActorFactory::TArguments&& args)
+ IDqSinkFactory::TArguments&& args)
{
auto counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
index 56c34cbb25f..12c7dfc2cd9 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
@@ -18,11 +18,11 @@ namespace NYql::NDq {
constexpr i64 DqSolomonDefaultFreeSpace = 16_MB;
-std::pair<NYql::NDq::IDqSinkActor*, NActors::IActor*> CreateDqSolomonWriteActor(
+std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolomonWriteActor(
NYql::NSo::NProto::TDqSolomonShard&& settings,
ui64 outputIndex,
const THashMap<TString, TString>& secureParams,
- NYql::NDq::IDqSinkActor::ICallbacks* callbacks,
+ NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const NMonitoring::TDynamicCounterPtr& counters,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
i64 freeSpace = DqSolomonDefaultFreeSpace);