aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-05-26 17:34:46 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-05-26 17:34:46 +0300
commita2130178e7652dc319f3edd9001e2dbb0d44ad6b (patch)
treea97749ed757720b8cd1d6dc1e1f8e852904142ff
parent2bd2b6be009b493d6179ff8936629cd39a50c624 (diff)
downloadydb-a2130178e7652dc319f3edd9001e2dbb0d44ad6b.tar.gz
YQ-1098 Unite async io factories in one factory
Refactor in the rest of the code Unite factories in dq library Unite async io in one header file ref:f5b935fd3acd4e24bd334c0ed761317bfeba6a6e
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp10
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp8
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp2
-rw-r--r--ydb/core/kqp/node/kqp_node.cpp4
-rw-r--r--ydb/core/yq/libs/init/init.cpp18
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp10
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp10
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h98
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h (renamed from ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h)102
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp36
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h95
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h37
-rw-r--r--ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h2
-rw-r--r--ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp8
-rw-r--r--ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h4
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h3
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp8
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp23
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.h3
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp14
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h2
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp3
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h7
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp8
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h6
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp8
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h4
-rw-r--r--ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h3
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp10
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp8
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h4
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp8
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h4
-rw-r--r--ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h3
-rw-r--r--ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h2
-rw-r--r--ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp8
-rw-r--r--ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h4
42 files changed, 267 insertions, 338 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 68d86e72aab..8b36af9efd5 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -16,12 +16,12 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
- NYql::NDq::IDqSourceFactory::TPtr sourceFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory,
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits);
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceFactory::TPtr sourceFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory,
+ NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
TIntrusivePtr<TKqpCounters> counters);
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index c0afd232d3b..dd24f15d395 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -33,10 +33,10 @@ public:
}
TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
- : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
, ComputeCtx(settings.StatsMode)
{
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
@@ -318,12 +318,12 @@ private:
} // anonymous namespace
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
{
- return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceFactory),
- std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits);
+ return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
+ functionRegistry, settings, memoryLimits);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index f83a17f54e3..48dfeb1dcd3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -72,10 +72,10 @@ public:
}
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NDqProto::TDqTask&& task, IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
- : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits)
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
, Counters(counters)
@@ -1140,11 +1140,11 @@ private:
} // anonymous namespace
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NDqProto::TDqTask&& task, IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
{
- return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory),
+ return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory),
functionRegistry, settings, memoryLimits, counters);
}
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index a405a06f3de..b0d4947f654 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -1240,7 +1240,7 @@ private:
return false;
};
- auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, nullptr, nullptr, settings, limits);
+ auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits);
auto computeActorId = Register(computeActor);
task.ComputeActorId = computeActorId;
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index 80728c7160f..bf7d7f69c2d 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -311,11 +311,11 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- nullptr, nullptr, nullptr, nullptr, runtimeSettings, memoryLimits, Counters);
+ nullptr, nullptr, runtimeSettings, memoryLimits, Counters);
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
- computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), nullptr, nullptr, nullptr, nullptr, runtimeSettings,
+ computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), nullptr, nullptr, runtimeSettings,
memoryLimits);
taskCtx.ComputeActorId = Register(computeActor);
} else {
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index c963406650e..9f2b32a25fd 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -115,8 +115,7 @@ void Init(
NYql::CreateYdbDqTaskTransformFactory()
});
- auto sourceFactory = MakeIntrusive<NYql::NDq::TDqSourceFactory>();
- auto sinkFactory = MakeIntrusive<NYql::NDq::TDqSinkFactory>();
+ auto asyncIoFactory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory;
const auto httpGateway = NYql::IHTTPGateway::Make(
@@ -135,14 +134,14 @@ void Init(
}
if (protoConfig.GetPrivateApi().GetEnabled()) {
- RegisterDqPqReadActorFactory(*sourceFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
- RegisterYdbReadActorFactory(*sourceFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
- RegisterS3ReadActorFactory(*sourceFactory, credentialsFactory,
+ RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
+ RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
+ RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory,
httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig()));
- RegisterClickHouseReadActorFactory(*sourceFactory, credentialsFactory, httpGateway);
+ RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway);
- RegisterDqPqWriteActorFactory(*sinkFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
- RegisterDQSolomonWriteActorFactory(*sinkFactory, credentialsFactory);
+ RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
+ RegisterDQSolomonWriteActorFactory(*asyncIoFactory, credentialsFactory);
}
ui64 mkqlInitialMemoryLimit = 8_GB;
@@ -160,8 +159,7 @@ void Init(
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Counters = workerManagerCounters;
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false);
- lwmOptions.SourceFactory = sourceFactory;
- lwmOptions.SinkFactory = sinkFactory;
+ lwmOptions.AsyncIoFactory = asyncIoFactory;
lwmOptions.FunctionRegistry = appData->FunctionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory();
lwmOptions.MkqlInitialMemoryLimit = mkqlInitialMemoryLimit;
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index a84f863e72e..eff16a0825a 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -29,11 +29,11 @@ public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
- : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, ReadyToCheckpointFlag(false)
, SentStatsRequest(false)
@@ -573,13 +573,13 @@ private:
IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
{
- return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceFactory),
- std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerActorFactory);
+ return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
+ functionRegistry, settings, memoryLimits, taskRunnerActorFactory);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
index 95219cb5a99..23b6c897193 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
@@ -1,7 +1,6 @@
#pragma once
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/dq_events_ids.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
@@ -19,7 +18,7 @@ namespace NYql {
namespace NDq {
NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 85e6d3c96e3..6d82b5a9c98 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -33,11 +33,11 @@ public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory)
- : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits)
, TaskRunnerFactory(taskRunnerFactory)
{}
@@ -68,12 +68,12 @@ private:
IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory)
{
- return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceFactory),
- std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerFactory);
+ return new TDqComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
+ functionRegistry, settings, memoryLimits, taskRunnerFactory);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 5f839ccec60..5db7135fdef 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -1,7 +1,6 @@
#pragma once
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/dq_events_ids.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/common/dq_common.h>
@@ -257,7 +256,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
NDqProto::TDqTaskStats* protoTask, bool withProfileStats);
NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h
deleted file mode 100644
index 113120031f3..00000000000
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h
+++ /dev/null
@@ -1,98 +0,0 @@
-#pragma once
-#include <ydb/library/yql/dq/common/dq_common.h>
-#include <ydb/library/yql/dq/actors/dq_events_ids.h>
-#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
-#include <ydb/library/yql/public/issue/yql_issue.h>
-
-#include <util/generic/ptr.h>
-
-#include <memory>
-#include <utility>
-
-namespace NYql::NDqProto {
-class TCheckpoint;
-class TTaskInput;
-class TSourceState;
-} // namespace NYql::NDqProto
-
-namespace NActors {
-class IActor;
-} // namespace NActors
-
-namespace NYql::NDq {
-
-// Source/transform.
-// Must be IActor.
-//
-// Protocol:
-// 1. CA starts source/transform.
-// 2. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace).
-// 3. Source/transform sends TEvNewAsyncInputDataArrived when it has data to process.
-// 4. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace) to get data when it is ready to process it.
-//
-// In case of error source/transform sends TEvAsyncInputError
-//
-// Checkpointing:
-// 1. InjectCheckpoint event arrives to CA.
-// 2. ...
-// 3. CA calls IDqComputeActorAsyncInput::SaveState() and IDqTaskRunner::SaveGraphState() and uses this pair as state for CA.
-// 3. ...
-// 5. CA calls IDqComputeActorAsyncInput::CommitState() to apply all side effects.
-struct IDqComputeActorAsyncInput {
- struct TEvNewAsyncInputDataArrived : public NActors::TEventLocal<TEvNewAsyncInputDataArrived, TDqComputeEvents::EvNewAsyncInputDataArrived> {
- const ui64 InputIndex;
- explicit TEvNewAsyncInputDataArrived(ui64 inputIndex)
- : InputIndex(inputIndex)
- {}
- };
-
- struct TEvAsyncInputError : public NActors::TEventLocal<TEvAsyncInputError, TDqComputeEvents::EvAsyncInputError> {
- TEvAsyncInputError(ui64 inputIndex, const TIssues& issues, bool isFatal)
- : InputIndex(inputIndex)
- , Issues(issues)
- , IsFatal(isFatal)
- {}
-
- const ui64 InputIndex;
- const TIssues Issues;
- const bool IsFatal;
- };
-
- virtual ui64 GetInputIndex() const = 0;
-
- // Gets data and returns space used by filled data batch.
- // Method should be called under bound mkql allocator.
- // Could throw YQL errors.
- virtual i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0;
-
- // Checkpointing.
- virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) = 0;
- virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0; // Apply side effects related to this checkpoint.
- virtual void LoadState(const NDqProto::TSourceState& state) = 0;
-
- virtual void PassAway() = 0; // The same signature as IActor::PassAway()
-
- virtual ~IDqComputeActorAsyncInput() = default;
-};
-
-struct IDqSourceFactory : public TThrRefBase {
- using TPtr = TIntrusivePtr<IDqSourceFactory>;
-
- struct TArguments {
- const NDqProto::TTaskInput& InputDesc;
- ui64 InputIndex;
- TTxId TxId;
- const THashMap<TString, TString>& SecureParams;
- const THashMap<TString, TString>& TaskParams;
- const NActors::TActorId& ComputeActorId;
- const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
- const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
- };
-
- // Creates source.
- // Could throw YQL errors.
- // IActor* and IDqComputeActorAsyncInput* returned by method must point to the objects with consistent lifetime.
- virtual std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TArguments&& args) const = 0;
-};
-
-} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 082450196cb..e6f6ad01626 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -1,4 +1,5 @@
#pragma once
+#include <ydb/library/yql/dq/actors/dq_events_ids.h>
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
#include <ydb/library/yql/dq/runtime/dq_async_output.h>
@@ -12,6 +13,8 @@
namespace NYql::NDqProto {
class TCheckpoint;
+class TTaskInput;
+class TSourceState;
class TTaskOutput;
class TSinkState;
} // namespace NYql::NDqProto
@@ -26,6 +29,60 @@ class TProgramBuilder;
namespace NYql::NDq {
+// Source/transform.
+// Must be IActor.
+//
+// Protocol:
+// 1. CA starts source/transform.
+// 2. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace).
+// 3. Source/transform sends TEvNewAsyncInputDataArrived when it has data to process.
+// 4. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace) to get data when it is ready to process it.
+//
+// In case of error source/transform sends TEvAsyncInputError
+//
+// Checkpointing:
+// 1. InjectCheckpoint event arrives to CA.
+// 2. ...
+// 3. CA calls IDqComputeActorAsyncInput::SaveState() and IDqTaskRunner::SaveGraphState() and uses this pair as state for CA.
+// 3. ...
+// 5. CA calls IDqComputeActorAsyncInput::CommitState() to apply all side effects.
+struct IDqComputeActorAsyncInput {
+ struct TEvNewAsyncInputDataArrived : public NActors::TEventLocal<TEvNewAsyncInputDataArrived, TDqComputeEvents::EvNewAsyncInputDataArrived> {
+ const ui64 InputIndex;
+ explicit TEvNewAsyncInputDataArrived(ui64 inputIndex)
+ : InputIndex(inputIndex)
+ {}
+ };
+
+ struct TEvAsyncInputError : public NActors::TEventLocal<TEvAsyncInputError, TDqComputeEvents::EvAsyncInputError> {
+ TEvAsyncInputError(ui64 inputIndex, const TIssues& issues, bool isFatal)
+ : InputIndex(inputIndex)
+ , Issues(issues)
+ , IsFatal(isFatal)
+ {}
+
+ const ui64 InputIndex;
+ const TIssues Issues;
+ const bool IsFatal;
+ };
+
+ virtual ui64 GetInputIndex() const = 0;
+
+ // Gets data and returns space used by filled data batch.
+ // Method should be called under bound mkql allocator.
+ // Could throw YQL errors.
+ virtual i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0;
+
+ // Checkpointing.
+ virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) = 0;
+ virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0; // Apply side effects related to this checkpoint.
+ virtual void LoadState(const NDqProto::TSourceState& state) = 0;
+
+ virtual void PassAway() = 0; // The same signature as IActor::PassAway()
+
+ virtual ~IDqComputeActorAsyncInput() = default;
+};
+
// Sink/transform.
// Must be IActor.
//
@@ -77,10 +134,22 @@ struct IDqComputeActorAsyncOutput {
virtual ~IDqComputeActorAsyncOutput() = default;
};
-struct IDqSinkFactory : public TThrRefBase {
- using TPtr = TIntrusivePtr<IDqSinkFactory>;
+struct IDqAsyncIoFactory : public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<IDqAsyncIoFactory>;
- struct TArguments {
+ struct TSourceArguments {
+ const NDqProto::TTaskInput& InputDesc;
+ ui64 InputIndex;
+ TTxId TxId;
+ const THashMap<TString, TString>& SecureParams;
+ const THashMap<TString, TString>& TaskParams;
+ const NActors::TActorId& ComputeActorId;
+ const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
+ const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
+ };
+
+ struct TSinkArguments {
const NDqProto::TTaskOutput& OutputDesc;
ui64 OutputIndex;
TTxId TxId;
@@ -90,17 +159,7 @@ struct IDqSinkFactory : public TThrRefBase {
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
};
- // Creates sink.
- // Could throw YQL errors.
- // IActor* and IDqComputeActorAsyncOutput* returned by method must point to the objects with consistent lifetime.
- virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const = 0;
-};
-
-struct IDqOutputTransformFactory : public TThrRefBase {
-public:
- using TPtr = TIntrusivePtr<IDqOutputTransformFactory>;
-
- struct TArguments {
+ struct TOutputTransformArguments {
const NDqProto::TTaskOutput& OutputDesc;
const ui64 OutputIndex;
TTxId TxId;
@@ -112,7 +171,20 @@ public:
NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
};
- virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TArguments&& args) = 0;
+ // Creates source.
+ // Could throw YQL errors.
+ // IActor* and IDqComputeActorAsyncInput* returned by method must point to the objects with consistent lifetime.
+ virtual std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TSourceArguments&& args) const = 0;
+
+ // Creates sink.
+ // Could throw YQL errors.
+ // IActor* and IDqComputeActorAsyncOutput* returned by method must point to the objects with consistent lifetime.
+ virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TSinkArguments&& args) const = 0;
+
+ // Creates output transform.
+ // Could throw YQL errors.
+ // IActor* and IDqComputeActorAsyncOutput* returned by method must point to the objects with consistent lifetime.
+ virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TOutputTransformArguments&& args) = 0;
};
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
index b0ae4144ab5..9d34bb4593e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
@@ -1,17 +1,16 @@
#include "dq_compute_actor_async_io_factory.h"
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
namespace NYql::NDq {
-std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> TDqSourceFactory::CreateDqSource(IDqSourceFactory::TArguments&& args) const
+std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> TDqAsyncIoFactory::CreateDqSource(TSourceArguments&& args) const
{
const TString& type = args.InputDesc.GetSource().GetType();
YQL_ENSURE(!type.empty(), "Attempt to create source of empty type");
- const TCreatorFunction* creatorFunc = CreatorsByType.FindPtr(type);
+ const TSourceCreatorFunction* creatorFunc = SourceCreatorsByType.FindPtr(type);
YQL_ENSURE(creatorFunc, "Unknown type of source: \"" << type << "\"");
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args));
Y_VERIFY(actor.first);
@@ -19,17 +18,17 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> TDqSourceFactory::Create
return actor;
}
-void TDqSourceFactory::Register(const TString& type, TCreatorFunction creator)
+void TDqAsyncIoFactory::RegisterSource(const TString& type, TSourceCreatorFunction creator)
{
- auto [_, registered] = CreatorsByType.emplace(type, std::move(creator));
+ auto [_, registered] = SourceCreatorsByType.emplace(type, std::move(creator));
Y_VERIFY(registered);
}
-std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqSinkFactory::CreateDqSink(IDqSinkFactory::TArguments&& args) const
+std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqAsyncIoFactory::CreateDqSink(TSinkArguments&& args) const
{
const TString& type = args.OutputDesc.GetSink().GetType();
YQL_ENSURE(!type.empty(), "Attempt to create sink of empty type");
- const TCreatorFunction* creatorFunc = CreatorsByType.FindPtr(type);
+ const TSinkCreatorFunction* creatorFunc = SinkCreatorsByType.FindPtr(type);
YQL_ENSURE(creatorFunc, "Unknown type of sink: \"" << type << "\"");
std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args));
Y_VERIFY(actor.first);
@@ -37,24 +36,27 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqSinkFactory::CreateD
return actor;
}
-void TDqSinkFactory::Register(const TString& type, TCreatorFunction creator)
+void TDqAsyncIoFactory::RegisterSink(const TString& type, TSinkCreatorFunction creator)
{
- auto [_, registered] = CreatorsByType.emplace(type, std::move(creator));
+ auto [_, registered] = SinkCreatorsByType.emplace(type, std::move(creator));
Y_VERIFY(registered);
}
-std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqOutputTransformFactory::CreateDqOutputTransform(TArguments&& args) {
- auto creator = CreatorsByType.find(args.OutputDesc.GetTransform().GetType());
- YQL_ENSURE(creator != CreatorsByType.end(), "Unregistered type of transform actor: \"" << args.OutputDesc.GetTransform().GetType() << "\"");
-
- std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (creator->second)(std::move(args));
+std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqAsyncIoFactory::CreateDqOutputTransform(TOutputTransformArguments&& args)
+{
+ const TString& type = args.OutputDesc.GetTransform().GetType();
+ YQL_ENSURE(!type.empty(), "Attempt to create output transform of empty type");
+ const TOutputTransformCreatorFunction* creatorFunc = OutputTransformCreatorsByType.FindPtr(type);
+ YQL_ENSURE(creatorFunc, "Unknown type of output transform: \"" << type << "\"");
+ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args));
Y_VERIFY(actor.first);
Y_VERIFY(actor.second);
return actor;
}
-void TDqOutputTransformFactory::Register(const TString& type, TCreatorFunction creator) {
- auto [_, registered] = CreatorsByType.emplace(type, std::move(creator));
+void TDqAsyncIoFactory::RegisterOutputTransform(const TString& type, TOutputTransformCreatorFunction creator)
+{
+ auto [_, registered] = OutputTransformCreatorsByType.emplace(type, std::move(creator));
Y_VERIFY(registered);
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
index b893d26b786..c045b5b48b6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
@@ -1,6 +1,5 @@
#pragma once
-#include "dq_compute_actor_async_input.h"
-#include "dq_compute_actor_async_output.h"
+#include "dq_compute_actor_async_io.h"
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/common/dq_common.h>
@@ -17,63 +16,54 @@ concept TCastsToAsyncInputPair =
std::is_convertible_v<T, std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>>;
template <class T, class TProto>
-concept TSourceCreatorFunc = requires(T f, TProto&& settings, IDqSourceFactory::TArguments args) {
+concept TSourceCreatorFunc = requires(T f, TProto&& settings, IDqAsyncIoFactory::TSourceArguments args) {
{ f(std::move(settings), std::move(args)) } -> TCastsToAsyncInputPair;
};
-class TDqSourceFactory : public IDqSourceFactory {
-public:
- using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>(TArguments&& args)>;
-
- std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TArguments&& args) const override;
-
- void Register(const TString& type, TCreatorFunction creator);
-
- template <class TProtoMsg, TSourceCreatorFunc<TProtoMsg> TCreatorFunc>
- void Register(const TString& type, TCreatorFunc creator) {
- Register(type,
- [creator = std::move(creator), type](TArguments&& args)
- {
- const google::protobuf::Any& settingsAny = args.InputDesc.GetSource().GetSettings();
- YQL_ENSURE(settingsAny.Is<TProtoMsg>(),
- "Source \"" << type << "\" settings are expected to have protobuf type " << TProtoMsg::descriptor()->full_name()
- << ", but got " << settingsAny.type_url());
- TProtoMsg settings;
- YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings of type \"" << type << "\"");
- return creator(std::move(settings), std::move(args));
- });
- }
-
-private:
- THashMap<TString, TCreatorFunction> CreatorsByType;
-};
-
template <class T>
concept TCastsToAsyncOutputPair =
std::is_convertible_v<T, std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>>;
template <class T, class TProto>
-concept TSinkCreatorFunc = requires(T f, TProto&& settings, IDqSinkFactory::TArguments&& args) {
+concept TSinkCreatorFunc = requires(T f, TProto&& settings, IDqAsyncIoFactory::TSinkArguments&& args) {
{ f(std::move(settings), std::move(args)) } -> TCastsToAsyncOutputPair;
};
template <class T, class TProto>
-concept TOutputTransformCreatorFunc = requires(T f, TProto&& settings, IDqOutputTransformFactory::TArguments&& args) {
+concept TOutputTransformCreatorFunc = requires(T f, TProto&& settings, IDqAsyncIoFactory::TOutputTransformArguments&& args) {
{ f(std::move(settings), std::move(args)) } -> TCastsToAsyncOutputPair;
};
-class TDqSinkFactory : public IDqSinkFactory {
+class TDqAsyncIoFactory : public IDqAsyncIoFactory {
public:
- using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TArguments&& args)>;
+ using TSourceCreatorFunction = std::function<std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>(TSourceArguments&& args)>;
+ using TSinkCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TSinkArguments&& args)>;
+ using TOutputTransformCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TOutputTransformArguments&& args)>;
- std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const override;
+ // Registration
+ void RegisterSource(const TString& type, TSourceCreatorFunction creator);
- void Register(const TString& type, TCreatorFunction creator);
+ template <class TProtoMsg, TSourceCreatorFunc<TProtoMsg> TCreatorFunc>
+ void RegisterSource(const TString& type, TCreatorFunc creator) {
+ RegisterSource(type,
+ [creator = std::move(creator), type](TSourceArguments&& args)
+ {
+ const google::protobuf::Any& settingsAny = args.InputDesc.GetSource().GetSettings();
+ YQL_ENSURE(settingsAny.Is<TProtoMsg>(),
+ "Source \"" << type << "\" settings are expected to have protobuf type " << TProtoMsg::descriptor()->full_name()
+ << ", but got " << settingsAny.type_url());
+ TProtoMsg settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings of type \"" << type << "\"");
+ return creator(std::move(settings), std::move(args));
+ });
+ }
+
+ void RegisterSink(const TString& type, TSinkCreatorFunction creator);
template <class TProtoMsg, TSinkCreatorFunc<TProtoMsg> TCreatorFunc>
- void Register(const TString& type, TCreatorFunc creator) {
- Register(type,
- [creator = std::move(creator), type](TArguments&& args)
+ void RegisterSink(const TString& type, TCreatorFunc creator) {
+ RegisterSink(type,
+ [creator = std::move(creator), type](TSinkArguments&& args)
{
const google::protobuf::Any& settingsAny = args.OutputDesc.GetSink().GetSettings();
YQL_ENSURE(settingsAny.Is<TProtoMsg>(),
@@ -85,22 +75,12 @@ public:
});
}
-private:
- THashMap<TString, TCreatorFunction> CreatorsByType;
-};
-
-class TDqOutputTransformFactory : public IDqOutputTransformFactory {
-public:
- using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TArguments&& args)>;
-
- std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TArguments&& args) override;
-
- void Register(const TString& type, TCreatorFunction creator);
+ void RegisterOutputTransform(const TString& type, TOutputTransformCreatorFunction creator);
template <class TProtoMsg, TOutputTransformCreatorFunc<TProtoMsg> TCreatorFunc>
- void Register(const TString& type, TCreatorFunc creator) {
- Register(type,
- [creator = std::move(creator), type](TArguments&& args)
+ void RegisterOutputTransform(const TString& type, TCreatorFunc creator) {
+ RegisterOutputTransform(type,
+ [creator = std::move(creator), type](TOutputTransformArguments&& args)
{
const google::protobuf::Any& settingsAny = args.OutputDesc.GetTransform().GetSettings();
YQL_ENSURE(settingsAny.Is<TProtoMsg>(),
@@ -112,8 +92,15 @@ public:
});
}
+ // Creation
+ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TSourceArguments&& args) const override;
+ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TSinkArguments&& args) const override;
+ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TOutputTransformArguments&& args) override;
+
private:
- std::unordered_map<TString, TCreatorFunction> CreatorsByType;
+ THashMap<TString, TSourceCreatorFunction> SourceCreatorsByType;
+ THashMap<TString, TSinkCreatorFunction> SinkCreatorsByType;
+ THashMap<TString, TOutputTransformCreatorFunction> OutputTransformCreatorsByType;
};
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index 4ec41f04e4c..e0e671fdac4 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -1,7 +1,7 @@
#pragma once
#include "dq_compute_actor.h"
-#include "dq_compute_actor_async_output.h"
+#include "dq_compute_actor_async_io.h"
#include "retry_queue.h"
#include <ydb/library/yql/dq/common/dq_common.h>
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 3d4b2182d3d..aeb1e46847e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -3,8 +3,7 @@
#include "dq_compute_actor.h"
#include "dq_compute_actor_channels.h"
#include "dq_compute_actor_checkpoints.h"
-#include "dq_compute_actor_async_input.h"
-#include "dq_compute_actor_async_output.h"
+#include "dq_compute_actor_async_io.h"
#include "dq_compute_issues_buffer.h"
#include "dq_compute_memory_quota.h"
@@ -142,7 +141,7 @@ public:
protected:
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false)
: ExecuterId(executerId)
@@ -151,9 +150,7 @@ protected:
, RuntimeSettings(settings)
, MemoryLimits(memoryLimits)
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
- , SourceFactory(std::move(sourceFactory))
- , SinkFactory(std::move(sinkFactory))
- , OutputTransformFactory(std::move(transformFactory))
+ , AsyncIoFactory(std::move(asyncIoFactory))
, FunctionRegistry(functionRegistry)
, CheckpointingMode(GetTaskCheckpointingMode(Task))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
@@ -168,7 +165,7 @@ protected:
}
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task,
- IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
: ExecuterId(executerId)
@@ -177,9 +174,7 @@ protected:
, RuntimeSettings(settings)
, MemoryLimits(memoryLimits)
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
- , SourceFactory(std::move(sourceFactory))
- , SinkFactory(std::move(sinkFactory))
- , OutputTransformFactory(std::move(transformFactory))
+ , AsyncIoFactory(std::move(asyncIoFactory))
, FunctionRegistry(functionRegistry)
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(InitMemoryQuota())
@@ -1249,12 +1244,12 @@ protected:
}
for (auto& [inputIndex, source] : SourcesMap) {
if (TaskRunner) { source.Buffer = TaskRunner->GetSource(inputIndex); Y_VERIFY(source.Buffer);}
- Y_VERIFY(SourceFactory);
+ Y_VERIFY(AsyncIoFactory);
const auto& inputDesc = Task.GetInputs(inputIndex);
const ui64 i = inputIndex; // Crutch for clang
CA_LOG_D("Create source for input " << i << " " << inputDesc);
- std::tie(source.AsyncInput, source.Actor) = SourceFactory->CreateDqSource(
- IDqSourceFactory::TArguments{
+ std::tie(source.AsyncInput, source.Actor) = AsyncIoFactory->CreateDqSource(
+ IDqAsyncIoFactory::TSourceArguments {
.InputDesc = inputDesc,
.InputIndex = inputIndex,
.TxId = TxId,
@@ -1275,12 +1270,12 @@ protected:
if (TaskRunner) {
transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry);
std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex);
- Y_VERIFY(OutputTransformFactory);
+ Y_VERIFY(AsyncIoFactory);
const auto& outputDesc = Task.GetOutputs(outputIndex);
const ui64 i = outputIndex; // Crutch for clang
CA_LOG_D("Create transform for output " << i << " " << outputDesc.ShortDebugString());
- std::tie(transform.AsyncOutput, transform.Actor) = OutputTransformFactory->CreateDqOutputTransform(
- IDqOutputTransformFactory::TArguments {
+ std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform(
+ IDqAsyncIoFactory::TOutputTransformArguments {
.OutputDesc = outputDesc,
.OutputIndex = outputIndex,
.TxId = TxId,
@@ -1297,12 +1292,12 @@ protected:
}
for (auto& [outputIndex, sink] : SinksMap) {
if (TaskRunner) { sink.Buffer = TaskRunner->GetSink(outputIndex); }
- Y_VERIFY(SinkFactory);
+ Y_VERIFY(AsyncIoFactory);
const auto& outputDesc = Task.GetOutputs(outputIndex);
const ui64 i = outputIndex; // Crutch for clang
CA_LOG_D("Create sink for output " << i << " " << outputDesc.ShortDebugString());
- std::tie(sink.AsyncOutput, sink.Actor) = SinkFactory->CreateDqSink(
- IDqSinkFactory::TArguments {
+ std::tie(sink.AsyncOutput, sink.Actor) = AsyncIoFactory->CreateDqSink(
+ IDqAsyncIoFactory::TSinkArguments {
.OutputDesc = outputDesc,
.OutputIndex = outputIndex,
.TxId = TxId,
@@ -1576,9 +1571,7 @@ protected:
const TComputeRuntimeSettings RuntimeSettings;
const TComputeMemoryLimits MemoryLimits;
const bool CanAllocateExtraMemory = false;
- const IDqSourceFactory::TPtr SourceFactory;
- const IDqSinkFactory::TPtr SinkFactory;
- const IDqOutputTransformFactory::TPtr OutputTransformFactory;
+ const IDqAsyncIoFactory::TPtr AsyncIoFactory;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
const NDqProto::ECheckpointingMode CheckpointingMode;
TIntrusivePtr<IDqTaskRunner> TaskRunner;
diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h
index 2c2e1345579..9958901969e 100644
--- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h
+++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h
@@ -1,6 +1,6 @@
#pragma once
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/clickhouse/proto/source.pb.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp
index c18d84398ad..917cef24dde 100644
--- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp
+++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp
@@ -1,13 +1,13 @@
#include "yql_ch_source_factory.h"
#include "yql_ch_read_actor.h"
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
namespace NYql::NDq {
-void RegisterClickHouseReadActorFactory(TDqSourceFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway) {
- factory.Register<NCH::TSource>("ClickHouseSource",
- [credentialsFactory, gateway](NCH::TSource&& settings, IDqSourceFactory::TArguments&& args) {
+void RegisterClickHouseReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway) {
+ factory.RegisterSource<NCH::TSource>("ClickHouseSource",
+ [credentialsFactory, gateway](NCH::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateClickHouseReadActor(gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory);
});
}
diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h
index 50f38e81e8f..41350011310 100644
--- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h
+++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
@@ -9,6 +9,6 @@
namespace NYql::NDq {
-void RegisterClickHouseReadActorFactory(TDqSourceFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway = IHTTPGateway::Make());
+void RegisterClickHouseReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway = IHTTPGateway::Make());
}
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
index 39a9df5f378..b58ab1ce3f4 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
@@ -1,8 +1,7 @@
#pragma once
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
#include <ydb/library/yql/minikql/computation/mkql_value_builder.h>
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index 956c39eb061..c09a795e9ec 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -65,9 +65,7 @@ IActor* CreateComputeActor(
executerId,
operationId,
std::move(task),
- options.SourceFactory,
- options.SinkFactory,
- options.TransformFactory,
+ options.AsyncIoFactory,
options.FunctionRegistry,
computeRuntimeSettings,
memoryLimits,
@@ -77,9 +75,7 @@ IActor* CreateComputeActor(
executerId,
operationId,
std::move(task),
- options.SourceFactory,
- options.SinkFactory,
- options.TransformFactory,
+ options.AsyncIoFactory,
options.FunctionRegistry,
computeRuntimeSettings,
memoryLimits,
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index fc44dd201dc..f6694c5b72c 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -76,13 +76,11 @@ public:
explicit TDqWorker(
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const IDqSourceFactory::TPtr& sourceFactory,
- const IDqSinkFactory::TPtr& sinkFactory,
+ const IDqAsyncIoFactory::TPtr& asyncIoFactory,
TWorkerRuntimeData* runtimeData,
const TString& traceId)
: TRichActor<TDqWorker>(&TDqWorker::Handler)
- , SourceFactory(sourceFactory)
- , SinkFactory(sinkFactory)
+ , AsyncIoFactory(asyncIoFactory)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, RuntimeData(runtimeData)
, TraceId(traceId)
@@ -265,8 +263,8 @@ private:
auto& source = SourcesMap[inputId];
source.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
std::tie(source.Source, source.Actor) =
- SourceFactory->CreateDqSource(
- IDqSourceFactory::TArguments{
+ AsyncIoFactory->CreateDqSource(
+ IDqAsyncIoFactory::TSourceArguments {
.InputDesc = input,
.InputIndex = static_cast<ui64>(inputId),
.TxId = TraceId,
@@ -295,8 +293,8 @@ private:
if (output.HasSink()) {
auto& sink = SinksMap[outputId];
sink.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
- std::tie(sink.Sink, sink.Actor) = SinkFactory->CreateDqSink(
- IDqSinkFactory::TArguments {
+ std::tie(sink.Sink, sink.Actor) = AsyncIoFactory->CreateDqSink(
+ IDqAsyncIoFactory::TSinkArguments {
.OutputDesc = output,
.OutputIndex = static_cast<ui64>(outputId),
.TxId = TraceId,
@@ -706,8 +704,7 @@ private:
/*_________________________________________________________*/
- IDqSourceFactory::TPtr SourceFactory;
- IDqSinkFactory::TPtr SinkFactory;
+ IDqAsyncIoFactory::TPtr AsyncIoFactory;
ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
NTaskRunnerActor::ITaskRunnerActor* Actor = nullptr;
TActorId TaskRunnerActor;
@@ -745,15 +742,13 @@ NActors::IActor* CreateWorkerActor(
TWorkerRuntimeData* runtimeData,
const TString& traceId,
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const IDqSourceFactory::TPtr& sourceFactory,
- const IDqSinkFactory::TPtr& sinkFactory)
+ const IDqAsyncIoFactory::TPtr& asyncIoFactory)
{
Y_VERIFY(taskRunnerActorFactory);
return new TLogWrapReceive(
new TDqWorker(
taskRunnerActorFactory,
- sourceFactory,
- sinkFactory,
+ asyncIoFactory,
runtimeData,
traceId), traceId);
}
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h
index 94022f93112..ea1ee7bfdb8 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.h
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.h
@@ -25,7 +25,6 @@ namespace NYql::NDqs {
TWorkerRuntimeData* runtimeData,
const TString& traceId,
const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const NDq::IDqSourceFactory::TPtr& sourceFactory,
- const NDq::IDqSinkFactory::TPtr& sinkFactory);
+ const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index 82092045a22..62d5db770ad 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -22,7 +22,7 @@ class TLocalServiceHolder {
public:
TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
- NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
+ NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
{
ui32 nodeId = 1;
@@ -48,9 +48,7 @@ public:
NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true);
- lwmOptions.SourceFactory = std::move(sourceFactory);
- lwmOptions.SinkFactory = std::move(sinkFactory);
- lwmOptions.TransformFactory = std::move(transformFactory);
+ lwmOptions.AsyncIoFactory = std::move(asyncIoFactory);
lwmOptions.FunctionRegistry = functionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
@@ -113,15 +111,15 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
- NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
+ NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
{
- return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory));
+ return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory));
}
TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
+ NDq::IDqAsyncIoFactory::TPtr asyncIoFactory)
{
int startPort = 31337;
TRangeWalker<int> portWalker(startPort, startPort+100);
@@ -129,7 +127,7 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
auto grpcPort = BindInRange(portWalker)[1];
return new TDqGatewayLocal(
- CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory)),
+ CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory)),
CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort(), 8));
}
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
index 8afcb036cd6..01634057daf 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
@@ -9,6 +9,6 @@ namespace NYql {
TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- NDq::IDqSourceFactory::TPtr = nullptr, NDq::IDqSinkFactory::TPtr = nullptr, NDq::IDqOutputTransformFactory::TPtr = nullptr);
+ NDq::IDqAsyncIoFactory::TPtr = nullptr);
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index 653db074676..a515b687479 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -261,8 +261,7 @@ private:
Options.RuntimeData,
traceId,
Options.TaskRunnerActorFactory,
- Options.SourceFactory,
- Options.SinkFactory));
+ Options.AsyncIoFactory));
}
allocationInfo.WorkerActors.emplace_back(RegisterChild(
actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index 65143a3a89c..83b88f1792e 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -1,7 +1,6 @@
#pragma once
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/minikql/mkql_function_registry.h>
#include <ydb/library/yql/providers/dq/worker_manager/interface/events.h>
#include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h>
@@ -22,9 +21,7 @@ namespace NYql::NDqs {
struct TLocalWorkerManagerOptions {
TWorkerManagerCounters Counters;
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
- NDq::IDqSourceFactory::TPtr SourceFactory;
- NDq::IDqSinkFactory::TPtr SinkFactory;
- NDq::IDqOutputTransformFactory::TPtr TransformFactory;
+ NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
TWorkerRuntimeData* RuntimeData = nullptr;
TTaskRunnerInvokerFactory::TPtr TaskRunnerInvokerFactory;
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index 9d3950e69ce..e7627eff093 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -2,7 +2,7 @@
#include "probes.h"
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
@@ -410,11 +410,11 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
return {actor, actor};
}
-void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) {
- factory.Register<NPq::NProto::TDqPqTopicSource>("PqSource",
+void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) {
+ factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode](
NPq::NProto::TDqPqTopicSource&& settings,
- IDqSourceFactory::TArguments&& args)
+ IDqAsyncIoFactory::TSourceArguments&& args)
{
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER));
return CreateDqPqReadActor(
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
index c0a90c05cc4..bc3e5292fae 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
@@ -18,7 +18,7 @@
namespace NYql::NDq {
-class TDqSourceFactory;
+class TDqAsyncIoFactory;
const i64 PQReadDefaultFreeSpace = 16_MB;
@@ -36,6 +36,6 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
bool rangesMode = true
);
-void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true);
+void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index b415d34c5ea..ea5236e9b95 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -1,7 +1,7 @@
#include "dq_pq_write_actor.h"
#include "probes.h"
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
@@ -412,11 +412,11 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
return {actor, actor};
}
-void RegisterDqPqWriteActorFactory(TDqSinkFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
- factory.Register<NPq::NProto::TDqPqTopicSink>("PqSink",
+void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
+ factory.RegisterSink<NPq::NProto::TDqPqTopicSink>("PqSink",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory)](
NPq::NProto::TDqPqTopicSink&& settings,
- IDqSinkFactory::TArguments&& args)
+ IDqAsyncIoFactory::TSinkArguments&& args)
{
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER));
return CreateDqPqWriteActor(
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h
index 651abc94a71..88ac95f466b 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
@@ -30,6 +30,6 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
i64 freeSpace = DqPqDefaultFreeSpace);
-void RegisterDqPqWriteActorFactory(TDqSinkFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
index 8efdc8ff41f..0a06b4dc11d 100644
--- a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
+++ b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
@@ -4,8 +4,7 @@
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
index 866a0d7772a..7635df0469b 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
@@ -41,14 +41,10 @@
namespace NYql {
-NDq::IDqSourceFactory::TPtr CreateSourceFactory(const NYdb::TDriver& driver) {
- auto factory = MakeIntrusive<NYql::NDq::TDqSourceFactory>();
+NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver) {
+ auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
RegisterDqPqReadActorFactory(*factory, driver, nullptr);
- return factory;
-}
-NDq::IDqSinkFactory::TPtr CreateSinkFactory(const NYdb::TDriver& driver) {
- auto factory = MakeIntrusive<NYql::NDq::TDqSinkFactory>();
RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
return factory;
}
@@ -124,7 +120,7 @@ bool RunPqProgram(
const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"));
NYdb::TDriver driver(driverConfig);
- auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, CreateSourceFactory(driver), CreateSinkFactory(driver));
+ auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, CreateAsyncIoFactory(driver));
auto storage = NYql::CreateFileStorage({});
dataProvidersInit.push_back(NYql::GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index fba54a9ccf1..9e323c2a337 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -1,6 +1,6 @@
#pragma once
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
#include <ydb/library/yql/providers/s3/proto/source.pb.h>
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index 757bd4dc0c6..b1c3b208a58 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -2,21 +2,21 @@
#ifdef __linux__
#include "yql_s3_read_actor.h"
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.h>
#endif
namespace NYql::NDq {
void RegisterS3ReadActorFactory(
- TDqSourceFactory& factory,
+ TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway,
const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) {
#ifdef __linux__
NDB::registerFormats();
- factory.Register<NS3::TSource>("S3Source",
- [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceFactory::TArguments&& args) {
+ factory.RegisterSource<NS3::TSource>("S3Source",
+ [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateS3ReadActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig);
});
#else
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index 557ea3f1412..65a07b1b2e7 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -3,7 +3,7 @@
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
@@ -11,7 +11,7 @@
namespace NYql::NDq {
void RegisterS3ReadActorFactory(
- TDqSourceFactory& factory,
+ TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr);
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index 91cacf02809..21b0d44907a 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -1,7 +1,7 @@
#include "dq_solomon_write_actor.h"
#include "metrics_encoder.h"
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
@@ -500,11 +500,11 @@ std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolo
return {actor, actor};
}
-void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
- factory.Register<NSo::NProto::TDqSolomonShard>("SolomonSink",
+void RegisterDQSolomonWriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
+ factory.RegisterSink<NSo::NProto::TDqSolomonShard>("SolomonSink",
[credentialsFactory](
NYql::NSo::NProto::TDqSolomonShard&& settings,
- IDqSinkFactory::TArguments&& args)
+ IDqAsyncIoFactory::TSinkArguments&& args)
{
auto counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
index 3f257970b17..5314129a05c 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
@@ -2,7 +2,7 @@
#include <ydb/library/yql/utils/actors/http_sender.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
@@ -28,7 +28,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolo
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
i64 freeSpace = DqSolomonDefaultFreeSpace);
-void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+void RegisterDQSolomonWriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
TString GetSolomonUrl(const TString& endpoint, bool useSsl, const TString& project, const TString& cluster, const TString& service, const ::NYql::NSo::NProto::ESolomonClusterType& type);
diff --git a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h
index e04a7337737..2d802bdff9c 100644
--- a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h
+++ b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h
@@ -3,8 +3,7 @@
#include <ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h>
#include <ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h
index c7f74c284a0..1a67a7fb3f7 100644
--- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h
+++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h
@@ -1,6 +1,6 @@
#pragma once
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/ydb/proto/source.pb.h>
diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp
index 9fcde8b8b77..d9f8ae9dafc 100644
--- a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp
+++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp
@@ -1,13 +1,13 @@
#include "yql_ydb_source_factory.h"
#include "yql_ydb_read_actor.h"
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
namespace NYql::NDq {
-void RegisterYdbReadActorFactory(NYql::NDq::TDqSourceFactory& factory, ::NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
- factory.Register<NYql::NYdb::TSource>("YdbSource",
- [driver, credentialsFactory](NYql::NYdb::TSource&& settings, IDqSourceFactory::TArguments&& args) {
+void RegisterYdbReadActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, ::NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
+ factory.RegisterSource<NYql::NYdb::TSource>("YdbSource",
+ [driver, credentialsFactory](NYql::NYdb::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateYdbReadActor(std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, driver, credentialsFactory);
});
}
diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h
index 0a389c7a4ec..b5a44cf293f 100644
--- a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h
+++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h
@@ -3,12 +3,12 @@
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
namespace NYql::NDq {
-void RegisterYdbReadActorFactory(NYql::NDq::TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+void RegisterYdbReadActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
}