summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <[email protected]>2022-05-19 15:51:26 +0300
committerVasily Gerasimov <[email protected]>2022-05-19 15:51:26 +0300
commit7cf06e3c230798638c126cdcb5532b5a93c09ce6 (patch)
tree75c49079794cc10f306bf7f96ba4b28bd6512be4
parent5daef990bb328f039b3c44707b471a3633d520ae (diff)
YQ-1098 Support output transform runtime
Rewrite TestPrivateFunction Rewrite TestSeveralIterations Rewrite TestMultiplicationTransform Adapt TestEmptyChannel Fix derivation from TThrRefBase of actor Tune factory args Remove TransformInput from factory args Remove incorrect check in type ann Pass settings and invoke url to actor Remove transform events Move factory to source/sinks factories files Remap callbacks Comment Remove unused header dq_transform_actor.h Rename transform factory Support checkpoints in CA Fix transform actor Fix types IDqComputeActorAsyncOutput interface Pass transform factory in dqrun Pass transform factory through CAs TAsyncOutputInfoBase Fix build Changes in task runner ref:73ec0ca11f52fc494992ecceec9e6ea8e603caa8
-rw-r--r--CMakeLists.darwin.txt3
-rw-r--r--CMakeLists.linux.txt3
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h6
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp10
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp12
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp2
-rw-r--r--ydb/core/kqp/node/kqp_node.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_effects.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_output_stream.cpp4
-rw-r--r--ydb/core/yq/libs/init/init.cpp1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp28
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h3
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp10
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h3
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp16
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h36
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h41
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h7
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h200
-rw-r--r--ydb/library/yql/dq/actors/dq_events_ids.h2
-rw-r--r--ydb/library/yql/dq/actors/transform/CMakeLists.txt24
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_actor.h11
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp28
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h41
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_events.cpp1
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_events.h23
-rw-r--r--ydb/library/yql/dq/runtime/CMakeLists.txt1
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_consumer.cpp23
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_consumer.h1
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp58
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h1
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp10
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp13
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp48
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h53
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp12
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp6
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp50
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h6
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp2
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp4
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h4
-rw-r--r--ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp9
-rw-r--r--ydb/library/yql/providers/pq/async_io/ut/dq_pq_write_actor_ut.cpp48
-rw-r--r--ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp12
-rw-r--r--ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h8
-rw-r--r--ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp16
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp12
-rw-r--r--ydb/library/yql/providers/solomon/async_io/ut/dq_solomon_write_actor_ut.cpp34
-rw-r--r--ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp8
-rw-r--r--ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h2
53 files changed, 563 insertions, 405 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 4ce54157e6b..a9e6ed94b15 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -543,6 +543,7 @@ add_subdirectory(ydb/library/yql/dq/common)
add_subdirectory(ydb/core/ydb_convert)
add_subdirectory(ydb/library/yql/dq/runtime)
add_subdirectory(ydb/library/yql/dq/type_ann)
+add_subdirectory(ydb/library/yql/providers/common/schema/mkql)
add_subdirectory(ydb/core/sys_view/common)
add_subdirectory(ydb/core/sys_view/nodes)
add_subdirectory(ydb/core/sys_view/partition_stats)
@@ -679,7 +680,6 @@ add_subdirectory(ydb/core/yq/libs/shared_resources/interface)
add_subdirectory(ydb/library/logger)
add_subdirectory(ydb/core/yq/libs/private_client)
add_subdirectory(ydb/core/yq/libs/result_formatter)
-add_subdirectory(ydb/library/yql/providers/common/schema/mkql)
add_subdirectory(ydb/core/yq/libs/signer)
add_subdirectory(ydb/core/yq/libs/hmac)
add_subdirectory(ydb/library/yql/providers/clickhouse/provider)
@@ -1203,7 +1203,6 @@ add_subdirectory(ydb/core/yq/libs/hmac/ut)
add_subdirectory(ydb/core/yq/libs/result_formatter/ut)
add_subdirectory(ydb/core/yq/libs/signer/ut)
add_subdirectory(ydb/core/yq/libs/test_connection/ut)
-add_subdirectory(ydb/library/yql/dq/actors/transform)
add_subdirectory(ydb/library/yql/dq/actors/compute/ut)
add_subdirectory(ydb/library/yql/dq/runtime/ut)
add_subdirectory(ydb/library/yql/dq/state/ut)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 0e8beb569c0..6893800ba86 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -623,6 +623,7 @@ add_subdirectory(ydb/library/yql/dq/common)
add_subdirectory(ydb/core/ydb_convert)
add_subdirectory(ydb/library/yql/dq/runtime)
add_subdirectory(ydb/library/yql/dq/type_ann)
+add_subdirectory(ydb/library/yql/providers/common/schema/mkql)
add_subdirectory(ydb/core/sys_view/common)
add_subdirectory(ydb/core/sys_view/nodes)
add_subdirectory(ydb/core/sys_view/partition_stats)
@@ -759,7 +760,6 @@ add_subdirectory(ydb/core/yq/libs/shared_resources/interface)
add_subdirectory(ydb/library/logger)
add_subdirectory(ydb/core/yq/libs/private_client)
add_subdirectory(ydb/core/yq/libs/result_formatter)
-add_subdirectory(ydb/library/yql/providers/common/schema/mkql)
add_subdirectory(ydb/core/yq/libs/signer)
add_subdirectory(ydb/core/yq/libs/hmac)
add_subdirectory(ydb/library/yql/providers/clickhouse/provider)
@@ -1298,7 +1298,6 @@ add_subdirectory(ydb/core/yq/libs/hmac/ut)
add_subdirectory(ydb/core/yq/libs/result_formatter/ut)
add_subdirectory(ydb/core/yq/libs/signer/ut)
add_subdirectory(ydb/core/yq/libs/test_connection/ut)
-add_subdirectory(ydb/library/yql/dq/actors/transform)
add_subdirectory(ydb/library/yql/dq/actors/compute/ut)
add_subdirectory(ydb/library/yql/dq/runtime/ut)
add_subdirectory(ydb/library/yql/dq/state/ut)
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index afb11d79b19..d70dcc3275a 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -16,11 +16,13 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
- NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory,
+ NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits);
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory,
+ NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
TIntrusivePtr<TKqpCounters> counters);
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 117ccb88072..683cfb600dd 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -33,9 +33,10 @@ public:
}
TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
, ComputeCtx(settings.StatsMode)
{
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
@@ -317,11 +318,12 @@ private:
} // anonymous namespace
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
{
return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
- std::move(sinkFactory), settings, memoryLimits);
+ std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 249a9e3d184..61233473529 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -72,9 +72,10 @@ public:
}
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits)
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
, Counters(counters)
@@ -1126,11 +1127,12 @@ private:
} // anonymous namespace
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
{
- return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory),
- settings, memoryLimits, counters);
+ return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory),
+ functionRegistry, settings, memoryLimits, counters);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index b0d4947f654..a405a06f3de 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -1240,7 +1240,7 @@ private:
return false;
};
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits);
+ auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, nullptr, nullptr, settings, limits);
auto computeActorId = Register(computeActor);
task.ComputeActorId = computeActorId;
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index bf7d7f69c2d..80728c7160f 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -311,11 +311,11 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- nullptr, nullptr, runtimeSettings, memoryLimits, Counters);
+ nullptr, nullptr, nullptr, nullptr, runtimeSettings, memoryLimits, Counters);
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
- computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), nullptr, nullptr, runtimeSettings,
+ computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), nullptr, nullptr, nullptr, nullptr, runtimeSettings,
memoryLimits);
taskCtx.ComputeActorId = Register(computeActor);
} else {
diff --git a/ydb/core/kqp/runtime/kqp_effects.cpp b/ydb/core/kqp/runtime/kqp_effects.cpp
index b178192f273..35ca7b017e1 100644
--- a/ydb/core/kqp/runtime/kqp_effects.cpp
+++ b/ydb/core/kqp/runtime/kqp_effects.cpp
@@ -23,6 +23,10 @@ public:
value.Apply(*ApplyCtx);
}
+ void Consume(NDqProto::TCheckpoint&&) final {
+ Y_FAIL("Shouldn't be called");
+ }
+
void Finish() final {}
private:
diff --git a/ydb/core/kqp/runtime/kqp_output_stream.cpp b/ydb/core/kqp/runtime/kqp_output_stream.cpp
index 640706d9b45..b3aac31aaed 100644
--- a/ydb/core/kqp/runtime/kqp_output_stream.cpp
+++ b/ydb/core/kqp/runtime/kqp_output_stream.cpp
@@ -45,6 +45,10 @@ public:
Outputs[partitionIndex]->Push(std::move(value));
}
+ void Consume(NDqProto::TCheckpoint&&) final {
+ Y_FAIL("Shouldn't be called");
+ }
+
void Finish() final {
for (auto& output : Outputs) {
output->Finish();
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index f112537737c..7fc0e22acc1 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -161,6 +161,7 @@ void Init(
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false);
lwmOptions.SourceActorFactory = sourceActorFactory;
lwmOptions.SinkFactory = sinkFactory;
+ lwmOptions.FunctionRegistry = appData->FunctionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory();
lwmOptions.MkqlInitialMemoryLimit = mkqlInitialMemoryLimit;
lwmOptions.MkqlTotalMemoryLimit = mkqlTotalMemoryLimit;
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 2d1d3e6d16f..fc468e163b6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -29,10 +29,11 @@ public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits, /* ownMemoryQuota = */ false)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, ReadyToCheckpointFlag(false)
, SentStatsRequest(false)
@@ -121,7 +122,7 @@ private:
WaitingForStateResponse.clear();
}
- const TDqAsyncOutputBufferStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const override {
+ const TDqAsyncOutputBufferStats* GetSinkStats(ui64 outputIdx, const TAsyncOutputInfoBase& sinkInfo) const override {
Y_UNUSED(sinkInfo);
return TaskRunnerStats.GetSinkStats(outputIdx);
}
@@ -167,7 +168,7 @@ private:
this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, toSend));
}
- void DrainSink(ui64 outputIndex, TSinkInfo& sinkInfo) override {
+ void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& sinkInfo) override {
if (sinkInfo.Finished && !Checkpoints) {
return;
}
@@ -176,23 +177,23 @@ private:
return;
}
- Y_VERIFY(sinkInfo.Sink);
+ Y_VERIFY(sinkInfo.AsyncOutput);
Y_VERIFY(sinkInfo.Actor);
const ui32 allowedOvercommit = AllowedChannelsOvercommit();
- const i64 sinkFreeSpaceBeforeSend = sinkInfo.Sink->GetFreeSpace();
+ const i64 sinkFreeSpaceBeforeSend = sinkInfo.AsyncOutput->GetFreeSpace();
i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit;
CA_LOG_D("About to drain sink " << outputIndex
<< ". FreeSpace: " << sinkFreeSpaceBeforeSend
<< ", allowedOvercommit: " << allowedOvercommit
<< ", toSend: " << toSend
- //<< ", finished: " << sinkInfo.SinkBuffer->IsFinished());
+ //<< ", finished: " << sinkInfo.Buffer->IsFinished());
);
sinkInfo.PopStarted = true;
ProcessOutputsState.Inflight ++;
- sinkInfo.SinkFreeSpaceBeforeSend = sinkFreeSpaceBeforeSend;
+ sinkInfo.FreeSpaceBeforeSend = sinkFreeSpaceBeforeSend;
this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvSinkPop(outputIndex, sinkFreeSpaceBeforeSend));
}
@@ -478,7 +479,7 @@ private:
auto it = SinksMap.find(outputIndex);
Y_VERIFY(it != SinksMap.end());
- TSinkInfo& sinkInfo = it->second;
+ TAsyncOutputInfoBase& sinkInfo = it->second;
sinkInfo.Finished = finished;
if (finished) {
FinishedSinks.insert(outputIndex);
@@ -493,11 +494,11 @@ private:
ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished;
auto guard = BindAllocator();
- sinkInfo.Sink->SendData(std::move(batch), size, std::move(checkpoint), finished);
+ sinkInfo.AsyncOutput->SendData(std::move(batch), size, std::move(checkpoint), finished);
CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier");
CA_LOG_D("Drain sink " << outputIndex
- << ". Free space decreased: " << (sinkInfo.SinkFreeSpaceBeforeSend - sinkInfo.Sink->GetFreeSpace())
+ << ". Free space decreased: " << (sinkInfo.FreeSpaceBeforeSend - sinkInfo.AsyncOutput->GetFreeSpace())
<< ", sent data from buffer: " << dataSize);
ProcessOutputsState.DataWasSent |= dataWasSent;
@@ -572,12 +573,13 @@ private:
IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
{
return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
- std::move(sinkFactory), settings, memoryLimits, taskRunnerActorFactory);
+ std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerActorFactory);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
index 209570081c7..dae744e3777 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
@@ -19,7 +19,8 @@ namespace NYql {
namespace NDq {
NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 17bfa4491d1..7f8a2564b80 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -33,10 +33,11 @@ public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits)
, TaskRunnerFactory(taskRunnerFactory)
{}
@@ -67,11 +68,12 @@ private:
IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory)
{
return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
- std::move(sinkFactory), settings, memoryLimits, taskRunnerFactory);
+ std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerFactory);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 90a06438417..49568b7687c 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -257,7 +257,8 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
NDqProto::TDqTaskStats* protoTask, bool withProfileStats);
NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
index f73750ce676..fdbc0648380 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h>
#include <ydb/library/yql/dq/common/dq_common.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
namespace NYql::NDq {
@@ -42,4 +43,19 @@ void TDqSinkFactory::Register(const TString& type, TCreatorFunction creator)
Y_VERIFY(registered);
}
+std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqOutputTransformFactory::CreateDqOutputTransform(TArguments&& args) {
+ auto creator = CreatorsByType.find(args.OutputDesc.GetTransform().GetType());
+ YQL_ENSURE(creator != CreatorsByType.end(), "Unregistered type of transform actor: \"" << args.OutputDesc.GetTransform().GetType() << "\"");
+
+ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (creator->second)(std::move(args));
+ Y_VERIFY(actor.first);
+ Y_VERIFY(actor.second);
+ return actor;
+}
+
+void TDqOutputTransformFactory::Register(const TString& type, TCreatorFunction creator) {
+ auto [_, registered] = CreatorsByType.emplace(type, std::move(creator));
+ Y_VERIFY(registered);
+}
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
index 196e6952df8..d82f5cdd317 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
@@ -49,12 +49,17 @@ private:
};
template <class T>
-concept TCastsToSinkPair =
+concept TCastsToAsyncOutputPair =
std::is_convertible_v<T, std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>>;
template <class T, class TProto>
concept TSinkCreatorFunc = requires(T f, TProto&& settings, IDqSinkFactory::TArguments&& args) {
- { f(std::move(settings), std::move(args)) } -> TCastsToSinkPair;
+ { f(std::move(settings), std::move(args)) } -> TCastsToAsyncOutputPair;
+};
+
+template <class T, class TProto>
+concept TOutputTransformCreatorFunc = requires(T f, TProto&& settings, IDqOutputTransformFactory::TArguments&& args) {
+ { f(std::move(settings), std::move(args)) } -> TCastsToAsyncOutputPair;
};
class TDqSinkFactory : public IDqSinkFactory {
@@ -84,4 +89,31 @@ private:
THashMap<TString, TCreatorFunction> CreatorsByType;
};
+class TDqOutputTransformFactory : public IDqOutputTransformFactory {
+public:
+ using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TArguments&& args)>;
+
+ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TArguments&& args) override;
+
+ void Register(const TString& type, TCreatorFunction creator);
+
+ template <class TProtoMsg, TOutputTransformCreatorFunc<TProtoMsg> TCreatorFunc>
+ void Register(const TString& type, TCreatorFunc creator) {
+ Register(type,
+ [creator = std::move(creator), type](TArguments&& args)
+ {
+ const google::protobuf::Any& settingsAny = args.OutputDesc.GetTransform().GetSettings();
+ YQL_ENSURE(settingsAny.Is<TProtoMsg>(),
+ "Output transform \"" << type << "\" settings are expected to have protobuf type " << TProtoMsg::descriptor()->full_name()
+ << ", but got " << settingsAny.type_url());
+ TProtoMsg settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings of type \"" << type << "\"");
+ return creator(std::move(settings), std::move(args));
+ });
+ }
+
+private:
+ std::unordered_map<TString, TCreatorFunction> CreatorsByType;
+};
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h
index 74b4793e5a0..7af1d9a4103 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h
@@ -1,5 +1,7 @@
#pragma once
#include <ydb/library/yql/dq/common/dq_common.h>
+#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
+#include <ydb/library/yql/dq/runtime/dq_sink.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -18,13 +20,17 @@ namespace NActors {
class IActor;
} // namespace NActors
+namespace NKikimr::NMiniKQL {
+class TProgramBuilder;
+} // namespace NKikimr::NMiniKQL
+
namespace NYql::NDq {
-// Sink.
+// Sink/transform.
// Must be IActor.
//
// Protocol:
-// 1. CA starts sink.
+// 1. CA starts sink/transform.
// 2. CA runs program and gets results.
// 3. CA calls IDqComputeActorAsyncOutput::SendData().
// 4. If SendData() returns value less than 0, loop stops running until free space appears.
@@ -33,19 +39,19 @@ namespace NYql::NDq {
// Checkpointing:
// 1. InjectCheckpoint event arrives to CA.
// 2. CA saves its state and injects special checkpoint event to all outputs (TDqComputeActorCheckpoints::ICallbacks::InjectBarrierToOutputs()).
-// 3. Sink writes all data before checkpoint.
-// 4. Sink waits all external sink's acks for written data.
-// 5. Sink gathers its state and passes it into callback ICallbacks::OnSinkStateSaved(state, outputIndex).
+// 3. Sink/transform writes all data before checkpoint.
+// 4. Sink/transform waits all external sink's acks for written data.
+// 5. Sink/transform gathers its state and passes it into callback ICallbacks::OnAsyncOutputStateSaved(state, outputIndex).
// 6. Checkpoints actor builds state for all task node as sum of the state of CA and all its sinks and saves it.
// 7. ...
// 8. When checkpoint is written into database, checkpoints actor calls IDqComputeActorAsyncOutput::CommitState() to apply all side effects.
struct IDqComputeActorAsyncOutput {
struct ICallbacks { // Compute actor
virtual void ResumeExecution() = 0;
- virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
+ virtual void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
// Checkpointing
- virtual void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0;
+ virtual void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0;
virtual ~ICallbacks() = default;
};
@@ -78,8 +84,8 @@ struct IDqSinkFactory : public TThrRefBase {
const NDqProto::TTaskOutput& OutputDesc;
ui64 OutputIndex;
TTxId TxId;
- const THashMap<TString, TString>& SecureParams;
IDqComputeActorAsyncOutput::ICallbacks* Callback;
+ const THashMap<TString, TString>& SecureParams;
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
};
@@ -90,4 +96,23 @@ struct IDqSinkFactory : public TThrRefBase {
virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const = 0;
};
+struct IDqOutputTransformFactory : public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<IDqOutputTransformFactory>;
+
+ struct TArguments {
+ const NDqProto::TTaskOutput& OutputDesc;
+ const ui64 OutputIndex;
+ TTxId TxId;
+ const IDqOutputConsumer::TPtr TransformOutput;
+ IDqComputeActorAsyncOutput::ICallbacks* Callback;
+ const THashMap<TString, TString>& SecureParams;
+ const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
+ const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
+ NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
+ };
+
+ virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TArguments&& args) = 0;
+};
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index f7e5ef62266..1c9f2334473 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -102,6 +102,13 @@ public:
// Sink support.
void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint);
+ void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) {
+ Y_UNUSED(state);
+ Y_UNUSED(outputIndex); // Note that we can have both sink and transform on one output index
+ Y_UNUSED(checkpoint);
+ Y_FAIL("Transform states are unimplemented");
+ }
+
void TryToSavePendingCheckpoint();
void AfterStateLoading(const TMaybe<TString>& error);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 983922b3aa7..a2417952ca7 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -16,6 +16,7 @@
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
#include <ydb/library/yql/core/issue/yql_issue.h>
#include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/dq/actors/dq.h>
@@ -54,11 +55,38 @@ namespace NDq {
constexpr ui32 IssuesBufferSize = 16;
+struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
+ void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) override final {
+ OnSinkError(outputIndex, issues, isFatal);
+ }
+
+ void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override final {
+ OnSinkStateSaved(std::move(state), outputIndex, checkpoint);
+ }
+
+ virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
+ virtual void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0;
+};
+
+struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
+ void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) override final {
+ OnTransformError(outputIndex, issues, isFatal);
+ }
+
+ void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override final {
+ OnTransformStateSaved(std::move(state), outputIndex, checkpoint);
+ }
+
+ virtual void OnTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
+ virtual void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0;
+};
+
template<typename TDerived>
class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
, public TDqComputeActorChannels::ICallbacks
, public TDqComputeActorCheckpoints::ICallbacks
- , public IDqComputeActorAsyncOutput::ICallbacks
+ , public TSinkCallbacks
+ , public TOutputTransformCallbacks
{
protected:
enum EEvWakeupTag : ui64 {
@@ -71,6 +99,7 @@ protected:
public:
void Bootstrap() {
try {
+ Cerr << "Start CA: " << Task.Utf8DebugString() << Endl;
CA_LOG_D("Start compute actor " << this->SelfId() << ", task: " << Task.GetId());
Channels = new TDqComputeActorChannels(this->SelfId(), TxId, Task, !RuntimeSettings.FailOnUndelivery,
@@ -114,7 +143,8 @@ public:
protected:
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false)
: ExecuterId(executerId)
, TxId(txId)
@@ -124,6 +154,8 @@ protected:
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
, SourceActorFactory(std::move(sourceActorFactory))
, SinkFactory(std::move(sinkFactory))
+ , OutputTransformFactory(std::move(transformFactory))
+ , FunctionRegistry(functionRegistry)
, CheckpointingMode(GetTaskCheckpointingMode(Task))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr)
@@ -137,7 +169,8 @@ protected:
}
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
: ExecuterId(executerId)
, TxId(txId)
@@ -147,6 +180,8 @@ protected:
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
, SourceActorFactory(std::move(sourceActorFactory))
, SinkFactory(std::move(sinkFactory))
+ , OutputTransformFactory(std::move(transformFactory))
+ , FunctionRegistry(functionRegistry)
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(InitMemoryQuota())
, Running(!Task.GetCreateSuspended())
@@ -321,10 +356,12 @@ protected:
}
}
- for (auto& entry : SinksMap) {
- const ui64 outputIndex = entry.first;
- TSinkInfo& sinkInfo = entry.second;
- DrainSink(outputIndex, sinkInfo);
+ for (auto& [outputIndex, info] : OutputTransformsMap) {
+ DrainAsyncOutput(outputIndex, info);
+ }
+
+ for (auto& [outputIndex, info] : SinksMap) {
+ DrainAsyncOutput(outputIndex, info);
}
CheckRunStatus();
@@ -413,7 +450,13 @@ protected:
for (auto& [_, sink] : SinksMap) {
if (sink.Actor) {
- sink.Sink->PassAway();
+ sink.AsyncOutput->PassAway();
+ }
+ }
+
+ for (auto& [_, transform] : OutputTransformsMap) {
+ if (transform.Actor) {
+ transform.AsyncOutput->PassAway();
}
}
@@ -563,6 +606,11 @@ public:
Checkpoints->OnSinkStateSaved(std::move(state), outputIndex, checkpoint);
}
+ void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {
+ Y_VERIFY(Checkpoints); // If we are checkpointing, we must have already constructed "checkpoints" object.
+ Checkpoints->OnTransformStateSaved(std::move(state), outputIndex, checkpoint);
+ }
+
protected:
bool ReadyToCheckpoint() const override {
for (auto& [id, channelInfo] : InputChannelsMap) {
@@ -607,10 +655,15 @@ protected:
void InjectBarrierToOutputs(const NDqProto::TCheckpoint& checkpoint) override {
Y_VERIFY(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
for (const auto& [id, channelInfo] : OutputChannelsMap) {
- channelInfo.Channel->Push(NDqProto::TCheckpoint(checkpoint));
+ if (!channelInfo.IsTransformOutput) {
+ channelInfo.Channel->Push(NDqProto::TCheckpoint(checkpoint));
+ }
}
for (const auto& [outputIndex, sink] : SinksMap) {
- sink.SinkBuffer->Push(NDqProto::TCheckpoint(checkpoint));
+ sink.Buffer->Push(NDqProto::TCheckpoint(checkpoint));
+ }
+ for (const auto& [outputIndex, transform] : OutputTransformsMap) {
+ transform.Buffer->Push(NDqProto::TCheckpoint(checkpoint));
}
}
@@ -648,10 +701,10 @@ protected:
source->SourceActor->LoadState(sourceState);
}
for (const NDqProto::TSinkState& sinkState : state.GetSinks()) {
- TSinkInfo* sink = SinksMap.FindPtr(sinkState.GetOutputIndex());
- YQL_ENSURE(sink, "Failed to load state. Sink with input index " << sinkState.GetOutputIndex() << " was not found");
- YQL_ENSURE(sink->Sink, "Sink[" << sinkState.GetOutputIndex() << "] is not created");
- sink->Sink->LoadState(sinkState);
+ TAsyncOutputInfoBase* sink = SinksMap.FindPtr(sinkState.GetOutputIndex());
+ YQL_ENSURE(sink, "Failed to load state. Sink with output index " << sinkState.GetOutputIndex() << " was not found");
+ YQL_ENSURE(sink->AsyncOutput, "Sink[" << sinkState.GetOutputIndex() << "] is not created");
+ sink->AsyncOutput->LoadState(sinkState);
}
} catch (const std::exception& e) {
error = e.what();
@@ -731,6 +784,7 @@ protected:
bool HasPeer = false;
bool Finished = false; // != Channel->IsFinished() // If channel is in finished state, it sends only checkpoints.
bool PopStarted = false;
+ bool IsTransformOutput = false; // Is this channel output of a transform.
explicit TOutputChannelInfo(ui64 channelId)
: ChannelId(channelId)
@@ -743,16 +797,21 @@ protected:
THolder<TStats> Stats;
};
- struct TSinkInfo {
- IDqAsyncOutputBuffer::TPtr SinkBuffer;
- IDqComputeActorAsyncOutput* Sink = nullptr;
+ struct TAsyncOutputInfoBase {
+ IDqAsyncOutputBuffer::TPtr Buffer;
+ IDqComputeActorAsyncOutput* AsyncOutput = nullptr;
NActors::IActor* Actor = nullptr;
- bool Finished = false; // If sink is in finished state, it receives only checkpoints.
+ bool Finished = false; // If sink/transform is in finished state, it receives only checkpoints.
TIssuesBuffer IssuesBuffer;
bool PopStarted = false;
- i64 SinkFreeSpaceBeforeSend = 0;
+ i64 FreeSpaceBeforeSend = 0;
- TSinkInfo() : IssuesBuffer(IssuesBufferSize) {}
+ TAsyncOutputInfoBase() : IssuesBuffer(IssuesBufferSize) {}
+ };
+
+ struct TAsyncOutputTransformInfo : public TAsyncOutputInfoBase {
+ IDqOutputConsumer::TPtr OutputBuffer;
+ TMaybe<NKikimr::NMiniKQL::TProgramBuilder> ProgramBuilder;
};
protected:
@@ -1056,51 +1115,51 @@ private:
return 0;
}
- virtual void DrainSink(ui64 outputIndex, TSinkInfo& sinkInfo) {
- ProcessOutputsState.AllOutputsFinished &= sinkInfo.Finished;
- if (sinkInfo.Finished && !Checkpoints) {
+ virtual void DrainAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& outputInfo) {
+ ProcessOutputsState.AllOutputsFinished &= outputInfo.Finished;
+ if (outputInfo.Finished && !Checkpoints) {
return;
}
- Y_VERIFY(sinkInfo.SinkBuffer);
- Y_VERIFY(sinkInfo.Sink);
- Y_VERIFY(sinkInfo.Actor);
+ Y_VERIFY(outputInfo.Buffer);
+ Y_VERIFY(outputInfo.AsyncOutput);
+ Y_VERIFY(outputInfo.Actor);
const ui32 allowedOvercommit = AllowedChannelsOvercommit();
- const i64 sinkFreeSpaceBeforeSend = sinkInfo.Sink->GetFreeSpace();
+ const i64 sinkFreeSpaceBeforeSend = outputInfo.AsyncOutput->GetFreeSpace();
i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit;
- CA_LOG_D("About to drain sink " << outputIndex
+ CA_LOG_D("About to drain async output " << outputIndex
<< ". FreeSpace: " << sinkFreeSpaceBeforeSend
<< ", allowedOvercommit: " << allowedOvercommit
<< ", toSend: " << toSend
- << ", finished: " << sinkInfo.SinkBuffer->IsFinished());
+ << ", finished: " << outputInfo.Buffer->IsFinished());
i64 sent = 0;
- while (toSend > 0 && (!sinkInfo.Finished || Checkpoints)) {
- const ui32 sentChunk = SendSinkDataChunk(outputIndex, sinkInfo, toSend);
+ while (toSend > 0 && (!outputInfo.Finished || Checkpoints)) {
+ const ui32 sentChunk = SendDataChunkToAsyncOutput(outputIndex, outputInfo, toSend);
if (sentChunk == 0) {
break;
}
sent += sentChunk;
- toSend = sinkInfo.Sink->GetFreeSpace() + allowedOvercommit;
+ toSend = outputInfo.AsyncOutput->GetFreeSpace() + allowedOvercommit;
}
- CA_LOG_D("Drain sink " << outputIndex
- << ". Free space decreased: " << (sinkFreeSpaceBeforeSend - sinkInfo.Sink->GetFreeSpace())
+ CA_LOG_D("Drain async output " << outputIndex
+ << ". Free space decreased: " << (sinkFreeSpaceBeforeSend - outputInfo.AsyncOutput->GetFreeSpace())
<< ", sent data from buffer: " << sent);
- ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished;
- ProcessOutputsState.DataWasSent |= sinkInfo.Finished || sent;
+ ProcessOutputsState.HasDataToSend |= !outputInfo.Finished;
+ ProcessOutputsState.DataWasSent |= outputInfo.Finished || sent;
}
- ui32 SendSinkDataChunk(ui64 outputIndex, TSinkInfo& sinkInfo, ui64 bytes) {
- auto sink = sinkInfo.SinkBuffer;
+ ui32 SendDataChunkToAsyncOutput(ui64 outputIndex, TAsyncOutputInfoBase& outputInfo, ui64 bytes) {
+ auto sink = outputInfo.Buffer;
NKikimr::NMiniKQL::TUnboxedValueVector dataBatch;
NDqProto::TCheckpoint checkpoint;
- const ui64 dataSize = !sinkInfo.Finished ? sink->Pop(dataBatch, bytes) : 0;
+ const ui64 dataSize = !outputInfo.Finished ? sink->Pop(dataBatch, bytes) : 0;
const bool hasCheckpoint = sink->Pop(checkpoint);
if (!dataSize && !hasCheckpoint) {
if (!sink->IsFinished()) {
@@ -1108,7 +1167,7 @@ private:
return 0; // sink is empty and not finished yet
}
}
- sinkInfo.Finished = sink->IsFinished();
+ outputInfo.Finished = sink->IsFinished();
YQL_ENSURE(!dataSize || !dataBatch.empty()); // dataSize != 0 => !dataBatch.empty() // even if we're about to send empty rows.
@@ -1121,7 +1180,7 @@ private:
ResumeInputs();
}
- sinkInfo.Sink->SendData(std::move(dataBatch), dataSize, maybeCheckpoint, sinkInfo.Finished);
+ outputInfo.AsyncOutput->SendData(std::move(dataBatch), dataSize, maybeCheckpoint, outputInfo.Finished);
CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier");
return dataSize + checkpointSize;
@@ -1213,19 +1272,43 @@ protected:
channel.Channel = TaskRunner->GetOutputChannel(channelId);
}
}
+ for (auto& [outputIndex, transform] : OutputTransformsMap) {
+ if (TaskRunner) {
+ transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry);
+ std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex);
+ Y_VERIFY(OutputTransformFactory);
+ const auto& outputDesc = Task.GetOutputs(outputIndex);
+ const ui64 i = outputIndex; // Crutch for clang
+ CA_LOG_D("Create transform for output " << i << " " << outputDesc.ShortDebugString());
+ std::tie(transform.AsyncOutput, transform.Actor) = OutputTransformFactory->CreateDqOutputTransform(
+ IDqOutputTransformFactory::TArguments {
+ .OutputDesc = outputDesc,
+ .OutputIndex = outputIndex,
+ .TxId = TxId,
+ .TransformOutput = transform.OutputBuffer,
+ .Callback = static_cast<TOutputTransformCallbacks*>(this),
+ .SecureParams = secureParams,
+ .TypeEnv = typeEnv,
+ .HolderFactory = holderFactory,
+ .ProgramBuilder = *transform.ProgramBuilder
+
+ });
+ this->RegisterWithSameMailbox(transform.Actor);
+ }
+ }
for (auto& [outputIndex, sink] : SinksMap) {
- if (TaskRunner) { sink.SinkBuffer = TaskRunner->GetSink(outputIndex); }
+ if (TaskRunner) { sink.Buffer = TaskRunner->GetSink(outputIndex); }
Y_VERIFY(SinkFactory);
const auto& outputDesc = Task.GetOutputs(outputIndex);
const ui64 i = outputIndex; // Crutch for clang
CA_LOG_D("Create sink for output " << i << " " << outputDesc.ShortDebugString());
- std::tie(sink.Sink, sink.Actor) = SinkFactory->CreateDqSink(
+ std::tie(sink.AsyncOutput, sink.Actor) = SinkFactory->CreateDqSink(
IDqSinkFactory::TArguments {
.OutputDesc = outputDesc,
.OutputIndex = outputIndex,
.TxId = TxId,
+ .Callback = static_cast<TSinkCallbacks*>(this),
.SecureParams = secureParams,
- .Callback = this,
.TypeEnv = typeEnv,
.HolderFactory = holderFactory
});
@@ -1291,7 +1374,17 @@ protected:
return;
}
- CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << issues.ToString());
+ CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << issues.ToOneLineString());
+ InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues);
+ }
+
+ void OnTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
+ if (!isFatal) {
+ OutputTransformsMap.at(outputIndex).IssuesBuffer.Push(issues);
+ return;
+ }
+
+ CA_LOG_E("OutputTransform[" << outputIndex << "] fatal error: " << issues.ToOneLineString());
InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues);
}
@@ -1323,13 +1416,19 @@ private:
Y_VERIFY(!outputDesc.HasSink() || outputDesc.ChannelsSize() == 0); // HasSink => no channels
Y_VERIFY(outputDesc.HasSink() || outputDesc.ChannelsSize() > 0);
+ if (outputDesc.HasTransform()) {
+ auto result = OutputTransformsMap.emplace(std::piecewise_construct, std::make_tuple(i), std::make_tuple());
+ YQL_ENSURE(result.second);
+ }
+
if (outputDesc.HasSink()) {
- auto result = SinksMap.emplace(i, TSinkInfo());
+ auto result = SinksMap.emplace(i, TAsyncOutputInfoBase());
YQL_ENSURE(result.second);
} else {
for (auto& channel : outputDesc.GetChannels()) {
TOutputChannelInfo outputChannel(channel.GetId());
outputChannel.HasPeer = channel.GetDstEndpoint().HasActorId();
+ outputChannel.IsTransformOutput = outputDesc.HasTransform();
if (Y_UNLIKELY(RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_PROFILE)) {
outputChannel.Stats = MakeHolder<typename TOutputChannelInfo::TStats>();
@@ -1350,9 +1449,9 @@ private:
return TaskRunner->GetStats();
}
- virtual const TDqAsyncOutputBufferStats* GetSinkStats(ui64 outputIdx, const TSinkInfo& sinkInfo) const {
+ virtual const TDqAsyncOutputBufferStats* GetSinkStats(ui64 outputIdx, const TAsyncOutputInfoBase& sinkInfo) const {
Y_UNUSED(outputIdx);
- return sinkInfo.SinkBuffer ? sinkInfo.SinkBuffer->GetStats() : nullptr;
+ return sinkInfo.Buffer ? sinkInfo.Buffer->GetStats() : nullptr;
}
public:
@@ -1480,6 +1579,8 @@ protected:
const bool CanAllocateExtraMemory = false;
const IDqSourceActorFactory::TPtr SourceActorFactory;
const IDqSinkFactory::TPtr SinkFactory;
+ const IDqOutputTransformFactory::TPtr OutputTransformFactory;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
const NDqProto::ECheckpointingMode CheckpointingMode;
TIntrusivePtr<IDqTaskRunner> TaskRunner;
TDqComputeActorChannels* Channels = nullptr;
@@ -1487,7 +1588,8 @@ protected:
THashMap<ui64, TInputChannelInfo> InputChannelsMap; // Channel id -> Channel info
THashMap<ui64, TSourceInfo> SourcesMap; // Input index -> Source info
THashMap<ui64, TOutputChannelInfo> OutputChannelsMap; // Channel id -> Channel info
- THashMap<ui64, TSinkInfo> SinksMap; // Output index -> Sink info
+ THashMap<ui64, TAsyncOutputInfoBase> SinksMap; // Output index -> Sink info
+ THashMap<ui64, TAsyncOutputTransformInfo> OutputTransformsMap; // Output index -> Transforms info
bool ResumeEventScheduled = false;
NDqProto::EComputeState State;
diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h
index 1022c932019..2049edee75a 100644
--- a/ydb/library/yql/dq/actors/dq_events_ids.h
+++ b/ydb/library/yql/dq/actors/dq_events_ids.h
@@ -50,8 +50,6 @@ struct TDqComputeEvents {
EvNewCheckpointCoordinatorAck,
EvNewSourceDataArrived,
EvSourceError,
- EvTransformNewData,
- EvTransformCompleted,
// place all new events here
EvEnd
diff --git a/ydb/library/yql/dq/actors/transform/CMakeLists.txt b/ydb/library/yql/dq/actors/transform/CMakeLists.txt
deleted file mode 100644
index 4dc0fe6ead5..00000000000
--- a/ydb/library/yql/dq/actors/transform/CMakeLists.txt
+++ /dev/null
@@ -1,24 +0,0 @@
-
-# This file was gererated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(dq-actors-transform)
-target_compile_options(dq-actors-transform PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
-target_link_libraries(dq-actors-transform PUBLIC
- contrib-libs-cxxsupp
- yutil
- cpp-actors-core
- yql-dq-runtime
- yql-dq-proto
-)
-target_sources(dq-actors-transform PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp
-)
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor.h b/ydb/library/yql/dq/actors/transform/dq_transform_actor.h
deleted file mode 100644
index 0f8e1a63948..00000000000
--- a/ydb/library/yql/dq/actors/transform/dq_transform_actor.h
+++ /dev/null
@@ -1,11 +0,0 @@
-#pragma once
-
-namespace NYql::NDq {
-
-class IDqTransformActor {
-public:
- virtual void DoTransform() = 0;
- virtual ~IDqTransformActor() = default;
-};
-
-} \ No newline at end of file
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp
deleted file mode 100644
index aae0e130094..00000000000
--- a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-#include "dq_transform_actor_factory.h"
-
-#include <ydb/library/yql/utils/yql_panic.h>
-#include <google/protobuf/text_format.h>
-
-namespace NYql::NDq {
-
-TDqTransformActorFactory::TDqTransformActorFactory()
-{}
-
-std::pair<IDqTransformActor*, NActors::IActor*> TDqTransformActorFactory::CreateDqTransformActor(const NDqProto::TDqTransform& transform, TArguments&& args) {
- auto creator = CreatorsByType.find(transform.GetType());
- if (creator == CreatorsByType.end()) {
- YQL_ENSURE(false, "Unregistered type of transform actor: \"" << transform.GetType() << "\"");
- }
-
- std::pair<IDqTransformActor*, NActors::IActor*> actor = (creator->second)(transform, std::move(args));
- Y_VERIFY(actor.first);
- Y_VERIFY(actor.second);
- return actor;
-}
-
-void TDqTransformActorFactory::Register(TString type, TTransformCreator creator) {
- auto [_, registered] = CreatorsByType.emplace(type, std::move(creator));
- Y_VERIFY(registered);
-}
-
-}
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h
deleted file mode 100644
index d041d54447a..00000000000
--- a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h
+++ /dev/null
@@ -1,41 +0,0 @@
-#pragma once
-
-#include "dq_transform_actor.h"
-
-#include <ydb/library/yql/dq/common/dq_common.h>
-#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
-#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
-#include <ydb/library/yql/dq/runtime/dq_output_channel.h>
-#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
-#include <ydb/library/yql/minikql/mkql_program_builder.h>
-#include <library/cpp/actors/core/actor.h>
-
-namespace NYql::NDq {
-
-class TDqTransformActorFactory : public TThrRefBase {
-public:
- using TPtr = TIntrusivePtr<TDqTransformActorFactory>;
-
- struct TArguments {
- const NActors::TActorId ComputeActorId;
- const IDqOutputChannel::TPtr TransformInput;
- const IDqOutputConsumer::TPtr TransformOutput;
- const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
- const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
- NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
-
- const THashMap<TString, TString>& SecureParams;
- };
-
- TDqTransformActorFactory();
-
- using TTransformCreator = std::function<std::pair<IDqTransformActor*, NActors::IActor*>(
- const NDqProto::TDqTransform& transform, TArguments&& args)>;
-
- std::pair<IDqTransformActor*, NActors::IActor*> CreateDqTransformActor(const NDqProto::TDqTransform& transform, TArguments&& args);
- void Register(TString type, TTransformCreator creator);
-
-private:
- std::unordered_map<TString, TTransformCreator> CreatorsByType;
-};
-}
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp b/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp
deleted file mode 100644
index b48f009e34a..00000000000
--- a/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "dq_transform_events.h" \ No newline at end of file
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_events.h b/ydb/library/yql/dq/actors/transform/dq_transform_events.h
deleted file mode 100644
index a41c79480e6..00000000000
--- a/ydb/library/yql/dq/actors/transform/dq_transform_events.h
+++ /dev/null
@@ -1,23 +0,0 @@
-#pragma once
-
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-#include <ydb/library/yql/dq/actors/dq_events_ids.h>
-
-#include <library/cpp/actors/core/events.h>
-#include <library/cpp/actors/core/event_local.h>
-
-namespace NYql::NDq {
-using namespace NActors;
-
-namespace NTransformActor {
- struct TEvTransformNewData : public TEventLocal<TEvTransformNewData, TDqComputeEvents::EvTransformNewData> {
- TEvTransformNewData() {}
- };
-
- struct TEvTransformCompleted : public TEventLocal<TEvTransformCompleted, TDqComputeEvents::EvTransformCompleted> {
- TEvTransformCompleted() {}
- };
-
-} // namespace NTransformActor
-
-} \ No newline at end of file
diff --git a/ydb/library/yql/dq/runtime/CMakeLists.txt b/ydb/library/yql/dq/runtime/CMakeLists.txt
index 0b9fdf10956..73d906dbf6b 100644
--- a/ydb/library/yql/dq/runtime/CMakeLists.txt
+++ b/ydb/library/yql/dq/runtime/CMakeLists.txt
@@ -24,6 +24,7 @@ target_link_libraries(yql-dq-runtime PUBLIC
yql-dq-common
yql-dq-expr_nodes
yql-dq-type_ann
+ common-schema-mkql
tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-runtime PRIVATE
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index db7a683c014..4b612b4de46 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -1,5 +1,6 @@
#include "dq_output_consumer.h"
+#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node.h>
@@ -38,6 +39,12 @@ public:
Consumers[index]->Consume(std::move(variantItem));
}
+ void Consume(NDqProto::TCheckpoint&& checkpoint) override {
+ for (auto& consumer : Consumers) {
+ consumer->Consume(NDqProto::TCheckpoint(checkpoint));
+ }
+ }
+
void Finish() override {
for (auto& consumer : Consumers) {
consumer->Finish();
@@ -61,6 +68,10 @@ public:
Output->Push(std::move(value));
}
+ void Consume(NDqProto::TCheckpoint&& checkpoint) override {
+ Output->Push(std::move(checkpoint));
+ }
+
void Finish() override {
Output->Finish();
}
@@ -105,6 +116,12 @@ public:
Outputs[partitionIndex]->Push(std::move(value));
}
+ void Consume(NDqProto::TCheckpoint&& checkpoint) override {
+ for (auto& output : Outputs) {
+ output->Push(NDqProto::TCheckpoint(checkpoint));
+ }
+ }
+
void Finish() final {
for (auto& output : Outputs) {
output->Finish();
@@ -168,6 +185,12 @@ public:
}
}
+ void Consume(NDqProto::TCheckpoint&& checkpoint) override {
+ for (auto& output : Outputs) {
+ output->Push(NDqProto::TCheckpoint(checkpoint));
+ }
+ }
+
void Finish() override {
for (auto& output : Outputs) {
output->Finish();
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h
index 80e69ae065e..1438c15c23b 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.h
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h
@@ -19,6 +19,7 @@ public:
virtual bool IsFull() const = 0;
virtual void Consume(NKikimr::NUdf::TUnboxedValue&& value) = 0;
+ virtual void Consume(NDqProto::TCheckpoint&& checkpoint) = 0;
virtual void Finish() = 0;
};
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index ab8a5eb190c..37525bc54b0 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -16,6 +16,7 @@
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
#include <ydb/library/yql/minikql/mkql_node_visitor.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
#include <util/generic/scope.h>
@@ -439,24 +440,40 @@ public:
TVector<IDqOutputConsumer::TPtr> outputConsumers(task.OutputsSize());
for (ui32 i = 0; i < task.OutputsSize(); ++i) {
- auto& outputDesc = task.GetOutputs(i);
+ const auto& outputDesc = task.GetOutputs(i);
if (outputDesc.GetTypeCase() == NDqProto::TTaskOutput::kEffects) {
TaskHasEffects = true;
}
+ TVector<IDqOutput::TPtr> outputs{Reserve(std::max<ui64>(outputDesc.ChannelsSize(), 1))};
+ TTransformInfo* transform = nullptr;
+ TType** taskOutputType = &ProgramParsed.OutputItemTypes[i];
if (outputDesc.HasTransform()) {
- auto transform = outputDesc.GetTransform();
- auto outputType = NCommon::ParseTypeFromYson(TStringBuf{transform.GetOutputType()}, *pb, Cerr);
- auto inputType = NCommon::ParseTypeFromYson(TStringBuf{transform.GetInputType()}, *pb, Cerr);
+ const auto& transformDesc = outputDesc.GetTransform();
+ transform = &Transforms[i];
+ Y_VERIFY(!transform->TransformInput);
+ Y_VERIFY(!transform->TransformOutput);
+
+ TStringBuilder err;
+ transform->TransformOutputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetOutputType()}, *pb, err.Out);
+ YQL_ENSURE(transform->TransformOutputType, "Can't parse transform output type: " << err);
+
+ err.clear();
+ TType* inputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetInputType()}, *pb, err.Out);
+ YQL_ENSURE(inputType, "Can't parse transform input type: " << err);
+ YQL_ENSURE(inputType->IsSameType(*ProgramParsed.OutputItemTypes[i]));
LOG(TStringBuilder() << "Task: " << TaskId << " has transform by "
- << transform.GetType() << " with input type: " << *inputType
- << " , output type: " << *outputType);
- }
+ << transformDesc.GetType() << " with input type: " << *inputType
+ << " , output type: " << *transform->TransformOutputType);
- TVector<IDqOutput::TPtr> outputs{Reserve(std::max<ui64>(outputDesc.ChannelsSize(), 1))};
+ transform->TransformInput = CreateDqAsyncOutputBuffer(i, ProgramParsed.OutputItemTypes[i], memoryLimits.ChannelBufferSize,
+ Settings.CollectProfileStats);
+
+ taskOutputType = &transform->TransformOutputType;
+ }
if (outputDesc.HasSink()) {
- auto sink = CreateDqAsyncOutputBuffer(i, ProgramParsed.OutputItemTypes[i], memoryLimits.ChannelBufferSize,
+ auto sink = CreateDqAsyncOutputBuffer(i, *taskOutputType, memoryLimits.ChannelBufferSize,
Settings.CollectProfileStats);
auto [_, inserted] = Sinks.emplace(i, sink);
Y_VERIFY(inserted);
@@ -476,7 +493,7 @@ public:
settings.ChannelStorage = execCtx.CreateChannelStorage(channelId);
}
- auto outputChannel = CreateDqOutputChannel(channelId, ProgramParsed.OutputItemTypes[i], typeEnv,
+ auto outputChannel = CreateDqOutputChannel(channelId, *taskOutputType, typeEnv,
holderFactory, settings, LogFunc);
auto ret = OutputChannels.emplace(channelId, outputChannel);
@@ -485,6 +502,14 @@ public:
}
}
+ if (transform) {
+ transform->TransformOutput = execCtx.CreateOutputConsumer(outputDesc, transform->TransformOutputType,
+ Context.ApplyCtx, typeEnv, std::move(outputs));
+
+ outputs.clear();
+ outputs.emplace_back(transform->TransformInput);
+ }
+
outputConsumers[i] = execCtx.CreateOutputConsumer(outputDesc, ProgramParsed.OutputItemTypes[i],
Context.ApplyCtx, typeEnv, std::move(outputs));
}
@@ -597,6 +622,12 @@ public:
return *ptr;
}
+ std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 outputIndex) override {
+ auto ptr = Transforms.FindPtr(outputIndex);
+ YQL_ENSURE(ptr, "task: " << TaskId << " does not have output index: " << outputIndex << " or such transform");
+ return {ptr->TransformInput, ptr->TransformOutput};
+ }
+
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit) override {
auto guard = Context.TypeEnv ? Context.TypeEnv->BindAllocator() : SelfTypeEnv->BindAllocator();
if (memoryLimit) {
@@ -711,6 +742,12 @@ private:
std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> SelfAlloc; // if not set -> use Context.Alloc
std::unique_ptr<NKikimr::NMiniKQL::TTypeEnvironment> SelfTypeEnv; // if not set -> use Context.TypeEnv
+ struct TTransformInfo {
+ IDqAsyncOutputBuffer::TPtr TransformInput;
+ IDqOutputConsumer::TPtr TransformOutput;
+ TType* TransformOutputType = nullptr;
+ };
+
struct TProgramParsed {
TRuntimeNode ProgramNode;
TVector<TType*> InputItemTypes;
@@ -727,6 +764,7 @@ private:
THashMap<ui64, IDqSource::TPtr> Sources; // Input index -> Source
THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels; // Channel id -> Channel
THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks; // Output index -> Sink
+ THashMap<ui64, TTransformInfo> Transforms; // Output index -> Transform
IDqOutputConsumer::TPtr Output;
NUdf::TUnboxedValue ResultStream;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 7db08493785..aa9e9b8a96e 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -301,6 +301,7 @@ public:
virtual IDqSource::TPtr GetSource(ui64 inputIndex) = 0;
virtual IDqOutputChannel::TPtr GetOutputChannel(ui64 channelId) = 0;
virtual IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) = 0;
+ virtual std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 outputIndex) = 0;
// if memoryLimit = Nothing() then don't set memory limit, use existing one (if any)
// if memoryLimit = 0 then set unlimited
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index 072aa7af624..75d76dd52e7 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -188,14 +188,6 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
}
}
- if (!transforms.empty() && !sinks.empty()
- && transforms.size() != sinks.size()) {
-
- ctx.AddError(TIssue(ctx.GetPosition(stage->Pos()), TStringBuilder()
- << "Not every transform has a corresponding sink"));
- return TStatus::Error;
- }
-
if (!sinks.empty()) {
for (auto sink : sinks) {
sink->SetTypeAnn(resultType);
@@ -1016,4 +1008,4 @@ TString PrintDqStageOnly(const TDqStageBase& stage, TExprContext& ctx) {
return NCommon::ExprToPrettyString(ctx, *newStage);
}
-} // namespace NYql::NDq \ No newline at end of file
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 2d436741f65..cc3eff29377 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -32,6 +32,17 @@ public:
~THTTPMockGateway() {
}
+ static TString PrintKey(const TKeyType& key) {
+ TStringBuilder ret;
+ ret << "{ Url: \"" << std::get<0>(key) << "\"";
+ ret << " Headers: [";
+ for (const TString& header : std::get<1>(key)) {
+ ret << " \"" << header << "\"";
+ }
+ ret << " ] Data: \"" << std::get<2>(key) << "\" }";
+ return std::move(ret);
+ }
+
void Download(
TString url,
IHTTPGateway::THeaders headers,
@@ -52,7 +63,7 @@ public:
} else if (DefaultResponse) {
callback(DefaultResponse(url, headers, data));
} else {
- YQL_ENSURE(false, "There isn't any response callback at url " + url);
+ YQL_ENSURE(false, "There isn't any response callback for " + PrintKey(key));
}
}
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
index 5eb9ede8c77..22b93b13ced 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
@@ -1,5 +1,7 @@
#include "dq_fake_ca.h"
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/core/testlib/basics/appdata.h>
@@ -20,14 +22,18 @@ NYql::NDqProto::TCheckpoint CreateCheckpoint(ui64 id) {
return checkpoint;
}
-TFakeActor::TFakeActor(TSourcePromises& sourcePromises, TSinkPromises& sinkPromises)
+TFakeActor::TFakeActor(TSourcePromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises)
: TActor<TFakeActor>(&TFakeActor::StateFunc)
, MemoryInfo("test")
, HolderFactory(Alloc.Ref(), MemoryInfo)
+ , TypeEnv(Alloc)
+ , FunctionRegistry(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry()))
+ , ProgramBuilder(TypeEnv, *FunctionRegistry)
+ , ValueBuilder(HolderFactory)
, SourceEvents(*this)
- , SinkCallbacks(*this)
+ , AsyncOutputCallbacks(*this)
, SourcePromises(sourcePromises)
- , SinkPromises(sinkPromises)
+ , AsyncOutputPromises(asyncOutputPromises)
{
Alloc.Release();
}
@@ -36,10 +42,10 @@ TFakeActor::~TFakeActor() {
Alloc.Acquire();
}
-void TFakeActor::InitSink(IDqComputeActorAsyncOutput* dqSink, IActor* dqSinkAsActor) {
- DqSinkActorId = RegisterWithSameMailbox(dqSinkAsActor),
- DqSink = dqSink;
- DqSinkAsActor = dqSinkAsActor;
+void TFakeActor::InitAsyncOutput(IDqComputeActorAsyncOutput* dqAsyncOutput, IActor* dqAsyncOutputAsActor) {
+ DqAsyncOutputActorId = RegisterWithSameMailbox(dqAsyncOutputAsActor),
+ DqAsyncOutput = dqAsyncOutput;
+ DqAsyncOutputAsActor = dqAsyncOutputAsActor;
}
void TFakeActor::InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor) {
@@ -57,17 +63,17 @@ void TFakeActor::Terminate() {
DqSourceActorAsActor = nullptr;
}
- if (DqSinkActorId) {
- DqSink->PassAway();
+ if (DqAsyncOutputActorId) {
+ DqAsyncOutput->PassAway();
- DqSinkActorId = std::nullopt;
- DqSink = nullptr;
- DqSinkAsActor = nullptr;
+ DqAsyncOutputActorId = std::nullopt;
+ DqAsyncOutput = nullptr;
+ DqAsyncOutputAsActor = nullptr;
}
}
-TFakeActor::TSinkCallbacks& TFakeActor::GetSinkCallbacks() {
- return SinkCallbacks;
+TFakeActor::TAsyncOutputCallbacks& TFakeActor::GetAsyncOutputCallbacks() {
+ return AsyncOutputCallbacks;
}
NKikimr::NMiniKQL::THolderFactory& TFakeActor::GetHolderFactory() {
@@ -81,7 +87,7 @@ TFakeCASetup::TFakeCASetup()
Runtime->AddLocalService(
FakeActorId,
NActors::TActorSetupCmd(
- new TFakeActor(SourcePromises, SinkPromises),
+ new TFakeActor(SourcePromises, AsyncOutputPromises),
NActors::TMailboxType::Simple,
0));
@@ -95,11 +101,11 @@ TFakeCASetup::~TFakeCASetup() {
});
}
-void TFakeCASetup::SinkWrite(const TWriteValueProducer valueProducer, TMaybe<NDqProto::TCheckpoint> checkpoint) {
- Execute([&valueProducer, checkpoint](TFakeActor& actor) {
+void TFakeCASetup::AsyncOutputWrite(const TWriteValueProducer valueProducer, TMaybe<NDqProto::TCheckpoint> checkpoint, bool finish) {
+ Execute([&valueProducer, checkpoint, finish](TFakeActor& actor) {
auto batch = valueProducer(actor.GetHolderFactory());
- Y_ASSERT(actor.DqSink);
- actor.DqSink->SendData(std::move(batch), 0, checkpoint, false);
+ Y_ASSERT(actor.DqAsyncOutput);
+ actor.DqAsyncOutput->SendData(std::move(batch), 0, checkpoint, finish);
});
}
@@ -119,8 +125,8 @@ void TFakeCASetup::LoadSource(const NDqProto::TSourceState& state) {
void TFakeCASetup::LoadSink(const NDqProto::TSinkState& state) {
Execute([&state](TFakeActor& actor) {
- Y_ASSERT(actor.DqSink);
- actor.DqSink->LoadState(state);
+ Y_ASSERT(actor.DqAsyncOutput);
+ actor.DqAsyncOutput->LoadState(state);
});
}
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
index 1d24d3ad1e0..2a7aa44f5f0 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
@@ -5,7 +5,9 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
+#include <ydb/library/yql/minikql/computation/mkql_value_builder.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/core/testlib/basics/runtime.h>
@@ -59,7 +61,7 @@ struct TSourcePromises {
NThreading::TPromise<TIssues> FatalError = NThreading::NewPromise<TIssues>();
};
-struct TSinkPromises {
+struct TAsyncOutputPromises {
NThreading::TPromise<void> ResumeExecution = NThreading::NewPromise();
NThreading::TPromise<TIssues> Issue = NThreading::NewPromise<TIssues>();
NThreading::TPromise<NDqProto::TSinkState> StateSaved = NThreading::NewPromise<NDqProto::TSinkState>();
@@ -85,43 +87,43 @@ class TFakeActor : public NActors::TActor<TFakeActor> {
TFakeActor& Parent;
};
- struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
- explicit TSinkCallbacks(TFakeActor& parent) : Parent(parent) {}
+ struct TAsyncOutputCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
+ explicit TAsyncOutputCallbacks(TFakeActor& parent) : Parent(parent) {}
void ResumeExecution() override {
- Parent.SinkPromises.ResumeExecution.SetValue();
- Parent.SinkPromises.ResumeExecution = NThreading::NewPromise();
+ Parent.AsyncOutputPromises.ResumeExecution.SetValue();
+ Parent.AsyncOutputPromises.ResumeExecution = NThreading::NewPromise();
};
- void OnSinkError(ui64, const TIssues& issues, bool isFatal) override {
+ void OnAsyncOutputError(ui64, const TIssues& issues, bool isFatal) override {
Y_UNUSED(isFatal);
- Parent.SinkPromises.Issue.SetValue(issues);
- Parent.SinkPromises.Issue = NThreading::NewPromise<TIssues>();
+ Parent.AsyncOutputPromises.Issue.SetValue(issues);
+ Parent.AsyncOutputPromises.Issue = NThreading::NewPromise<TIssues>();
};
- void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint&) override {
+ void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint&) override {
Y_UNUSED(outputIndex);
- Parent.SinkPromises.StateSaved.SetValue(state);
- Parent.SinkPromises.StateSaved = NThreading::NewPromise<NDqProto::TSinkState>();
+ Parent.AsyncOutputPromises.StateSaved.SetValue(state);
+ Parent.AsyncOutputPromises.StateSaved = NThreading::NewPromise<NDqProto::TSinkState>();
};
TFakeActor& Parent;
};
public:
- TFakeActor(TSourcePromises& sourcePromises, TSinkPromises& sinkPromises);
+ TFakeActor(TSourcePromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises);
~TFakeActor();
- void InitSink(IDqComputeActorAsyncOutput* dqSink, IActor* dqSinkAsActor);
+ void InitAsyncOutput(IDqComputeActorAsyncOutput* dqAsyncOutput, IActor* dqAsyncOutputAsActor);
void InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor);
void Terminate();
- TSinkCallbacks& GetSinkCallbacks();
+ TAsyncOutputCallbacks& GetAsyncOutputCallbacks();
NKikimr::NMiniKQL::THolderFactory& GetHolderFactory();
public:
IDqSourceActor* DqSourceActor = nullptr;
- IDqComputeActorAsyncOutput* DqSink = nullptr;
+ IDqComputeActorAsyncOutput* DqAsyncOutput = nullptr;
private:
STRICT_STFUNC(StateFunc,
@@ -147,22 +149,29 @@ private:
void Handle(const IDqSourceActor::TEvSourceError::TPtr& ev) {
SourceEvents.OnSourceError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal);
}
-private:
+
+public:
NKikimr::NMiniKQL::TScopedAlloc Alloc;
NKikimr::NMiniKQL::TMemoryUsageInfo MemoryInfo;
NKikimr::NMiniKQL::THolderFactory HolderFactory;
+ NKikimr::NMiniKQL::TTypeEnvironment TypeEnv;
+ TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FunctionRegistry;
+ NKikimr::NMiniKQL::TProgramBuilder ProgramBuilder;
+ NKikimr::NMiniKQL::TDefaultValueBuilder ValueBuilder;
+
+private:
std::optional<NActors::TActorId> DqSourceActorId;
IActor* DqSourceActorAsActor = nullptr;
- std::optional<NActors::TActorId> DqSinkActorId;
- IActor* DqSinkAsActor = nullptr;
+ std::optional<NActors::TActorId> DqAsyncOutputActorId;
+ IActor* DqAsyncOutputAsActor = nullptr;
TSourceEvents SourceEvents;
- TSinkCallbacks SinkCallbacks;
+ TAsyncOutputCallbacks AsyncOutputCallbacks;
TSourcePromises& SourcePromises;
- TSinkPromises& SinkPromises;
+ TAsyncOutputPromises& AsyncOutputPromises;
};
struct TFakeCASetup {
@@ -212,7 +221,7 @@ struct TFakeCASetup {
return result;
}
- void SinkWrite(const TWriteValueProducer valueProducer, TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing());
+ void AsyncOutputWrite(const TWriteValueProducer valueProducer, TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing(), bool finish = false);
void SaveSourceState(NDqProto::TCheckpoint checkpoint, NDqProto::TSourceState& state);
@@ -225,7 +234,7 @@ public:
TRuntimePtr Runtime;
NActors::TActorId FakeActorId;
TSourcePromises SourcePromises;
- TSinkPromises SinkPromises;
+ TAsyncOutputPromises AsyncOutputPromises;
};
} // namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index 55faca97130..8ccb7ccd92c 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -65,8 +65,10 @@ IActor* CreateComputeActor(
executerId,
operationId,
std::move(task),
- std::move(options.SourceActorFactory),
- std::move(options.SinkFactory),
+ options.SourceActorFactory,
+ options.SinkFactory,
+ options.TransformFactory,
+ options.FunctionRegistry,
computeRuntimeSettings,
memoryLimits,
taskRunnerFactory);
@@ -75,8 +77,10 @@ IActor* CreateComputeActor(
executerId,
operationId,
std::move(task),
- std::move(options.SourceActorFactory),
- std::move(options.SinkFactory),
+ options.SourceActorFactory,
+ options.SinkFactory,
+ options.TransformFactory,
+ options.FunctionRegistry,
computeRuntimeSettings,
memoryLimits,
taskRunnerActorFactory);
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index e0fd163afcd..49860f50e44 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -311,8 +311,8 @@ private:
.OutputDesc = output,
.OutputIndex = static_cast<ui64>(outputId),
.TxId = TraceId,
- .SecureParams = secureParams,
.Callback = this,
+ .SecureParams = secureParams,
.TypeEnv = typeEnv,
.HolderFactory = holderFactory
});
@@ -696,12 +696,12 @@ private:
Send(SelfId(), new TEvContinueRun());
}
- void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
+ void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
Y_UNUSED(outputIndex);
SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString()));
}
- void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {
+ void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {
Y_UNUSED(state);
Y_UNUSED(outputIndex);
Y_UNUSED(checkpoint);
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index 99885567993..66f0cd150a2 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -7,13 +7,7 @@
#include <ydb/library/yql/providers/dq/service/service_node.h>
#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
-#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
-#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
-#include <ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h>
-#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
-#include <ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <ydb/library/yql/utils/range_walker.h>
#include <ydb/library/yql/utils/bind_in_range.h>
@@ -24,28 +18,11 @@ namespace NYql {
using namespace NActors;
using NDqs::MakeWorkerManagerActorID;
-namespace {
- // TODO: Use the only driver for both sources.
- NDq::IDqSourceActorFactory::TPtr CreateSourceActorFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway) {
- auto factory = MakeIntrusive<NYql::NDq::TDqSourceFactory>();
- RegisterDqPqReadActorFactory(*factory, driver, nullptr);
- RegisterYdbReadActorFactory(*factory, driver, nullptr);
- RegisterS3ReadActorFactory(*factory, nullptr, httpGateway);
- RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway);
- return factory;
- }
-
- NDq::IDqSinkFactory::TPtr CreateSinkFactory(const NYdb::TDriver& driver) {
- auto factory = MakeIntrusive<NYql::NDq::TDqSinkFactory>();
- RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
- return factory;
- }
-}
-
class TLocalServiceHolder {
public:
- TLocalServiceHolder(NYdb::TDriver driver, IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
- TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort)
+ TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
+ TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
+ NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
{
ui32 nodeId = 1;
@@ -71,8 +48,10 @@ public:
NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true);
- lwmOptions.SourceActorFactory = CreateSourceActorFactory(driver, std::move(httpGateway));
- lwmOptions.SinkFactory = CreateSinkFactory(driver);
+ lwmOptions.SourceActorFactory = std::move(sourceFactory);
+ lwmOptions.SinkFactory = std::move(sinkFactory);
+ lwmOptions.TransformFactory = std::move(transformFactory);
+ lwmOptions.FunctionRegistry = functionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
[=](const NDqProto::TDqTask& task, const NDq::TLogFunc& )
@@ -130,18 +109,19 @@ private:
IDqGateway::TPtr Gateway;
};
-THolder<TLocalServiceHolder> CreateLocalServiceHolder(NYdb::TDriver driver, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
- TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, IHTTPGateway::TPtr gateway,
- NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort)
+ TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
+ NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
+ NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
{
- return MakeHolder<TLocalServiceHolder>(driver, std::move(gateway), functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort);
+ return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory));
}
-TIntrusivePtr<IDqGateway> CreateLocalDqGateway(NYdb::TDriver driver, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- IHTTPGateway::TPtr gateway)
+ NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
{
int startPort = 31337;
TRangeWalker<int> portWalker(startPort, startPort+100);
@@ -149,7 +129,7 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(NYdb::TDriver driver, const NKiki
auto grpcPort = BindInRange(portWalker)[1];
return new TDqGatewayLocal(
- CreateLocalServiceHolder(driver, functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, std::move(gateway), interconnectPort, grpcPort),
+ CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory)),
CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort(), 8));
}
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
index b248e7c8330..263e9573491 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
@@ -2,11 +2,13 @@
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
namespace NYql {
-TIntrusivePtr<IDqGateway> CreateLocalDqGateway(NYdb::TDriver driver, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
- TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, IHTTPGateway::TPtr gateway = {});
+ TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
+ NDq::IDqSourceActorFactory::TPtr = nullptr, NDq::IDqSinkFactory::TPtr = nullptr, NDq::IDqOutputTransformFactory::TPtr = nullptr);
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index 9f52f3a2ea6..f672c9ded64 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -205,7 +205,7 @@ namespace NYql::NDqs {
transform->Type = outputTransform.Type;
transform->InputType = outputTransform.InputType;
transform->OutputType = outputTransform.OutputType;
- //transform->Settings = outputTransform.Settings;
+ transform->Settings = outputTransform.Settings;
}
}
}
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 951011ce454..1d360eb6e3d 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1501,6 +1501,10 @@ public:
return sink;
}
+ std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 /*outputIndex*/) override {
+ return {};
+ }
+
const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const override {
return Delegate->GetTypeEnv();
}
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index 929f5af4ced..e5ef846be46 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -2,7 +2,7 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
-
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
#include <ydb/library/yql/providers/dq/worker_manager/interface/events.h>
#include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h>
@@ -24,6 +24,8 @@ namespace NYql::NDqs {
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
NDq::IDqSourceActorFactory::TPtr SourceActorFactory;
NDq::IDqSinkFactory::TPtr SinkFactory;
+ NDq::IDqOutputTransformFactory::TPtr TransformFactory;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
TWorkerRuntimeData* RuntimeData = nullptr;
TTaskRunnerInvokerFactory::TPtr TaskRunnerInvokerFactory;
NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
diff --git a/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
index 95e0b2d6585..eea277de54d 100644
--- a/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
+++ b/ydb/library/yql/providers/function/provider/dq_function_dq_integration.cpp
@@ -47,4 +47,4 @@ THolder<IDqIntegration> CreateDqFunctionDqIntegration(TDqFunctionState::TPtr sta
return MakeHolder<TDqFunctionDqIntegration>(state);
}
-} \ No newline at end of file
+}
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index 6cd23adcb8b..b415d34c5ea 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -153,7 +153,7 @@ public:
if (checkpoint) {
if (Buffer.empty()) {
- Callbacks->OnSinkStateSaved(BuildState(), OutputIndex, *checkpoint);
+ Callbacks->OnAsyncOutputStateSaved(BuildState(), OutputIndex, *checkpoint);
} else {
DeferredCheckpoints.emplace(NextSeqNo + Buffer.size() - 1, *checkpoint);
}
@@ -273,7 +273,7 @@ private:
if (issues) {
WriteSession->Close(TDuration::Zero());
WriteSession.reset();
- Callbacks->OnSinkError(OutputIndex, *issues, true);
+ Callbacks->OnAsyncOutputError(OutputIndex, *issues, true);
break;
}
@@ -308,8 +308,7 @@ private:
void Fail(TString message) {
TIssues issues;
issues.AddIssue(message);
- Callbacks->OnSinkError(OutputIndex, issues, true);
- return;
+ Callbacks->OnAsyncOutputError(OutputIndex, issues, true);
}
struct TPQEventProcessor {
@@ -340,7 +339,7 @@ private:
if (!Self.DeferredCheckpoints.empty() && std::get<0>(Self.DeferredCheckpoints.front()) == it->SeqNo) {
Self.ConfirmedSeqNo = it->SeqNo;
- Self.Callbacks->OnSinkStateSaved(Self.BuildState(), Self.OutputIndex, std::get<1>(Self.DeferredCheckpoints.front()));
+ Self.Callbacks->OnAsyncOutputStateSaved(Self.BuildState(), Self.OutputIndex, std::get<1>(Self.DeferredCheckpoints.front()));
Self.DeferredCheckpoints.pop();
}
}
diff --git a/ydb/library/yql/providers/pq/async_io/ut/dq_pq_write_actor_ut.cpp b/ydb/library/yql/providers/pq/async_io/ut/dq_pq_write_actor_ut.cpp
index b64c7ee0e35..227736c1422 100644
--- a/ydb/library/yql/providers/pq/async_io/ut/dq_pq_write_actor_ut.cpp
+++ b/ydb/library/yql/providers/pq/async_io/ut/dq_pq_write_actor_ut.cpp
@@ -11,26 +11,26 @@ constexpr TDuration WaitTimeout = TDuration::MilliSeconds(10000);
Y_UNIT_TEST_SUITE(TPqWriterTest) {
Y_UNIT_TEST_F(TestWriteToTopic, TPqIoTestFixture) {
const TString topicName = "WriteToTopic";
- InitSink(topicName);
+ InitAsyncOutput(topicName);
const std::vector<TString> data = { "1", "2", "3", "4" };
- SinkWrite(data);
+ AsyncOutputWrite(data);
auto result = PQReadUntil(topicName, 4);
UNIT_ASSERT_EQUAL(result, data);
}
Y_UNIT_TEST_F(TestWriteToTopicMultiBatch, TPqIoTestFixture) {
const TString topicName = "WriteToTopicMultiBatch";
- InitSink(topicName);
+ InitAsyncOutput(topicName);
const std::vector<TString> data1 = { "1" };
const std::vector<TString> data2 = { "2" };
const std::vector<TString> data3 = { "3" };
- SinkWrite(data1);
- SinkWrite(data2);
- SinkWrite(data3);
+ AsyncOutputWrite(data1);
+ AsyncOutputWrite(data2);
+ AsyncOutputWrite(data3);
auto result = PQReadUntil(topicName, 3);
std::vector<TString> expected = { "1", "2", "3" };
@@ -40,12 +40,12 @@ Y_UNIT_TEST_SUITE(TPqWriterTest) {
Y_UNIT_TEST_F(TestDeferredWriteToTopic, TPqIoTestFixture) {
// In this case we are checking free space overflow
const TString topicName = "DeferredWriteToTopic";
- InitSink(topicName, 1);
+ InitAsyncOutput(topicName, 1);
const std::vector<TString> data = { "1", "2", "3" };
- auto future = CaSetup->SinkPromises.ResumeExecution.GetFuture();
- SinkWrite(data);
+ auto future = CaSetup->AsyncOutputPromises.ResumeExecution.GetFuture();
+ AsyncOutputWrite(data);
auto result = PQReadUntil(topicName, 3);
UNIT_ASSERT_EQUAL(result, data);
@@ -53,7 +53,7 @@ Y_UNIT_TEST_SUITE(TPqWriterTest) {
const std::vector<TString> data2 = { "4", "5", "6" };
- SinkWrite(data2);
+ AsyncOutputWrite(data2);
auto result2 = PQReadUntil(topicName, 6);
const std::vector<TString> expected = { "1", "2", "3", "4", "5", "6" };
UNIT_ASSERT_EQUAL(result2, expected);
@@ -61,11 +61,11 @@ Y_UNIT_TEST_SUITE(TPqWriterTest) {
Y_UNIT_TEST_F(WriteNonExistentTopic, TPqIoTestFixture) {
const TString topicName = "NonExistentTopic";
- InitSink(topicName);
+ InitAsyncOutput(topicName);
const std::vector<TString> data = { "1" };
- auto future = CaSetup->SinkPromises.Issue.GetFuture();
- SinkWrite(data);
+ auto future = CaSetup->AsyncOutputPromises.Issue.GetFuture();
+ AsyncOutputWrite(data);
UNIT_ASSERT(future.Wait(WaitTimeout));
UNIT_ASSERT_STRING_CONTAINS(future.GetValue().ToString(), "Write session to topic \"NonExistentTopic\" was closed");
@@ -77,15 +77,15 @@ Y_UNIT_TEST_SUITE(TPqWriterTest) {
NDqProto::TSinkState state1;
{
TPqIoTestFixture setup;
- setup.InitSink(topicName);
+ setup.InitAsyncOutput(topicName);
const std::vector<TString> data1 = { "1" };
- setup.SinkWrite(data1);
+ setup.AsyncOutputWrite(data1);
const std::vector<TString> data2 = { "2", "3" };
auto checkpoint = CreateCheckpoint();
- auto future = setup.CaSetup->SinkPromises.StateSaved.GetFuture();
- setup.SinkWrite(data2, checkpoint);
+ auto future = setup.CaSetup->AsyncOutputPromises.StateSaved.GetFuture();
+ setup.AsyncOutputWrite(data2, checkpoint);
UNIT_ASSERT(future.Wait(WaitTimeout));
state1 = future.GetValue();
@@ -93,11 +93,11 @@ Y_UNIT_TEST_SUITE(TPqWriterTest) {
{
TPqIoTestFixture setup;
- setup.InitSink(topicName);
+ setup.InitAsyncOutput(topicName);
setup.LoadSink(state1);
const std::vector<TString> data3 = { "4", "5" };
- setup.SinkWrite(data3);
+ setup.AsyncOutputWrite(data3);
auto result = PQReadUntil(topicName, 5);
const std::vector<TString> expected = { "1", "2", "3", "4", "5" };
@@ -106,11 +106,11 @@ Y_UNIT_TEST_SUITE(TPqWriterTest) {
{
TPqIoTestFixture setup;
- setup.InitSink(topicName);
+ setup.InitAsyncOutput(topicName);
setup.LoadSink(state1);
const std::vector<TString> data4 = { "4", "5" };
- setup.SinkWrite(data4); // This write should be deduplicated
+ setup.AsyncOutputWrite(data4); // This write should be deduplicated
auto result = PQReadUntil(topicName, 4);
const std::vector<TString> expected = { "1", "2", "3", "4", "5" };
@@ -123,12 +123,12 @@ Y_UNIT_TEST_SUITE(TPqWriterTest) {
NDqProto::TSinkState state1;
{
- InitSink(topicName);
+ InitAsyncOutput(topicName);
const std::vector<TString> data = {};
auto checkpoint = CreateCheckpoint();
- auto future = CaSetup->SinkPromises.StateSaved.GetFuture();
- SinkWrite(data, checkpoint);
+ auto future = CaSetup->AsyncOutputPromises.StateSaved.GetFuture();
+ AsyncOutputWrite(data, checkpoint);
UNIT_ASSERT(future.Wait(WaitTimeout));
state1 = future.GetValue();
diff --git a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp
index 0c4cdb50d82..fd64116e9c7 100644
--- a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp
+++ b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp
@@ -73,24 +73,24 @@ void TPqIoTestFixture::InitSource(
});
}
-void TPqIoTestFixture::InitSink(
+void TPqIoTestFixture::InitAsyncOutput(
NPq::NProto::TDqPqTopicSink&& settings,
i64 freeSpace)
{
const THashMap<TString, TString> secureParams;
CaSetup->Execute([&](TFakeActor& actor) {
- auto [dqSink, dqSinkAsActor] = CreateDqPqWriteActor(
+ auto [dqAsyncOutput, dqAsyncOutputAsActor] = CreateDqPqWriteActor(
std::move(settings),
0,
"query_1",
secureParams,
Driver,
nullptr,
- &actor.GetSinkCallbacks(),
+ &actor.GetAsyncOutputCallbacks(),
freeSpace);
- actor.InitSink(dqSink, dqSinkAsActor);
+ actor.InitAsyncOutput(dqAsyncOutput, dqAsyncOutputAsActor);
});
}
@@ -168,8 +168,8 @@ std::vector<TString> UVParser(const NUdf::TUnboxedValue& item) {
return { TString(item.AsStringRef()) };
}
-void TPqIoTestFixture::SinkWrite(std::vector<TString> data, TMaybe<NDqProto::TCheckpoint> checkpoint) {
- CaSetup->SinkWrite([data](NKikimr::NMiniKQL::THolderFactory& factory) {
+void TPqIoTestFixture::AsyncOutputWrite(std::vector<TString> data, TMaybe<NDqProto::TCheckpoint> checkpoint) {
+ CaSetup->AsyncOutputWrite([data](NKikimr::NMiniKQL::THolderFactory& factory) {
NKikimr::NMiniKQL::TUnboxedValueVector batch;
batch.reserve(data.size());
for (const auto& item : data) {
diff --git a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
index 76c66c299ba..51e13d923e4 100644
--- a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
+++ b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
@@ -65,22 +65,22 @@ struct TPqIoTestFixture : public NUnitTest::TBaseFixture {
}
- void InitSink(
+ void InitAsyncOutput(
NYql::NPq::NProto::TDqPqTopicSink&& settings,
i64 freeSpace = 1_MB);
- void InitSink(
+ void InitAsyncOutput(
const TString& topic,
i64 freeSpace = 1_MB)
{
- InitSink(BuildPqTopicSinkSettings(topic), freeSpace);
+ InitAsyncOutput(BuildPqTopicSinkSettings(topic), freeSpace);
}
void LoadSink(const NDqProto::TSinkState& state) {
CaSetup->LoadSink(state);
}
- void SinkWrite(std::vector<TString> data, TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing());
+ void AsyncOutputWrite(std::vector<TString> data, TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing());
};
TString GetDefaultPqEndpoint();
diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
index dbf6f6b52be..56cd4604de3 100644
--- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
+++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
@@ -34,7 +34,7 @@ public:
NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override;
void CloseSession(const TString& sessionId) override;
- NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(
+ ::NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(
const TString& sessionId,
const TString& cluster,
const TString& database,
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
index 68a33d3e2f9..d8a888f484d 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
@@ -8,6 +8,8 @@
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
#include <ydb/library/yql/providers/dq/provider/yql_dq_provider.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
#include <ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h>
@@ -39,6 +41,18 @@
namespace NYql {
+NDq::IDqSourceActorFactory::TPtr CreateSourceFactory(const NYdb::TDriver& driver) {
+ auto factory = MakeIntrusive<NYql::NDq::TDqSourceFactory>();
+ RegisterDqPqReadActorFactory(*factory, driver, nullptr);
+ return factory;
+}
+
+NDq::IDqSinkFactory::TPtr CreateSinkFactory(const NYdb::TDriver& driver) {
+ auto factory = MakeIntrusive<NYql::NDq::TDqSinkFactory>();
+ RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
+ return factory;
+}
+
bool RunPqProgram(
const TString& code,
bool optimizeOnly,
@@ -110,7 +124,7 @@ bool RunPqProgram(
const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"));
NYdb::TDriver driver(driverConfig);
- auto dqGateway = CreateLocalDqGateway(driver, functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {});
+ auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, CreateSourceFactory(driver), CreateSinkFactory(driver));
auto storage = NYql::CreateFileStorage({});
dataProvidersInit.push_back(NYql::GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index 862e5d1e0bb..81d34c6de4b 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -252,7 +252,7 @@ private:
TIssues issues { TIssue(errorBuilder) };
SINK_LOG_W("Got " << (res->IsTerminal ? "terminal " : "") << "error response[" << ev->Cookie << "] from solomon: " << issues.ToOneLineString());
- Callbacks->OnSinkError(OutputIndex, issues, res->IsTerminal);
+ Callbacks->OnAsyncOutputError(OutputIndex, issues, res->IsTerminal);
return;
}
@@ -295,7 +295,7 @@ private:
SendingBuffer.emplace(TMetricsToSend { std::move(data), metricsCount });
} catch (const yexception& e) {
TIssues issues { TIssue(TStringBuilder() << "Error while encoding solomon metrics: " << e.what()) };
- Callbacks->OnSinkError(OutputIndex, issues, true);
+ Callbacks->OnAsyncOutputError(OutputIndex, issues, true);
}
metricsCount = 0;
@@ -398,7 +398,7 @@ private:
if (!parser.Parse(TString(response.Response->Body), &res)) {
TIssues issues { TIssue(TStringBuilder() << "Invalid monitoring response: " << response.Response->GetObfuscatedData()) };
SINK_LOG_E("Failed to parse response[" << cookie << "] from solomon: " << issues.ToOneLineString());
- Callbacks->OnSinkError(OutputIndex, issues, true);
+ Callbacks->OnAsyncOutputError(OutputIndex, issues, true);
return;
}
Y_VERIFY(res.size() == 2);
@@ -407,7 +407,7 @@ private:
if (ptr == InflightBuffer.end()) {
SINK_LOG_E("Solomon response[" << cookie << "] was not found in inflight");
TIssues issues { TIssue(TStringBuilder() << "Internal error in monitoring writer") };
- Callbacks->OnSinkError(OutputIndex, issues, true);
+ Callbacks->OnAsyncOutputError(OutputIndex, issues, true);
return;
}
@@ -416,7 +416,7 @@ private:
if (writtenMetricsCount != ptr->second.MetricsCount) {
// TODO: YQ-340
// TIssues issues { TIssue(TStringBuilder() << ToString(ptr->second.MetricsCount - writtenMetricsCount) << " metrics were not written: " << res[1]) };
- // Callbacks->OnSinkError(OutputIndex, issues, true);
+ // Callbacks->OnAsyncOutputError(OutputIndex, issues, true);
// return;
SINK_LOG_W("Some metrics were not written. MetricsCount=" << ptr->second.MetricsCount << " writtenMetricsCount=" << writtenMetricsCount << " Solomon response: " << response.Response->GetObfuscatedData());
}
@@ -434,7 +434,7 @@ private:
}
void DoCheckpoint() {
- Callbacks->OnSinkStateSaved(BuildState(), OutputIndex, *CheckpointInProgress);
+ Callbacks->OnAsyncOutputStateSaved(BuildState(), OutputIndex, *CheckpointInProgress);
CheckpointInProgress = std::nullopt;
}
diff --git a/ydb/library/yql/providers/solomon/async_io/ut/dq_solomon_write_actor_ut.cpp b/ydb/library/yql/providers/solomon/async_io/ut/dq_solomon_write_actor_ut.cpp
index 3fca3a2bdc5..bf1f81ca439 100644
--- a/ydb/library/yql/providers/solomon/async_io/ut/dq_solomon_write_actor_ut.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/ut/dq_solomon_write_actor_ut.cpp
@@ -19,10 +19,10 @@ namespace {
CleanupSolomon("cloudId1", "folderId1", "custom", isCloud);
TFakeCASetup setup;
- InitSink(setup, BuildSolomonShardSettings(isCloud));
+ InitAsyncOutput(setup, BuildSolomonShardSettings(isCloud));
- auto issue = setup.SinkPromises.Issue.GetFuture();
- setup.SinkWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
+ auto issue = setup.AsyncOutputPromises.Issue.GetFuture();
+ setup.AsyncOutputWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
TUnboxedValueVector res;
res.reserve(batchSize);
@@ -48,10 +48,10 @@ Y_UNIT_TEST_SUITE(TDqSolomonWriteActorTest) {
CleanupSolomon("cloudId1", "folderId1", "custom", true);
TFakeCASetup setup;
- InitSink(setup, BuildSolomonShardSettings(true));
+ InitAsyncOutput(setup, BuildSolomonShardSettings(true));
- auto issue = setup.SinkPromises.Issue.GetFuture();
- setup.SinkWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
+ auto issue = setup.AsyncOutputPromises.Issue.GetFuture();
+ setup.AsyncOutputWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
NUdf::TUnboxedValue val1 = CreateStruct(holderFactory, {
NUdf::TUnboxedValuePod(static_cast<NUdf::TDataType<NUdf::TTimestamp>::TLayout>(1624811684)),
NKikimr::NMiniKQL::MakeString("123"),
@@ -94,10 +94,10 @@ Y_UNIT_TEST_SUITE(TDqSolomonWriteActorTest) {
CleanupSolomon("cloudId1", "folderId1", "custom", true);
TFakeCASetup setup;
- InitSink(setup, BuildSolomonShardSettings(true));
+ InitAsyncOutput(setup, BuildSolomonShardSettings(true));
- auto issue = setup.SinkPromises.Issue.GetFuture();
- setup.SinkWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
+ auto issue = setup.AsyncOutputPromises.Issue.GetFuture();
+ setup.AsyncOutputWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
TUnboxedValueVector res;
res.reserve(batchSize);
@@ -123,10 +123,10 @@ Y_UNIT_TEST_SUITE(TDqSolomonWriteActorTest) {
{
TFakeCASetup setup;
CleanupSolomon("cloudId1", "folderId1", "custom", true);
- InitSink(setup, BuildSolomonShardSettings(true));
+ InitAsyncOutput(setup, BuildSolomonShardSettings(true));
- auto stateSaved = setup.SinkPromises.StateSaved.GetFuture();
- setup.SinkWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
+ auto stateSaved = setup.AsyncOutputPromises.StateSaved.GetFuture();
+ setup.AsyncOutputWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
TUnboxedValueVector res;
res.reserve(batchSize);
@@ -151,10 +151,10 @@ Y_UNIT_TEST_SUITE(TDqSolomonWriteActorTest) {
{
TFakeCASetup setup;
CleanupSolomon("cloudId1", "folderId1", "custom", true);
- InitSink(setup, BuildSolomonShardSettings(true));
+ InitAsyncOutput(setup, BuildSolomonShardSettings(true));
- auto stateSaved = setup.SinkPromises.StateSaved.GetFuture();
- setup.SinkWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
+ auto stateSaved = setup.AsyncOutputPromises.StateSaved.GetFuture();
+ setup.AsyncOutputWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
return TUnboxedValueVector {
CreateStruct(holderFactory, {
NUdf::TUnboxedValuePod(static_cast<NUdf::TDataType<NUdf::TTimestamp>::TLayout>(200000)),
@@ -164,8 +164,8 @@ Y_UNIT_TEST_SUITE(TDqSolomonWriteActorTest) {
}, CreateCheckpoint(1));
UNIT_ASSERT(stateSaved.Wait(WaitTimeout));
- auto issue = setup.SinkPromises.Issue.GetFuture();
- setup.SinkWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
+ auto issue = setup.AsyncOutputPromises.Issue.GetFuture();
+ setup.AsyncOutputWrite([](NKikimr::NMiniKQL::THolderFactory& holderFactory){
return TUnboxedValueVector {
CreateStruct(holderFactory, {
NUdf::TUnboxedValuePod(static_cast<NUdf::TDataType<NUdf::TTimestamp>::TLayout>(200001)),
diff --git a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp
index aa82671e1d1..dbe4eee3af2 100644
--- a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp
@@ -36,7 +36,7 @@ void FillDqSolomonScheme(NSo::NProto::TDqSolomonShardScheme& scheme) {
}
-void InitSink(
+void InitAsyncOutput(
TFakeCASetup& caSetup,
NSo::NProto::TDqSolomonShard&& settings,
i64 freeSpace)
@@ -45,17 +45,17 @@ void InitSink(
const THashMap<TString, TString> secureParams;
caSetup.Execute([&](TFakeActor& actor) {
- auto [dqSink, dqSinkAsActor] = CreateDqSolomonWriteActor(
+ auto [dqAsyncOutput, dqAsyncOutputAsActor] = CreateDqSolomonWriteActor(
std::move(settings),
0,
"TxId-42",
secureParams,
- &actor.GetSinkCallbacks(),
+ &actor.GetAsyncOutputCallbacks(),
counters,
nullptr,
freeSpace);
- actor.InitSink(dqSink, dqSinkAsActor);
+ actor.InitAsyncOutput(dqAsyncOutput, dqAsyncOutputAsActor);
});
}
diff --git a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h
index e557c45bf37..a514a29090e 100644
--- a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h
+++ b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h
@@ -17,7 +17,7 @@
namespace NYql::NDq {
-void InitSink(
+void InitAsyncOutput(
TFakeCASetup& caSetup,
NSo::NProto::TDqSolomonShard&& settings,
i64 freeSpace = 100000);