diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-05-26 17:34:46 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-05-26 17:34:46 +0300 |
commit | a2130178e7652dc319f3edd9001e2dbb0d44ad6b (patch) | |
tree | a97749ed757720b8cd1d6dc1e1f8e852904142ff | |
parent | 2bd2b6be009b493d6179ff8936629cd39a50c624 (diff) | |
download | ydb-a2130178e7652dc319f3edd9001e2dbb0d44ad6b.tar.gz |
YQ-1098 Unite async io factories in one factory
Refactor in the rest of the code
Unite factories in dq library
Unite async io in one header file
ref:f5b935fd3acd4e24bd334c0ed761317bfeba6a6e
42 files changed, 267 insertions, 338 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 68d86e72aab..8b36af9efd5 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -16,12 +16,12 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, - NYql::NDq::IDqSourceFactory::TPtr sourceFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory, + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits); IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceFactory::TPtr sourceFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory, + NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters); diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index c0afd232d3b..dd24f15d395 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -33,10 +33,10 @@ public: } TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) - : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true) , ComputeCtx(settings.StatsMode) { if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) { @@ -318,12 +318,12 @@ private: } // anonymous namespace IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) { - return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceFactory), - std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits); + return new TKqpComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), + functionRegistry, settings, memoryLimits); } } // namespace NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index f83a17f54e3..48dfeb1dcd3 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -72,10 +72,10 @@ public: } TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NDqProto::TDqTask&& task, IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters) - : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits) , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) , Counters(counters) @@ -1140,11 +1140,11 @@ private: } // anonymous namespace IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NDqProto::TDqTask&& task, IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters) { - return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), + return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, counters); } diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index a405a06f3de..b0d4947f654 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -1240,7 +1240,7 @@ private: return false; }; - auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, nullptr, nullptr, settings, limits); + auto computeActor = CreateKqpComputeActor(SelfId(), TxId, std::move(taskDesc), nullptr, nullptr, settings, limits); auto computeActorId = Register(computeActor); task.ComputeActorId = computeActorId; diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp index 80728c7160f..bf7d7f69c2d 100644 --- a/ydb/core/kqp/node/kqp_node.cpp +++ b/ydb/core/kqp/node/kqp_node.cpp @@ -311,11 +311,11 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - nullptr, nullptr, nullptr, nullptr, runtimeSettings, memoryLimits, Counters); + nullptr, nullptr, runtimeSettings, memoryLimits, Counters); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { - computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), nullptr, nullptr, nullptr, nullptr, runtimeSettings, + computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), nullptr, nullptr, runtimeSettings, memoryLimits); taskCtx.ComputeActorId = Register(computeActor); } else { diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index c963406650e..9f2b32a25fd 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -115,8 +115,7 @@ void Init( NYql::CreateYdbDqTaskTransformFactory() }); - auto sourceFactory = MakeIntrusive<NYql::NDq::TDqSourceFactory>(); - auto sinkFactory = MakeIntrusive<NYql::NDq::TDqSinkFactory>(); + auto asyncIoFactory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory; const auto httpGateway = NYql::IHTTPGateway::Make( @@ -135,14 +134,14 @@ void Init( } if (protoConfig.GetPrivateApi().GetEnabled()) { - RegisterDqPqReadActorFactory(*sourceFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); - RegisterYdbReadActorFactory(*sourceFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); - RegisterS3ReadActorFactory(*sourceFactory, credentialsFactory, + RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); + RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); + RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig())); - RegisterClickHouseReadActorFactory(*sourceFactory, credentialsFactory, httpGateway); + RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway); - RegisterDqPqWriteActorFactory(*sinkFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); - RegisterDQSolomonWriteActorFactory(*sinkFactory, credentialsFactory); + RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); + RegisterDQSolomonWriteActorFactory(*asyncIoFactory, credentialsFactory); } ui64 mkqlInitialMemoryLimit = 8_GB; @@ -160,8 +159,7 @@ void Init( NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Counters = workerManagerCounters; lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false); - lwmOptions.SourceFactory = sourceFactory; - lwmOptions.SinkFactory = sinkFactory; + lwmOptions.AsyncIoFactory = asyncIoFactory; lwmOptions.FunctionRegistry = appData->FunctionRegistry; lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory(); lwmOptions.MkqlInitialMemoryLimit = mkqlInitialMemoryLimit; diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index a84f863e72e..eff16a0825a 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -29,11 +29,11 @@ public: static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR"; TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) - : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false) , TaskRunnerActorFactory(taskRunnerActorFactory) , ReadyToCheckpointFlag(false) , SentStatsRequest(false) @@ -573,13 +573,13 @@ private: IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) { - return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceFactory), - std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerActorFactory); + return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), + functionRegistry, settings, memoryLimits, taskRunnerActorFactory); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h index 95219cb5a99..23b6c897193 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h @@ -1,7 +1,6 @@ #pragma once -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/library/yql/dq/actors/dq_events_ids.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> @@ -19,7 +18,7 @@ namespace NYql { namespace NDq { NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 85e6d3c96e3..6d82b5a9c98 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -33,11 +33,11 @@ public: static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR"; TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory) - : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits) , TaskRunnerFactory(taskRunnerFactory) {} @@ -68,12 +68,12 @@ private: IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory) { - return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceFactory), - std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerFactory); + return new TDqComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), + functionRegistry, settings, memoryLimits, taskRunnerFactory); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 5f839ccec60..5db7135fdef 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -1,7 +1,6 @@ #pragma once -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/actors/dq_events_ids.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/common/dq_common.h> @@ -257,7 +256,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& NDqProto::TDqTaskStats* protoTask, bool withProfileStats); NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h deleted file mode 100644 index 113120031f3..00000000000 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h +++ /dev/null @@ -1,98 +0,0 @@ -#pragma once -#include <ydb/library/yql/dq/common/dq_common.h> -#include <ydb/library/yql/dq/actors/dq_events_ids.h> -#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> -#include <ydb/library/yql/public/issue/yql_issue.h> - -#include <util/generic/ptr.h> - -#include <memory> -#include <utility> - -namespace NYql::NDqProto { -class TCheckpoint; -class TTaskInput; -class TSourceState; -} // namespace NYql::NDqProto - -namespace NActors { -class IActor; -} // namespace NActors - -namespace NYql::NDq { - -// Source/transform. -// Must be IActor. -// -// Protocol: -// 1. CA starts source/transform. -// 2. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace). -// 3. Source/transform sends TEvNewAsyncInputDataArrived when it has data to process. -// 4. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace) to get data when it is ready to process it. -// -// In case of error source/transform sends TEvAsyncInputError -// -// Checkpointing: -// 1. InjectCheckpoint event arrives to CA. -// 2. ... -// 3. CA calls IDqComputeActorAsyncInput::SaveState() and IDqTaskRunner::SaveGraphState() and uses this pair as state for CA. -// 3. ... -// 5. CA calls IDqComputeActorAsyncInput::CommitState() to apply all side effects. -struct IDqComputeActorAsyncInput { - struct TEvNewAsyncInputDataArrived : public NActors::TEventLocal<TEvNewAsyncInputDataArrived, TDqComputeEvents::EvNewAsyncInputDataArrived> { - const ui64 InputIndex; - explicit TEvNewAsyncInputDataArrived(ui64 inputIndex) - : InputIndex(inputIndex) - {} - }; - - struct TEvAsyncInputError : public NActors::TEventLocal<TEvAsyncInputError, TDqComputeEvents::EvAsyncInputError> { - TEvAsyncInputError(ui64 inputIndex, const TIssues& issues, bool isFatal) - : InputIndex(inputIndex) - , Issues(issues) - , IsFatal(isFatal) - {} - - const ui64 InputIndex; - const TIssues Issues; - const bool IsFatal; - }; - - virtual ui64 GetInputIndex() const = 0; - - // Gets data and returns space used by filled data batch. - // Method should be called under bound mkql allocator. - // Could throw YQL errors. - virtual i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0; - - // Checkpointing. - virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) = 0; - virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0; // Apply side effects related to this checkpoint. - virtual void LoadState(const NDqProto::TSourceState& state) = 0; - - virtual void PassAway() = 0; // The same signature as IActor::PassAway() - - virtual ~IDqComputeActorAsyncInput() = default; -}; - -struct IDqSourceFactory : public TThrRefBase { - using TPtr = TIntrusivePtr<IDqSourceFactory>; - - struct TArguments { - const NDqProto::TTaskInput& InputDesc; - ui64 InputIndex; - TTxId TxId; - const THashMap<TString, TString>& SecureParams; - const THashMap<TString, TString>& TaskParams; - const NActors::TActorId& ComputeActorId; - const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; - const NKikimr::NMiniKQL::THolderFactory& HolderFactory; - }; - - // Creates source. - // Could throw YQL errors. - // IActor* and IDqComputeActorAsyncInput* returned by method must point to the objects with consistent lifetime. - virtual std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TArguments&& args) const = 0; -}; - -} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 082450196cb..e6f6ad01626 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -1,4 +1,5 @@ #pragma once +#include <ydb/library/yql/dq/actors/dq_events_ids.h> #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/dq/runtime/dq_output_consumer.h> #include <ydb/library/yql/dq/runtime/dq_async_output.h> @@ -12,6 +13,8 @@ namespace NYql::NDqProto { class TCheckpoint; +class TTaskInput; +class TSourceState; class TTaskOutput; class TSinkState; } // namespace NYql::NDqProto @@ -26,6 +29,60 @@ class TProgramBuilder; namespace NYql::NDq { +// Source/transform. +// Must be IActor. +// +// Protocol: +// 1. CA starts source/transform. +// 2. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace). +// 3. Source/transform sends TEvNewAsyncInputDataArrived when it has data to process. +// 4. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace) to get data when it is ready to process it. +// +// In case of error source/transform sends TEvAsyncInputError +// +// Checkpointing: +// 1. InjectCheckpoint event arrives to CA. +// 2. ... +// 3. CA calls IDqComputeActorAsyncInput::SaveState() and IDqTaskRunner::SaveGraphState() and uses this pair as state for CA. +// 3. ... +// 5. CA calls IDqComputeActorAsyncInput::CommitState() to apply all side effects. +struct IDqComputeActorAsyncInput { + struct TEvNewAsyncInputDataArrived : public NActors::TEventLocal<TEvNewAsyncInputDataArrived, TDqComputeEvents::EvNewAsyncInputDataArrived> { + const ui64 InputIndex; + explicit TEvNewAsyncInputDataArrived(ui64 inputIndex) + : InputIndex(inputIndex) + {} + }; + + struct TEvAsyncInputError : public NActors::TEventLocal<TEvAsyncInputError, TDqComputeEvents::EvAsyncInputError> { + TEvAsyncInputError(ui64 inputIndex, const TIssues& issues, bool isFatal) + : InputIndex(inputIndex) + , Issues(issues) + , IsFatal(isFatal) + {} + + const ui64 InputIndex; + const TIssues Issues; + const bool IsFatal; + }; + + virtual ui64 GetInputIndex() const = 0; + + // Gets data and returns space used by filled data batch. + // Method should be called under bound mkql allocator. + // Could throw YQL errors. + virtual i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0; + + // Checkpointing. + virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) = 0; + virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0; // Apply side effects related to this checkpoint. + virtual void LoadState(const NDqProto::TSourceState& state) = 0; + + virtual void PassAway() = 0; // The same signature as IActor::PassAway() + + virtual ~IDqComputeActorAsyncInput() = default; +}; + // Sink/transform. // Must be IActor. // @@ -77,10 +134,22 @@ struct IDqComputeActorAsyncOutput { virtual ~IDqComputeActorAsyncOutput() = default; }; -struct IDqSinkFactory : public TThrRefBase { - using TPtr = TIntrusivePtr<IDqSinkFactory>; +struct IDqAsyncIoFactory : public TThrRefBase { +public: + using TPtr = TIntrusivePtr<IDqAsyncIoFactory>; - struct TArguments { + struct TSourceArguments { + const NDqProto::TTaskInput& InputDesc; + ui64 InputIndex; + TTxId TxId; + const THashMap<TString, TString>& SecureParams; + const THashMap<TString, TString>& TaskParams; + const NActors::TActorId& ComputeActorId; + const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; + const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + }; + + struct TSinkArguments { const NDqProto::TTaskOutput& OutputDesc; ui64 OutputIndex; TTxId TxId; @@ -90,17 +159,7 @@ struct IDqSinkFactory : public TThrRefBase { const NKikimr::NMiniKQL::THolderFactory& HolderFactory; }; - // Creates sink. - // Could throw YQL errors. - // IActor* and IDqComputeActorAsyncOutput* returned by method must point to the objects with consistent lifetime. - virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const = 0; -}; - -struct IDqOutputTransformFactory : public TThrRefBase { -public: - using TPtr = TIntrusivePtr<IDqOutputTransformFactory>; - - struct TArguments { + struct TOutputTransformArguments { const NDqProto::TTaskOutput& OutputDesc; const ui64 OutputIndex; TTxId TxId; @@ -112,7 +171,20 @@ public: NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; }; - virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TArguments&& args) = 0; + // Creates source. + // Could throw YQL errors. + // IActor* and IDqComputeActorAsyncInput* returned by method must point to the objects with consistent lifetime. + virtual std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TSourceArguments&& args) const = 0; + + // Creates sink. + // Could throw YQL errors. + // IActor* and IDqComputeActorAsyncOutput* returned by method must point to the objects with consistent lifetime. + virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TSinkArguments&& args) const = 0; + + // Creates output transform. + // Could throw YQL errors. + // IActor* and IDqComputeActorAsyncOutput* returned by method must point to the objects with consistent lifetime. + virtual std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TOutputTransformArguments&& args) = 0; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp index b0ae4144ab5..9d34bb4593e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp @@ -1,17 +1,16 @@ #include "dq_compute_actor_async_io_factory.h" -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> namespace NYql::NDq { -std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> TDqSourceFactory::CreateDqSource(IDqSourceFactory::TArguments&& args) const +std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> TDqAsyncIoFactory::CreateDqSource(TSourceArguments&& args) const { const TString& type = args.InputDesc.GetSource().GetType(); YQL_ENSURE(!type.empty(), "Attempt to create source of empty type"); - const TCreatorFunction* creatorFunc = CreatorsByType.FindPtr(type); + const TSourceCreatorFunction* creatorFunc = SourceCreatorsByType.FindPtr(type); YQL_ENSURE(creatorFunc, "Unknown type of source: \"" << type << "\""); std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args)); Y_VERIFY(actor.first); @@ -19,17 +18,17 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> TDqSourceFactory::Create return actor; } -void TDqSourceFactory::Register(const TString& type, TCreatorFunction creator) +void TDqAsyncIoFactory::RegisterSource(const TString& type, TSourceCreatorFunction creator) { - auto [_, registered] = CreatorsByType.emplace(type, std::move(creator)); + auto [_, registered] = SourceCreatorsByType.emplace(type, std::move(creator)); Y_VERIFY(registered); } -std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqSinkFactory::CreateDqSink(IDqSinkFactory::TArguments&& args) const +std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqAsyncIoFactory::CreateDqSink(TSinkArguments&& args) const { const TString& type = args.OutputDesc.GetSink().GetType(); YQL_ENSURE(!type.empty(), "Attempt to create sink of empty type"); - const TCreatorFunction* creatorFunc = CreatorsByType.FindPtr(type); + const TSinkCreatorFunction* creatorFunc = SinkCreatorsByType.FindPtr(type); YQL_ENSURE(creatorFunc, "Unknown type of sink: \"" << type << "\""); std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args)); Y_VERIFY(actor.first); @@ -37,24 +36,27 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqSinkFactory::CreateD return actor; } -void TDqSinkFactory::Register(const TString& type, TCreatorFunction creator) +void TDqAsyncIoFactory::RegisterSink(const TString& type, TSinkCreatorFunction creator) { - auto [_, registered] = CreatorsByType.emplace(type, std::move(creator)); + auto [_, registered] = SinkCreatorsByType.emplace(type, std::move(creator)); Y_VERIFY(registered); } -std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqOutputTransformFactory::CreateDqOutputTransform(TArguments&& args) { - auto creator = CreatorsByType.find(args.OutputDesc.GetTransform().GetType()); - YQL_ENSURE(creator != CreatorsByType.end(), "Unregistered type of transform actor: \"" << args.OutputDesc.GetTransform().GetType() << "\""); - - std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (creator->second)(std::move(args)); +std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> TDqAsyncIoFactory::CreateDqOutputTransform(TOutputTransformArguments&& args) +{ + const TString& type = args.OutputDesc.GetTransform().GetType(); + YQL_ENSURE(!type.empty(), "Attempt to create output transform of empty type"); + const TOutputTransformCreatorFunction* creatorFunc = OutputTransformCreatorsByType.FindPtr(type); + YQL_ENSURE(creatorFunc, "Unknown type of output transform: \"" << type << "\""); + std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args)); Y_VERIFY(actor.first); Y_VERIFY(actor.second); return actor; } -void TDqOutputTransformFactory::Register(const TString& type, TCreatorFunction creator) { - auto [_, registered] = CreatorsByType.emplace(type, std::move(creator)); +void TDqAsyncIoFactory::RegisterOutputTransform(const TString& type, TOutputTransformCreatorFunction creator) +{ + auto [_, registered] = OutputTransformCreatorsByType.emplace(type, std::move(creator)); Y_VERIFY(registered); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h index b893d26b786..c045b5b48b6 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h @@ -1,6 +1,5 @@ #pragma once -#include "dq_compute_actor_async_input.h" -#include "dq_compute_actor_async_output.h" +#include "dq_compute_actor_async_io.h" #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/common/dq_common.h> @@ -17,63 +16,54 @@ concept TCastsToAsyncInputPair = std::is_convertible_v<T, std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>>; template <class T, class TProto> -concept TSourceCreatorFunc = requires(T f, TProto&& settings, IDqSourceFactory::TArguments args) { +concept TSourceCreatorFunc = requires(T f, TProto&& settings, IDqAsyncIoFactory::TSourceArguments args) { { f(std::move(settings), std::move(args)) } -> TCastsToAsyncInputPair; }; -class TDqSourceFactory : public IDqSourceFactory { -public: - using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>(TArguments&& args)>; - - std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TArguments&& args) const override; - - void Register(const TString& type, TCreatorFunction creator); - - template <class TProtoMsg, TSourceCreatorFunc<TProtoMsg> TCreatorFunc> - void Register(const TString& type, TCreatorFunc creator) { - Register(type, - [creator = std::move(creator), type](TArguments&& args) - { - const google::protobuf::Any& settingsAny = args.InputDesc.GetSource().GetSettings(); - YQL_ENSURE(settingsAny.Is<TProtoMsg>(), - "Source \"" << type << "\" settings are expected to have protobuf type " << TProtoMsg::descriptor()->full_name() - << ", but got " << settingsAny.type_url()); - TProtoMsg settings; - YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings of type \"" << type << "\""); - return creator(std::move(settings), std::move(args)); - }); - } - -private: - THashMap<TString, TCreatorFunction> CreatorsByType; -}; - template <class T> concept TCastsToAsyncOutputPair = std::is_convertible_v<T, std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>>; template <class T, class TProto> -concept TSinkCreatorFunc = requires(T f, TProto&& settings, IDqSinkFactory::TArguments&& args) { +concept TSinkCreatorFunc = requires(T f, TProto&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { { f(std::move(settings), std::move(args)) } -> TCastsToAsyncOutputPair; }; template <class T, class TProto> -concept TOutputTransformCreatorFunc = requires(T f, TProto&& settings, IDqOutputTransformFactory::TArguments&& args) { +concept TOutputTransformCreatorFunc = requires(T f, TProto&& settings, IDqAsyncIoFactory::TOutputTransformArguments&& args) { { f(std::move(settings), std::move(args)) } -> TCastsToAsyncOutputPair; }; -class TDqSinkFactory : public IDqSinkFactory { +class TDqAsyncIoFactory : public IDqAsyncIoFactory { public: - using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TArguments&& args)>; + using TSourceCreatorFunction = std::function<std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>(TSourceArguments&& args)>; + using TSinkCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TSinkArguments&& args)>; + using TOutputTransformCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TOutputTransformArguments&& args)>; - std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TArguments&& args) const override; + // Registration + void RegisterSource(const TString& type, TSourceCreatorFunction creator); - void Register(const TString& type, TCreatorFunction creator); + template <class TProtoMsg, TSourceCreatorFunc<TProtoMsg> TCreatorFunc> + void RegisterSource(const TString& type, TCreatorFunc creator) { + RegisterSource(type, + [creator = std::move(creator), type](TSourceArguments&& args) + { + const google::protobuf::Any& settingsAny = args.InputDesc.GetSource().GetSettings(); + YQL_ENSURE(settingsAny.Is<TProtoMsg>(), + "Source \"" << type << "\" settings are expected to have protobuf type " << TProtoMsg::descriptor()->full_name() + << ", but got " << settingsAny.type_url()); + TProtoMsg settings; + YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings of type \"" << type << "\""); + return creator(std::move(settings), std::move(args)); + }); + } + + void RegisterSink(const TString& type, TSinkCreatorFunction creator); template <class TProtoMsg, TSinkCreatorFunc<TProtoMsg> TCreatorFunc> - void Register(const TString& type, TCreatorFunc creator) { - Register(type, - [creator = std::move(creator), type](TArguments&& args) + void RegisterSink(const TString& type, TCreatorFunc creator) { + RegisterSink(type, + [creator = std::move(creator), type](TSinkArguments&& args) { const google::protobuf::Any& settingsAny = args.OutputDesc.GetSink().GetSettings(); YQL_ENSURE(settingsAny.Is<TProtoMsg>(), @@ -85,22 +75,12 @@ public: }); } -private: - THashMap<TString, TCreatorFunction> CreatorsByType; -}; - -class TDqOutputTransformFactory : public IDqOutputTransformFactory { -public: - using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*>(TArguments&& args)>; - - std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TArguments&& args) override; - - void Register(const TString& type, TCreatorFunction creator); + void RegisterOutputTransform(const TString& type, TOutputTransformCreatorFunction creator); template <class TProtoMsg, TOutputTransformCreatorFunc<TProtoMsg> TCreatorFunc> - void Register(const TString& type, TCreatorFunc creator) { - Register(type, - [creator = std::move(creator), type](TArguments&& args) + void RegisterOutputTransform(const TString& type, TCreatorFunc creator) { + RegisterOutputTransform(type, + [creator = std::move(creator), type](TOutputTransformArguments&& args) { const google::protobuf::Any& settingsAny = args.OutputDesc.GetTransform().GetSettings(); YQL_ENSURE(settingsAny.Is<TProtoMsg>(), @@ -112,8 +92,15 @@ public: }); } + // Creation + std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TSourceArguments&& args) const override; + std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSink(TSinkArguments&& args) const override; + std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqOutputTransform(TOutputTransformArguments&& args) override; + private: - std::unordered_map<TString, TCreatorFunction> CreatorsByType; + THashMap<TString, TSourceCreatorFunction> SourceCreatorsByType; + THashMap<TString, TSinkCreatorFunction> SinkCreatorsByType; + THashMap<TString, TOutputTransformCreatorFunction> OutputTransformCreatorsByType; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h index 4ec41f04e4c..e0e671fdac4 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h @@ -1,7 +1,7 @@ #pragma once #include "dq_compute_actor.h" -#include "dq_compute_actor_async_output.h" +#include "dq_compute_actor_async_io.h" #include "retry_queue.h" #include <ydb/library/yql/dq/common/dq_common.h> diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 3d4b2182d3d..aeb1e46847e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -3,8 +3,7 @@ #include "dq_compute_actor.h" #include "dq_compute_actor_channels.h" #include "dq_compute_actor_checkpoints.h" -#include "dq_compute_actor_async_input.h" -#include "dq_compute_actor_async_output.h" +#include "dq_compute_actor_async_io.h" #include "dq_compute_issues_buffer.h" #include "dq_compute_memory_quota.h" @@ -142,7 +141,7 @@ public: protected: TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false) : ExecuterId(executerId) @@ -151,9 +150,7 @@ protected: , RuntimeSettings(settings) , MemoryLimits(memoryLimits) , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) - , SourceFactory(std::move(sourceFactory)) - , SinkFactory(std::move(sinkFactory)) - , OutputTransformFactory(std::move(transformFactory)) + , AsyncIoFactory(std::move(asyncIoFactory)) , FunctionRegistry(functionRegistry) , CheckpointingMode(GetTaskCheckpointingMode(Task)) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) @@ -168,7 +165,7 @@ protected: } TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task, - IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) : ExecuterId(executerId) @@ -177,9 +174,7 @@ protected: , RuntimeSettings(settings) , MemoryLimits(memoryLimits) , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) - , SourceFactory(std::move(sourceFactory)) - , SinkFactory(std::move(sinkFactory)) - , OutputTransformFactory(std::move(transformFactory)) + , AsyncIoFactory(std::move(asyncIoFactory)) , FunctionRegistry(functionRegistry) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , MemoryQuota(InitMemoryQuota()) @@ -1249,12 +1244,12 @@ protected: } for (auto& [inputIndex, source] : SourcesMap) { if (TaskRunner) { source.Buffer = TaskRunner->GetSource(inputIndex); Y_VERIFY(source.Buffer);} - Y_VERIFY(SourceFactory); + Y_VERIFY(AsyncIoFactory); const auto& inputDesc = Task.GetInputs(inputIndex); const ui64 i = inputIndex; // Crutch for clang CA_LOG_D("Create source for input " << i << " " << inputDesc); - std::tie(source.AsyncInput, source.Actor) = SourceFactory->CreateDqSource( - IDqSourceFactory::TArguments{ + std::tie(source.AsyncInput, source.Actor) = AsyncIoFactory->CreateDqSource( + IDqAsyncIoFactory::TSourceArguments { .InputDesc = inputDesc, .InputIndex = inputIndex, .TxId = TxId, @@ -1275,12 +1270,12 @@ protected: if (TaskRunner) { transform.ProgramBuilder.ConstructInPlace(TaskRunner->GetTypeEnv(), *FunctionRegistry); std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex); - Y_VERIFY(OutputTransformFactory); + Y_VERIFY(AsyncIoFactory); const auto& outputDesc = Task.GetOutputs(outputIndex); const ui64 i = outputIndex; // Crutch for clang CA_LOG_D("Create transform for output " << i << " " << outputDesc.ShortDebugString()); - std::tie(transform.AsyncOutput, transform.Actor) = OutputTransformFactory->CreateDqOutputTransform( - IDqOutputTransformFactory::TArguments { + std::tie(transform.AsyncOutput, transform.Actor) = AsyncIoFactory->CreateDqOutputTransform( + IDqAsyncIoFactory::TOutputTransformArguments { .OutputDesc = outputDesc, .OutputIndex = outputIndex, .TxId = TxId, @@ -1297,12 +1292,12 @@ protected: } for (auto& [outputIndex, sink] : SinksMap) { if (TaskRunner) { sink.Buffer = TaskRunner->GetSink(outputIndex); } - Y_VERIFY(SinkFactory); + Y_VERIFY(AsyncIoFactory); const auto& outputDesc = Task.GetOutputs(outputIndex); const ui64 i = outputIndex; // Crutch for clang CA_LOG_D("Create sink for output " << i << " " << outputDesc.ShortDebugString()); - std::tie(sink.AsyncOutput, sink.Actor) = SinkFactory->CreateDqSink( - IDqSinkFactory::TArguments { + std::tie(sink.AsyncOutput, sink.Actor) = AsyncIoFactory->CreateDqSink( + IDqAsyncIoFactory::TSinkArguments { .OutputDesc = outputDesc, .OutputIndex = outputIndex, .TxId = TxId, @@ -1576,9 +1571,7 @@ protected: const TComputeRuntimeSettings RuntimeSettings; const TComputeMemoryLimits MemoryLimits; const bool CanAllocateExtraMemory = false; - const IDqSourceFactory::TPtr SourceFactory; - const IDqSinkFactory::TPtr SinkFactory; - const IDqOutputTransformFactory::TPtr OutputTransformFactory; + const IDqAsyncIoFactory::TPtr AsyncIoFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; const NDqProto::ECheckpointingMode CheckpointingMode; TIntrusivePtr<IDqTaskRunner> TaskRunner; diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h index 2c2e1345579..9958901969e 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h @@ -1,6 +1,6 @@ #pragma once -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/clickhouse/proto/source.pb.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp index c18d84398ad..917cef24dde 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp @@ -1,13 +1,13 @@ #include "yql_ch_source_factory.h" #include "yql_ch_read_actor.h" -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> namespace NYql::NDq { -void RegisterClickHouseReadActorFactory(TDqSourceFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway) { - factory.Register<NCH::TSource>("ClickHouseSource", - [credentialsFactory, gateway](NCH::TSource&& settings, IDqSourceFactory::TArguments&& args) { +void RegisterClickHouseReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway) { + factory.RegisterSource<NCH::TSource>("ClickHouseSource", + [credentialsFactory, gateway](NCH::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateClickHouseReadActor(gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory); }); } diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h index 50f38e81e8f..41350011310 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.h @@ -1,7 +1,7 @@ #pragma once #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> @@ -9,6 +9,6 @@ namespace NYql::NDq { -void RegisterClickHouseReadActorFactory(TDqSourceFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway = IHTTPGateway::Make()); +void RegisterClickHouseReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway = IHTTPGateway::Make()); } diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index 39a9df5f378..b58ab1ce3f4 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -1,8 +1,7 @@ #pragma once #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h> #include <ydb/library/yql/minikql/computation/mkql_value_builder.h> diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index 956c39eb061..c09a795e9ec 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -65,9 +65,7 @@ IActor* CreateComputeActor( executerId, operationId, std::move(task), - options.SourceFactory, - options.SinkFactory, - options.TransformFactory, + options.AsyncIoFactory, options.FunctionRegistry, computeRuntimeSettings, memoryLimits, @@ -77,9 +75,7 @@ IActor* CreateComputeActor( executerId, operationId, std::move(task), - options.SourceFactory, - options.SinkFactory, - options.TransformFactory, + options.AsyncIoFactory, options.FunctionRegistry, computeRuntimeSettings, memoryLimits, diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index fc44dd201dc..f6694c5b72c 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -76,13 +76,11 @@ public: explicit TDqWorker( const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const IDqSourceFactory::TPtr& sourceFactory, - const IDqSinkFactory::TPtr& sinkFactory, + const IDqAsyncIoFactory::TPtr& asyncIoFactory, TWorkerRuntimeData* runtimeData, const TString& traceId) : TRichActor<TDqWorker>(&TDqWorker::Handler) - , SourceFactory(sourceFactory) - , SinkFactory(sinkFactory) + , AsyncIoFactory(asyncIoFactory) , TaskRunnerActorFactory(taskRunnerActorFactory) , RuntimeData(runtimeData) , TraceId(traceId) @@ -265,8 +263,8 @@ private: auto& source = SourcesMap[inputId]; source.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv); std::tie(source.Source, source.Actor) = - SourceFactory->CreateDqSource( - IDqSourceFactory::TArguments{ + AsyncIoFactory->CreateDqSource( + IDqAsyncIoFactory::TSourceArguments { .InputDesc = input, .InputIndex = static_cast<ui64>(inputId), .TxId = TraceId, @@ -295,8 +293,8 @@ private: if (output.HasSink()) { auto& sink = SinksMap[outputId]; sink.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv); - std::tie(sink.Sink, sink.Actor) = SinkFactory->CreateDqSink( - IDqSinkFactory::TArguments { + std::tie(sink.Sink, sink.Actor) = AsyncIoFactory->CreateDqSink( + IDqAsyncIoFactory::TSinkArguments { .OutputDesc = output, .OutputIndex = static_cast<ui64>(outputId), .TxId = TraceId, @@ -706,8 +704,7 @@ private: /*_________________________________________________________*/ - IDqSourceFactory::TPtr SourceFactory; - IDqSinkFactory::TPtr SinkFactory; + IDqAsyncIoFactory::TPtr AsyncIoFactory; ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; NTaskRunnerActor::ITaskRunnerActor* Actor = nullptr; TActorId TaskRunnerActor; @@ -745,15 +742,13 @@ NActors::IActor* CreateWorkerActor( TWorkerRuntimeData* runtimeData, const TString& traceId, const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const IDqSourceFactory::TPtr& sourceFactory, - const IDqSinkFactory::TPtr& sinkFactory) + const IDqAsyncIoFactory::TPtr& asyncIoFactory) { Y_VERIFY(taskRunnerActorFactory); return new TLogWrapReceive( new TDqWorker( taskRunnerActorFactory, - sourceFactory, - sinkFactory, + asyncIoFactory, runtimeData, traceId), traceId); } diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h index 94022f93112..ea1ee7bfdb8 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.h +++ b/ydb/library/yql/providers/dq/actors/worker_actor.h @@ -25,7 +25,6 @@ namespace NYql::NDqs { TWorkerRuntimeData* runtimeData, const TString& traceId, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const NDq::IDqSourceFactory::TPtr& sourceFactory, - const NDq::IDqSinkFactory::TPtr& sinkFactory); + const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index 82092045a22..62d5db770ad 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -22,7 +22,7 @@ class TLocalServiceHolder { public: TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, - NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) + NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) { ui32 nodeId = 1; @@ -48,9 +48,7 @@ public: NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true); - lwmOptions.SourceFactory = std::move(sourceFactory); - lwmOptions.SinkFactory = std::move(sinkFactory); - lwmOptions.TransformFactory = std::move(transformFactory); + lwmOptions.AsyncIoFactory = std::move(asyncIoFactory); lwmOptions.FunctionRegistry = functionRegistry; lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory(); lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory( @@ -113,15 +111,15 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, - NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) + NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) { - return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory)); + return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory)); } TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, - NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) + NDq::IDqAsyncIoFactory::TPtr asyncIoFactory) { int startPort = 31337; TRangeWalker<int> portWalker(startPort, startPort+100); @@ -129,7 +127,7 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio auto grpcPort = BindInRange(portWalker)[1]; return new TDqGatewayLocal( - CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory)), + CreateLocalServiceHolder(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(asyncIoFactory)), CreateDqGateway(std::get<0>(NDqs::GetLocalAddress()), grpcPort.Addr.GetPort(), 8)); } diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h index 8afcb036cd6..01634057daf 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h @@ -9,6 +9,6 @@ namespace NYql { TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, - NDq::IDqSourceFactory::TPtr = nullptr, NDq::IDqSinkFactory::TPtr = nullptr, NDq::IDqOutputTransformFactory::TPtr = nullptr); + NDq::IDqAsyncIoFactory::TPtr = nullptr); } // namespace NYql diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index 653db074676..a515b687479 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -261,8 +261,7 @@ private: Options.RuntimeData, traceId, Options.TaskRunnerActorFactory, - Options.SourceFactory, - Options.SinkFactory)); + Options.AsyncIoFactory)); } allocationInfo.WorkerActors.emplace_back(RegisterChild( actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index 65143a3a89c..83b88f1792e 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -1,7 +1,6 @@ #pragma once -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/minikql/mkql_function_registry.h> #include <ydb/library/yql/providers/dq/worker_manager/interface/events.h> #include <ydb/library/yql/providers/dq/worker_manager/interface/counters.h> @@ -22,9 +21,7 @@ namespace NYql::NDqs { struct TLocalWorkerManagerOptions { TWorkerManagerCounters Counters; NTaskRunnerProxy::IProxyFactory::TPtr Factory; - NDq::IDqSourceFactory::TPtr SourceFactory; - NDq::IDqSinkFactory::TPtr SinkFactory; - NDq::IDqOutputTransformFactory::TPtr TransformFactory; + NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; TWorkerRuntimeData* RuntimeData = nullptr; TTaskRunnerInvokerFactory::TPtr TaskRunnerInvokerFactory; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 9d3950e69ce..e7627eff093 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -2,7 +2,7 @@ #include "probes.h" #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h> @@ -410,11 +410,11 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( return {actor, actor}; } -void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) { - factory.Register<NPq::NProto::TDqPqTopicSource>("PqSource", +void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) { + factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource", [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode]( NPq::NProto::TDqPqTopicSource&& settings, - IDqSourceFactory::TArguments&& args) + IDqAsyncIoFactory::TSourceArguments&& args) { NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER)); return CreateDqPqReadActor( diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h index c0a90c05cc4..bc3e5292fae 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h @@ -1,7 +1,7 @@ #pragma once #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> @@ -18,7 +18,7 @@ namespace NYql::NDq { -class TDqSourceFactory; +class TDqAsyncIoFactory; const i64 PQReadDefaultFreeSpace = 16_MB; @@ -36,6 +36,6 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( bool rangesMode = true ); -void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true); +void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index b415d34c5ea..ea5236e9b95 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -1,7 +1,7 @@ #include "dq_pq_write_actor.h" #include "probes.h" -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h> @@ -412,11 +412,11 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor( return {actor, actor}; } -void RegisterDqPqWriteActorFactory(TDqSinkFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { - factory.Register<NPq::NProto::TDqPqTopicSink>("PqSink", +void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { + factory.RegisterSink<NPq::NProto::TDqPqTopicSink>("PqSink", [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory)]( NPq::NProto::TDqPqTopicSink&& settings, - IDqSinkFactory::TArguments&& args) + IDqAsyncIoFactory::TSinkArguments&& args) { NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER)); return CreateDqPqWriteActor( diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h index 651abc94a71..88ac95f466b 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h @@ -1,7 +1,7 @@ #pragma once #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> @@ -30,6 +30,6 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor( IDqComputeActorAsyncOutput::ICallbacks* callbacks, i64 freeSpace = DqPqDefaultFreeSpace); -void RegisterDqPqWriteActorFactory(TDqSinkFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); +void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h index 8efdc8ff41f..0a06b4dc11d 100644 --- a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h +++ b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h @@ -4,8 +4,7 @@ #include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h> #include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/minikql/mkql_alloc.h> diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp index 866a0d7772a..7635df0469b 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp @@ -41,14 +41,10 @@ namespace NYql { -NDq::IDqSourceFactory::TPtr CreateSourceFactory(const NYdb::TDriver& driver) { - auto factory = MakeIntrusive<NYql::NDq::TDqSourceFactory>(); +NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver) { + auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); RegisterDqPqReadActorFactory(*factory, driver, nullptr); - return factory; -} -NDq::IDqSinkFactory::TPtr CreateSinkFactory(const NYdb::TDriver& driver) { - auto factory = MakeIntrusive<NYql::NDq::TDqSinkFactory>(); RegisterDqPqWriteActorFactory(*factory, driver, nullptr); return factory; } @@ -124,7 +120,7 @@ bool RunPqProgram( const auto driverConfig = NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")); NYdb::TDriver driver(driverConfig); - auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, CreateSourceFactory(driver), CreateSinkFactory(driver)); + auto dqGateway = CreateLocalDqGateway(functionRegistry.Get(), dqCompFactory, dqTaskTransformFactory, {}, CreateAsyncIoFactory(driver)); auto storage = NYql::CreateFileStorage({}); dataProvidersInit.push_back(NYql::GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage)); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index fba54a9ccf1..9e323c2a337 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -1,6 +1,6 @@ #pragma once -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> #include <ydb/library/yql/providers/s3/proto/source.pb.h> diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index 757bd4dc0c6..b1c3b208a58 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -2,21 +2,21 @@ #ifdef __linux__ #include "yql_s3_read_actor.h" -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.h> #endif namespace NYql::NDq { void RegisterS3ReadActorFactory( - TDqSourceFactory& factory, + TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) { #ifdef __linux__ NDB::registerFormats(); - factory.Register<NS3::TSource>("S3Source", - [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceFactory::TArguments&& args) { + factory.RegisterSource<NS3::TSource>("S3Source", + [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateS3ReadActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig); }); #else diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 557ea3f1412..65a07b1b2e7 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -3,7 +3,7 @@ #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> @@ -11,7 +11,7 @@ namespace NYql::NDq { void RegisterS3ReadActorFactory( - TDqSourceFactory& factory, + TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway = IHTTPGateway::Make(), const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr); diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index 91cacf02809..21b0d44907a 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -1,7 +1,7 @@ #include "dq_solomon_write_actor.h" #include "metrics_encoder.h" -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h> @@ -500,11 +500,11 @@ std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolo return {actor, actor}; } -void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { - factory.Register<NSo::NProto::TDqSolomonShard>("SolomonSink", +void RegisterDQSolomonWriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { + factory.RegisterSink<NSo::NProto::TDqSolomonShard>("SolomonSink", [credentialsFactory]( NYql::NSo::NProto::TDqSolomonShard&& settings, - IDqSinkFactory::TArguments&& args) + IDqAsyncIoFactory::TSinkArguments&& args) { auto counters = MakeIntrusive<NMonitoring::TDynamicCounters>(); diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h index 3f257970b17..5314129a05c 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h @@ -2,7 +2,7 @@ #include <ydb/library/yql/utils/actors/http_sender.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h> @@ -28,7 +28,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolo ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, i64 freeSpace = DqSolomonDefaultFreeSpace); -void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); +void RegisterDQSolomonWriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); TString GetSolomonUrl(const TString& endpoint, bool useSsl, const TString& project, const TString& cluster, const TString& service, const ::NYql::NSo::NProto::ESolomonClusterType& type); diff --git a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h index e04a7337737..2d802bdff9c 100644 --- a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h +++ b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.h @@ -3,8 +3,7 @@ #include <ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h> #include <ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_output.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/minikql/mkql_alloc.h> diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h index c7f74c284a0..1a67a7fb3f7 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h @@ -1,6 +1,6 @@ #pragma once -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/providers/ydb/proto/source.pb.h> diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp index 9fcde8b8b77..d9f8ae9dafc 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp @@ -1,13 +1,13 @@ #include "yql_ydb_source_factory.h" #include "yql_ydb_read_actor.h" -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> namespace NYql::NDq { -void RegisterYdbReadActorFactory(NYql::NDq::TDqSourceFactory& factory, ::NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { - factory.Register<NYql::NYdb::TSource>("YdbSource", - [driver, credentialsFactory](NYql::NYdb::TSource&& settings, IDqSourceFactory::TArguments&& args) { +void RegisterYdbReadActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, ::NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { + factory.RegisterSource<NYql::NYdb::TSource>("YdbSource", + [driver, credentialsFactory](NYql::NYdb::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateYdbReadActor(std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, driver, credentialsFactory); }); } diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h index 0a389c7a4ec..b5a44cf293f 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.h @@ -3,12 +3,12 @@ #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> namespace NYql::NDq { -void RegisterYdbReadActorFactory(NYql::NDq::TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); +void RegisterYdbReadActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); } |