diff options
| author | Vasily Gerasimov <[email protected]> | 2022-05-24 08:20:02 +0300 |
|---|---|---|
| committer | Vasily Gerasimov <[email protected]> | 2022-05-24 08:20:02 +0300 |
| commit | 3c0775eaf14f1e1c78bebe437135753ed12eaaf7 (patch) | |
| tree | c283596a88a85bc6fc2c89d1a99343ddb68d42bb | |
| parent | 0b208585ec07f732f0a41c7cb65fb6e57858866c (diff) | |
YQ-1098 Rename common source & transform entities
Factory and string literals
GetSourceData -> GetAsyncInputData
Source -> AsyncInput
Rename IDqSourceActor
Rename Source -> Buffer
Rename SourceActorFactory -> SourceFactory
Rename factory
ref:2496a1411c02c3040d9a52059318f29f64c30641
44 files changed, 231 insertions, 233 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index d70dcc3275a..68d86e72aab 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -16,12 +16,12 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, - NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory, + NYql::NDq::IDqSourceFactory::TPtr sourceFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits); IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory, + NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceFactory::TPtr sourceFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters); diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 683cfb600dd..c0afd232d3b 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -33,10 +33,10 @@ public: } TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true) + : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true) , ComputeCtx(settings.StatsMode) { if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) { @@ -318,11 +318,11 @@ private: } // anonymous namespace IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) { - return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory), + return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits); } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 61233473529..76aa3d711c3 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -72,10 +72,10 @@ public: } TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + NDqProto::TDqTask&& task, IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits) + : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits) , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) , Counters(counters) @@ -1127,11 +1127,11 @@ private: } // anonymous namespace IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, - NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + NDqProto::TDqTask&& task, IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters) { - return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), + return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, counters); } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 11cf9146a19..c963406650e 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -115,7 +115,7 @@ void Init( NYql::CreateYdbDqTaskTransformFactory() }); - auto sourceActorFactory = MakeIntrusive<NYql::NDq::TDqSourceFactory>(); + auto sourceFactory = MakeIntrusive<NYql::NDq::TDqSourceFactory>(); auto sinkFactory = MakeIntrusive<NYql::NDq::TDqSinkFactory>(); NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory; @@ -135,11 +135,11 @@ void Init( } if (protoConfig.GetPrivateApi().GetEnabled()) { - RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); - RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); - RegisterS3ReadActorFactory(*sourceActorFactory, credentialsFactory, + RegisterDqPqReadActorFactory(*sourceFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); + RegisterYdbReadActorFactory(*sourceFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); + RegisterS3ReadActorFactory(*sourceFactory, credentialsFactory, httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig())); - RegisterClickHouseReadActorFactory(*sourceActorFactory, credentialsFactory, httpGateway); + RegisterClickHouseReadActorFactory(*sourceFactory, credentialsFactory, httpGateway); RegisterDqPqWriteActorFactory(*sinkFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); RegisterDQSolomonWriteActorFactory(*sinkFactory, credentialsFactory); @@ -160,7 +160,7 @@ void Init( NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Counters = workerManagerCounters; lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false); - lwmOptions.SourceActorFactory = sourceActorFactory; + lwmOptions.SourceFactory = sourceFactory; lwmOptions.SinkFactory = sinkFactory; lwmOptions.FunctionRegistry = appData->FunctionRegistry; lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory(); @@ -179,7 +179,7 @@ void Init( ::NYql::NCommon::TServiceCounters serviceCounters(appData->Counters); if (protoConfig.GetNodesManager().GetEnabled() || protoConfig.GetPendingFetcher().GetEnabled()) { - auto internal = protoConfig.GetPrivateApi().GetLoopback() + auto internal = protoConfig.GetPrivateApi().GetLoopback() ? CreateLoopbackServiceActor(clientCounters) : CreateInternalServiceActor( yqSharedResources, diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index fc468e163b6..a84f863e72e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -29,11 +29,11 @@ public: static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR"; TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false) + : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false) , TaskRunnerActorFactory(taskRunnerActorFactory) , ReadyToCheckpointFlag(false) , SentStatsRequest(false) @@ -378,9 +378,9 @@ private: // TODO:(whcrc) maybe save Sources before Program? for (auto& [inputIndex, source] : SourcesMap) { - YQL_ENSURE(source.SourceActor, "Source[" << inputIndex << "] is not created"); + YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); NDqProto::TSourceState& sourceState = *state.AddSources(); - source.SourceActor->SaveState(checkpoint, sourceState); + source.AsyncInput->SaveState(checkpoint, sourceState); sourceState.SetInputIndex(inputIndex); } } @@ -573,12 +573,12 @@ private: IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) { - return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory), + return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerActorFactory); } diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h index dae744e3777..662e8085e81 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h @@ -19,7 +19,7 @@ namespace NYql { namespace NDq { NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 7f8a2564b80..85e6d3c96e3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -33,11 +33,11 @@ public: static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR"; TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory) - : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits) + : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits) , TaskRunnerFactory(taskRunnerFactory) {} @@ -68,11 +68,11 @@ private: IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory) { - return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory), + return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerFactory); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 49568b7687c..ab4f8d0af49 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -257,7 +257,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& NDqProto::TDqTaskStats* protoTask, bool withProfileStats); NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp index fdbc0648380..1d22e8a1f96 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp @@ -7,13 +7,13 @@ namespace NYql::NDq { -std::pair<IDqSourceActor*, NActors::IActor*> TDqSourceFactory::CreateDqSourceActor(IDqSourceActorFactory::TArguments&& args) const +std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> TDqSourceFactory::CreateDqSource(IDqSourceFactory::TArguments&& args) const { const TString& type = args.InputDesc.GetSource().GetType(); - YQL_ENSURE(!type.empty(), "Attempt to create source actor of empty type"); + YQL_ENSURE(!type.empty(), "Attempt to create source of empty type"); const TCreatorFunction* creatorFunc = CreatorsByType.FindPtr(type); - YQL_ENSURE(creatorFunc, "Unknown type of source actor: \"" << type << "\""); - std::pair<IDqSourceActor*, NActors::IActor*> actor = (*creatorFunc)(std::move(args)); + YQL_ENSURE(creatorFunc, "Unknown type of source: \"" << type << "\""); + std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args)); Y_VERIFY(actor.first); Y_VERIFY(actor.second); return actor; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h index d82f5cdd317..12f5de2a061 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h @@ -13,23 +13,23 @@ namespace NYql::NDq { template <class T> -concept TCastsToSourceActorPair = - std::is_convertible_v<T, std::pair<IDqSourceActor*, NActors::IActor*>>; +concept TCastsToAsyncInputPair = + std::is_convertible_v<T, std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>>; template <class T, class TProto> -concept TSourceActorCreatorFunc = requires(T f, TProto&& settings, IDqSourceActorFactory::TArguments args) { - { f(std::move(settings), std::move(args)) } -> TCastsToSourceActorPair; +concept TSourceCreatorFunc = requires(T f, TProto&& settings, IDqSourceFactory::TArguments args) { + { f(std::move(settings), std::move(args)) } -> TCastsToAsyncInputPair; }; -class TDqSourceFactory : public IDqSourceActorFactory { +class TDqSourceFactory : public IDqSourceFactory { public: - using TCreatorFunction = std::function<std::pair<IDqSourceActor*, NActors::IActor*>(TArguments&& args)>; + using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>(TArguments&& args)>; - std::pair<IDqSourceActor*, NActors::IActor*> CreateDqSourceActor(TArguments&& args) const override; + std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TArguments&& args) const override; void Register(const TString& type, TCreatorFunction creator); - template <class TProtoMsg, TSourceActorCreatorFunc<TProtoMsg> TCreatorFunc> + template <class TProtoMsg, TSourceCreatorFunc<TProtoMsg> TCreatorFunc> void Register(const TString& type, TCreatorFunc creator) { Register(type, [creator = std::move(creator), type](TArguments&& args) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index aad35f07580..c6ea72a0237 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -142,7 +142,7 @@ public: protected: TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false) : ExecuterId(executerId) @@ -151,7 +151,7 @@ protected: , RuntimeSettings(settings) , MemoryLimits(memoryLimits) , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) - , SourceActorFactory(std::move(sourceActorFactory)) + , SourceFactory(std::move(sourceFactory)) , SinkFactory(std::move(sinkFactory)) , OutputTransformFactory(std::move(transformFactory)) , FunctionRegistry(functionRegistry) @@ -168,7 +168,7 @@ protected: } TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task, - IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, + IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) : ExecuterId(executerId) @@ -177,7 +177,7 @@ protected: , RuntimeSettings(settings) , MemoryLimits(memoryLimits) , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) - , SourceActorFactory(std::move(sourceActorFactory)) + , SourceFactory(std::move(sourceFactory)) , SinkFactory(std::move(sinkFactory)) , OutputTransformFactory(std::move(transformFactory)) , FunctionRegistry(functionRegistry) @@ -228,8 +228,8 @@ protected: FFunc(TEvDqCompute::TEvRestoreFromCheckpoint::EventType, Checkpoints->Receive); hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, HandleExecuteBase); hFunc(NActors::TEvInterconnect::TEvNodeConnected, HandleExecuteBase); - hFunc(IDqSourceActor::TEvNewSourceDataArrived, OnNewSourceDataArrived); - hFunc(IDqSourceActor::TEvSourceError, OnSourceError); + hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived); + hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, OnAsyncInputError); default: { CA_LOG_C("TDqComputeActorBase, unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")"); InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")"); @@ -443,7 +443,7 @@ protected: for (auto& [_, source] : SourcesMap) { if (source.Actor) { - source.SourceActor->PassAway(); + source.AsyncInput->PassAway(); } } @@ -636,9 +636,9 @@ protected: data.SetBlob(TaskRunner->Save()); for (auto& [inputIndex, source] : SourcesMap) { - YQL_ENSURE(source.SourceActor, "Source[" << inputIndex << "] is not created"); + YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created"); NDqProto::TSourceState& sourceState = *state.AddSources(); - source.SourceActor->SaveState(checkpoint, sourceState); + source.AsyncInput->SaveState(checkpoint, sourceState); sourceState.SetInputIndex(inputIndex); } } @@ -646,8 +646,8 @@ protected: void CommitState(const NDqProto::TCheckpoint& checkpoint) override { CA_LOG_D("Commit state"); for (auto& [inputIndex, source] : SourcesMap) { - Y_VERIFY(source.SourceActor); - source.SourceActor->CommitState(checkpoint); + Y_VERIFY(source.AsyncInput); + source.AsyncInput->CommitState(checkpoint); } } @@ -696,8 +696,8 @@ protected: for (const NDqProto::TSourceState& sourceState : state.GetSources()) { TSourceInfo* source = SourcesMap.FindPtr(sourceState.GetInputIndex()); YQL_ENSURE(source, "Failed to load state. Source with input index " << sourceState.GetInputIndex() << " was not found"); - YQL_ENSURE(source->SourceActor, "Source[" << sourceState.GetInputIndex() << "] is not created"); - source->SourceActor->LoadState(sourceState); + YQL_ENSURE(source->AsyncInput, "Source[" << sourceState.GetInputIndex() << "] is not created"); + source->AsyncInput->LoadState(sourceState); } for (const NDqProto::TSinkState& sinkState : state.GetSinks()) { TAsyncOutputInfoBase* sink = SinksMap.FindPtr(sinkState.GetOutputIndex()); @@ -766,8 +766,8 @@ protected: struct TSourceInfo { ui64 Index; - IDqSource::TPtr Source; - IDqSourceActor* SourceActor = nullptr; + IDqSource::TPtr Buffer; + IDqComputeActorAsyncInput* AsyncInput = nullptr; NActors::IActor* Actor = nullptr; TIssuesBuffer IssuesBuffer; bool Finished = false; @@ -843,15 +843,15 @@ protected: } virtual void SourcePush(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, TSourceInfo& source, i64 space, bool finished) { - source.Source->Push(std::move(batch), space); + source.Buffer->Push(std::move(batch), space); if (finished) { - source.Source->Finish(); + source.Buffer->Finish(); source.Finished = true; } } virtual i64 SourceFreeSpace(TSourceInfo& source) { - return source.Source->GetFreeSpace(); + return source.Buffer->GetFreeSpace(); } virtual bool SayHelloOnBootstrap() { @@ -1248,13 +1248,13 @@ protected: } } for (auto& [inputIndex, source] : SourcesMap) { - if (TaskRunner) { source.Source = TaskRunner->GetSource(inputIndex); Y_VERIFY(source.Source);} - Y_VERIFY(SourceActorFactory); + if (TaskRunner) { source.Buffer = TaskRunner->GetSource(inputIndex); Y_VERIFY(source.Buffer);} + Y_VERIFY(SourceFactory); const auto& inputDesc = Task.GetInputs(inputIndex); const ui64 i = inputIndex; // Crutch for clang - CA_LOG_D("Create source actor for input " << i << " " << inputDesc); - std::tie(source.SourceActor, source.Actor) = SourceActorFactory->CreateDqSourceActor( - IDqSourceActorFactory::TArguments{ + CA_LOG_D("Create source for input " << i << " " << inputDesc); + std::tie(source.AsyncInput, source.Actor) = SourceFactory->CreateDqSource( + IDqSourceFactory::TArguments{ .InputDesc = inputDesc, .InputIndex = inputIndex, .TxId = TxId, @@ -1323,7 +1323,7 @@ protected: } for (auto& [inputIndex, source] : SourcesMap) { - Y_VERIFY(!TaskRunner || source.Source); + Y_VERIFY(!TaskRunner || source.Buffer); if (source.Finished) { const ui64 indexForLogging = inputIndex; // Crutch for clang CA_LOG_D("Skip polling source[" << indexForLogging << "]: finished"); @@ -1332,9 +1332,9 @@ protected: const i64 freeSpace = SourceFreeSpace(source); if (freeSpace > 0) { NKikimr::NMiniKQL::TUnboxedValueVector batch; - Y_VERIFY(source.SourceActor); + Y_VERIFY(source.AsyncInput); bool finished = false; - const i64 space = source.SourceActor->GetSourceData(batch, finished, freeSpace); + const i64 space = source.AsyncInput->GetAsyncInputData(batch, finished, freeSpace); const ui64 index = inputIndex; CA_LOG_D("Poll source " << index << ". Buffer free space: " << freeSpace @@ -1343,7 +1343,7 @@ protected: if (!batch.empty()) { // If we have read some data, we must run such reading again - // to process the case when source actor notified us about new data + // to process the case when source notified us about new data // but we haven't read all of it. ContinueExecute(); } @@ -1352,12 +1352,12 @@ protected: } } - void OnNewSourceDataArrived(const IDqSourceActor::TEvNewSourceDataArrived::TPtr& ev) { + void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { Y_VERIFY(SourcesMap.FindPtr(ev->Get()->InputIndex)); ContinueExecute(); } - void OnSourceError(const IDqSourceActor::TEvSourceError::TPtr& ev) { + void OnAsyncInputError(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) { if (!ev->Get()->IsFatal) { SourcesMap.at(ev->Get()->InputIndex).IssuesBuffer.Push(ev->Get()->Issues); return; @@ -1576,7 +1576,7 @@ protected: const TComputeRuntimeSettings RuntimeSettings; const TComputeMemoryLimits MemoryLimits; const bool CanAllocateExtraMemory = false; - const IDqSourceActorFactory::TPtr SourceActorFactory; + const IDqSourceFactory::TPtr SourceFactory; const IDqSinkFactory::TPtr SinkFactory; const IDqOutputTransformFactory::TPtr OutputTransformFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h index 672af2b72d2..113120031f3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h @@ -21,33 +21,33 @@ class IActor; namespace NYql::NDq { -// Source actor. +// Source/transform. // Must be IActor. // // Protocol: -// 1. CA starts source actor. -// 2. CA calls IDqSourceActor::GetSourceData(batch, FreeSpace). -// 3. Source actor sends TEvNewSourceDataArrived when it has data to process. -// 4. CA calls IDqSourceActor::GetSourceData(batch, FreeSpace) to get data when it is ready to process it. +// 1. CA starts source/transform. +// 2. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace). +// 3. Source/transform sends TEvNewAsyncInputDataArrived when it has data to process. +// 4. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace) to get data when it is ready to process it. // -// In case of error source actor sends TEvSourceError +// In case of error source/transform sends TEvAsyncInputError // // Checkpointing: // 1. InjectCheckpoint event arrives to CA. // 2. ... -// 3. CA calls IDqSourceActor::SaveState() and IDqTaskRunner::SaveGraphState() and uses this pair as state for CA. +// 3. CA calls IDqComputeActorAsyncInput::SaveState() and IDqTaskRunner::SaveGraphState() and uses this pair as state for CA. // 3. ... -// 5. CA calls IDqSourceActor::CommitState() to apply all side effects. -struct IDqSourceActor { - struct TEvNewSourceDataArrived : public NActors::TEventLocal<TEvNewSourceDataArrived, TDqComputeEvents::EvNewSourceDataArrived> { +// 5. CA calls IDqComputeActorAsyncInput::CommitState() to apply all side effects. +struct IDqComputeActorAsyncInput { + struct TEvNewAsyncInputDataArrived : public NActors::TEventLocal<TEvNewAsyncInputDataArrived, TDqComputeEvents::EvNewAsyncInputDataArrived> { const ui64 InputIndex; - explicit TEvNewSourceDataArrived(ui64 inputIndex) + explicit TEvNewAsyncInputDataArrived(ui64 inputIndex) : InputIndex(inputIndex) {} }; - struct TEvSourceError : public NActors::TEventLocal<TEvSourceError, TDqComputeEvents::EvSourceError> { - TEvSourceError(ui64 inputIndex, const TIssues& issues, bool isFatal) + struct TEvAsyncInputError : public NActors::TEventLocal<TEvAsyncInputError, TDqComputeEvents::EvAsyncInputError> { + TEvAsyncInputError(ui64 inputIndex, const TIssues& issues, bool isFatal) : InputIndex(inputIndex) , Issues(issues) , IsFatal(isFatal) @@ -63,7 +63,7 @@ struct IDqSourceActor { // Gets data and returns space used by filled data batch. // Method should be called under bound mkql allocator. // Could throw YQL errors. - virtual i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0; + virtual i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0; // Checkpointing. virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) = 0; @@ -72,11 +72,11 @@ struct IDqSourceActor { virtual void PassAway() = 0; // The same signature as IActor::PassAway() - virtual ~IDqSourceActor() = default; + virtual ~IDqComputeActorAsyncInput() = default; }; -struct IDqSourceActorFactory : public TThrRefBase { - using TPtr = TIntrusivePtr<IDqSourceActorFactory>; +struct IDqSourceFactory : public TThrRefBase { + using TPtr = TIntrusivePtr<IDqSourceFactory>; struct TArguments { const NDqProto::TTaskInput& InputDesc; @@ -89,10 +89,10 @@ struct IDqSourceActorFactory : public TThrRefBase { const NKikimr::NMiniKQL::THolderFactory& HolderFactory; }; - // Creates source actor. + // Creates source. // Could throw YQL errors. - // IActor* and IDqSourceActor* returned by method must point to the objects with consistent lifetime. - virtual std::pair<IDqSourceActor*, NActors::IActor*> CreateDqSourceActor(TArguments&& args) const = 0; + // IActor* and IDqComputeActorAsyncInput* returned by method must point to the objects with consistent lifetime. + virtual std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TArguments&& args) const = 0; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h index 2049edee75a..11b02252332 100644 --- a/ydb/library/yql/dq/actors/dq_events_ids.h +++ b/ydb/library/yql/dq/actors/dq_events_ids.h @@ -48,8 +48,8 @@ struct TDqComputeEvents { EvGetTaskStateResult, EvStateRequest, EvNewCheckpointCoordinatorAck, - EvNewSourceDataArrived, - EvSourceError, + EvNewAsyncInputDataArrived, + EvAsyncInputError, // place all new events here EvEnd diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index cf671332c27..4c60332bbf7 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -13,7 +13,7 @@ enum EDqStatsMode { DQ_STATS_MODE_PROFILE = 30; } -message TDqSourceStats { +message TDqAsyncInputBufferStats { // basic stats uint64 InputIndex = 1; uint64 Chunks = 2; @@ -136,7 +136,7 @@ message TDqTaskStats { } repeated THistBucket ComputeCpuTimeByRun = 106; - repeated TDqSourceStats Sources = 150; + repeated TDqAsyncInputBufferStats Sources = 150; repeated TDqInputChannelStats InputChannels = 151; repeated TDqAsyncOutputBufferStats Sinks = 152; repeated TDqOutputChannelStats OutputChannels = 153; diff --git a/ydb/library/yql/dq/runtime/dq_source.cpp b/ydb/library/yql/dq/runtime/dq_source.cpp index b9e75a15e81..dfa3da25fd7 100644 --- a/ydb/library/yql/dq/runtime/dq_source.cpp +++ b/ydb/library/yql/dq/runtime/dq_source.cpp @@ -24,14 +24,14 @@ public: } } - const TDqSourceStats* GetStats() const override { + const TDqAsyncInputBufferStats* GetStats() const override { return &BasicStats; } private: const ui64 InputIndex; - TDqSourceStats BasicStats; - TDqSourceStats* ProfileStats = nullptr; + TDqAsyncInputBufferStats BasicStats; + TDqAsyncInputBufferStats* ProfileStats = nullptr; }; IDqSource::TPtr CreateDqSource( diff --git a/ydb/library/yql/dq/runtime/dq_source.h b/ydb/library/yql/dq/runtime/dq_source.h index 3355a4966f9..82f5cb2ade6 100644 --- a/ydb/library/yql/dq/runtime/dq_source.h +++ b/ydb/library/yql/dq/runtime/dq_source.h @@ -3,10 +3,10 @@ namespace NYql::NDq { -struct TDqSourceStats : TDqInputStats { +struct TDqAsyncInputBufferStats : TDqInputStats { ui64 InputIndex = 0; - explicit TDqSourceStats(ui64 inputIndex) + explicit TDqAsyncInputBufferStats(ui64 inputIndex) : InputIndex(inputIndex) {} template<typename T> @@ -33,7 +33,7 @@ public: virtual void Finish() = 0; - virtual const TDqSourceStats* GetStats() const = 0; + virtual const TDqAsyncInputBufferStats* GetStats() const = 0; }; IDqSource::TPtr CreateDqSource(ui64 inputIndex, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 5dbefd9f2f5..e3389ab985a 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -81,7 +81,7 @@ struct TTaskRunnerStatsBase { NMonitoring::IHistogramCollectorPtr ComputeCpuTimeByRun; // in millis THashMap<ui64, const TDqInputChannelStats*> InputChannels; // Channel id -> Channel stats - THashMap<ui64, const TDqSourceStats*> Sources; // Input index -> Source stats + THashMap<ui64, const TDqAsyncInputBufferStats*> Sources; // Input index -> Source stats THashMap<ui64, const TDqOutputChannelStats*> OutputChannels; // Channel id -> Channel stats TVector<TMkqlStat> MkqlStats; @@ -127,7 +127,7 @@ struct TTaskRunnerStatsBase { private: virtual TDqInputChannelStats* MutableInputChannel(ui64 channelId) = 0; - virtual TDqSourceStats* MutableSource(ui64 sourceId) = 0; // todo: (whcrc) unused, not modified by these pointers + virtual TDqAsyncInputBufferStats* MutableSource(ui64 sourceId) = 0; // todo: (whcrc) unused, not modified by these pointers virtual TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) = 0; }; @@ -137,8 +137,8 @@ struct TDqTaskRunnerStats : public TTaskRunnerStatsBase { return const_cast<TDqInputChannelStats*>(InputChannels[channelId]); } - TDqSourceStats* MutableSource(ui64 sourceId) override { - return const_cast<TDqSourceStats*>(Sources[sourceId]); + TDqAsyncInputBufferStats* MutableSource(ui64 sourceId) override { + return const_cast<TDqAsyncInputBufferStats*>(Sources[sourceId]); } TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) override { @@ -149,7 +149,7 @@ struct TDqTaskRunnerStats : public TTaskRunnerStatsBase { struct TDqTaskRunnerStatsInplace : public TTaskRunnerStatsBase { // all stats are owned by this object TVector<THolder<TDqInputChannelStats>> InputChannelHolder; - TVector<THolder<TDqSourceStats>> SourceHolder; + TVector<THolder<TDqAsyncInputBufferStats>> SourceHolder; TVector<THolder<TDqOutputChannelStats>> OutputChannelHolder; template<typename TStat> @@ -166,7 +166,7 @@ struct TDqTaskRunnerStatsInplace : public TTaskRunnerStatsBase { return GetOrCreate(InputChannels, InputChannelHolder, channelId); } - TDqSourceStats* MutableSource(ui64 sourceId) override { + TDqAsyncInputBufferStats* MutableSource(ui64 sourceId) override { return GetOrCreate(Sources, SourceHolder, sourceId); } diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp index d611c91201a..fa9c5221f7f 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp @@ -45,7 +45,7 @@ struct TEvPrivate { } // namespace -class TClickHouseReadActor : public TActorBootstrapped<TClickHouseReadActor>, public IDqSourceActor { +class TClickHouseReadActor : public TActorBootstrapped<TClickHouseReadActor>, public IDqComputeActorAsyncInput { public: TClickHouseReadActor(ui64 inputIndex, IHTTPGateway::TPtr gateway, @@ -91,7 +91,7 @@ private: } } - i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { if (Result) { const auto size = Result->size(); buffer.emplace_back(NKikimr::NMiniKQL::MakeString(std::string_view(*Result))); @@ -106,14 +106,14 @@ private: void Handle(TEvPrivate::TEvReadResult::TPtr& result) { Result.emplace(std::move(result->Get()->Result)); - Send(ComputeActorId, new TEvNewSourceDataArrived(InputIndex)); + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } void Handle(TEvPrivate::TEvReadError::TPtr& result) { - Send(ComputeActorId, new TEvSourceError(InputIndex, result->Get()->Error, true)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, true)); } - // IActor & IDqSourceActor + // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor TActorBootstrapped<TClickHouseReadActor>::PassAway(); } @@ -130,7 +130,7 @@ private: std::optional<IHTTPGateway::TContent> Result; }; -std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateClickHouseReadActor( +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateClickHouseReadActor( IHTTPGateway::TPtr gateway, NCH::TSource&& params, ui64 inputIndex, @@ -162,4 +162,3 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateClickHouseReadActor( } } // namespace NYql::NDq - diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h index 0f9696851a5..5a965013025 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h @@ -8,7 +8,7 @@ namespace NYql::NDq { -std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateClickHouseReadActor( +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateClickHouseReadActor( IHTTPGateway::TPtr gateway, NCH::TSource&& params, ui64 inputIndex, @@ -18,4 +18,3 @@ std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateClickHouseReadActo ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); } // namespace NYql::NDq - diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp index c6053eb64f1..824bdd958b1 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp @@ -7,7 +7,7 @@ namespace NYql::NDq { void RegisterClickHouseReadActorFactory(TDqSourceFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway) { factory.Register<NCH::TSource>("ClickHouseSource", - [credentialsFactory, gateway](NCH::TSource&& settings, IDqSourceActorFactory::TArguments&& args) { + [credentialsFactory, gateway](NCH::TSource&& settings, IDqSourceFactory::TArguments&& args) { return CreateClickHouseReadActor(gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory); }); } diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp index 22b93b13ced..f036dee78f4 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp @@ -22,7 +22,7 @@ NYql::NDqProto::TCheckpoint CreateCheckpoint(ui64 id) { return checkpoint; } -TFakeActor::TFakeActor(TSourcePromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises) +TFakeActor::TFakeActor(TAsyncInputPromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises) : TActor<TFakeActor>(&TFakeActor::StateFunc) , MemoryInfo("test") , HolderFactory(Alloc.Ref(), MemoryInfo) @@ -30,9 +30,9 @@ TFakeActor::TFakeActor(TSourcePromises& sourcePromises, TAsyncOutputPromises& as , FunctionRegistry(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())) , ProgramBuilder(TypeEnv, *FunctionRegistry) , ValueBuilder(HolderFactory) - , SourceEvents(*this) + , AsyncInputEvents(*this) , AsyncOutputCallbacks(*this) - , SourcePromises(sourcePromises) + , AsyncInputPromises(sourcePromises) , AsyncOutputPromises(asyncOutputPromises) { Alloc.Release(); @@ -48,19 +48,19 @@ void TFakeActor::InitAsyncOutput(IDqComputeActorAsyncOutput* dqAsyncOutput, IAct DqAsyncOutputAsActor = dqAsyncOutputAsActor; } -void TFakeActor::InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor) { - DqSourceActorId = RegisterWithSameMailbox(dqSourceAsActor), - DqSourceActor = dqSource; - DqSourceActorAsActor = dqSourceAsActor; +void TFakeActor::InitAsyncInput(IDqComputeActorAsyncInput* dqAsyncInput, IActor* dqAsyncInputAsActor) { + DqAsyncInputActorId = RegisterWithSameMailbox(dqAsyncInputAsActor), + DqAsyncInput = dqAsyncInput; + DqAsyncInputAsActor = dqAsyncInputAsActor; } void TFakeActor::Terminate() { - if (DqSourceActorId) { - DqSourceActor->PassAway(); + if (DqAsyncInputActorId) { + DqAsyncInput->PassAway(); - DqSourceActorId = std::nullopt; - DqSourceActor = nullptr; - DqSourceActorAsActor = nullptr; + DqAsyncInputActorId = std::nullopt; + DqAsyncInput = nullptr; + DqAsyncInputAsActor = nullptr; } if (DqAsyncOutputActorId) { @@ -87,7 +87,7 @@ TFakeCASetup::TFakeCASetup() Runtime->AddLocalService( FakeActorId, NActors::TActorSetupCmd( - new TFakeActor(SourcePromises, AsyncOutputPromises), + new TFakeActor(AsyncInputPromises, AsyncOutputPromises), NActors::TMailboxType::Simple, 0)); @@ -111,15 +111,15 @@ void TFakeCASetup::AsyncOutputWrite(const TWriteValueProducer valueProducer, TMa void TFakeCASetup::SaveSourceState(NDqProto::TCheckpoint checkpoint, NDqProto::TSourceState& state) { Execute([&state, &checkpoint](TFakeActor& actor) { - Y_ASSERT(actor.DqSourceActor); - actor.DqSourceActor->SaveState(checkpoint, state); + Y_ASSERT(actor.DqAsyncInput); + actor.DqAsyncInput->SaveState(checkpoint, state); }); } void TFakeCASetup::LoadSource(const NDqProto::TSourceState& state) { Execute([&state](TFakeActor& actor) { - Y_ASSERT(actor.DqSourceActor); - actor.DqSourceActor->LoadState(state); + Y_ASSERT(actor.DqAsyncInput); + actor.DqAsyncInput->LoadState(state); }); } diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index 2a7aa44f5f0..a0a460e46ea 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -56,8 +56,8 @@ namespace { }; } -struct TSourcePromises { - NThreading::TPromise<void> NewSourceDataArrived = NThreading::NewPromise(); +struct TAsyncInputPromises { + NThreading::TPromise<void> NewAsyncInputDataArrived = NThreading::NewPromise(); NThreading::TPromise<TIssues> FatalError = NThreading::NewPromise<TIssues>(); }; @@ -70,18 +70,18 @@ struct TAsyncOutputPromises { NYql::NDqProto::TCheckpoint CreateCheckpoint(ui64 id = 0); class TFakeActor : public NActors::TActor<TFakeActor> { - struct TSourceEvents { - explicit TSourceEvents(TFakeActor& parent) : Parent(parent) {} + struct TAsyncInputEvents { + explicit TAsyncInputEvents(TFakeActor& parent) : Parent(parent) {} - void OnNewSourceDataArrived(ui64) { - Parent.SourcePromises.NewSourceDataArrived.SetValue(); - Parent.SourcePromises.NewSourceDataArrived = NThreading::NewPromise(); + void OnNewAsyncInputDataArrived(ui64) { + Parent.AsyncInputPromises.NewAsyncInputDataArrived.SetValue(); + Parent.AsyncInputPromises.NewAsyncInputDataArrived = NThreading::NewPromise(); } - void OnSourceError(ui64, const TIssues& issues, bool isFatal) { + void OnAsyncInputError(ui64, const TIssues& issues, bool isFatal) { Y_UNUSED(isFatal); - Parent.SourcePromises.FatalError.SetValue(issues); - Parent.SourcePromises.FatalError = NThreading::NewPromise<TIssues>(); + Parent.AsyncInputPromises.FatalError.SetValue(issues); + Parent.AsyncInputPromises.FatalError = NThreading::NewPromise<TIssues>(); } TFakeActor& Parent; @@ -111,25 +111,25 @@ class TFakeActor : public NActors::TActor<TFakeActor> { }; public: - TFakeActor(TSourcePromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises); + TFakeActor(TAsyncInputPromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises); ~TFakeActor(); void InitAsyncOutput(IDqComputeActorAsyncOutput* dqAsyncOutput, IActor* dqAsyncOutputAsActor); - void InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor); + void InitAsyncInput(IDqComputeActorAsyncInput* dqAsyncInput, IActor* dqAsyncInputAsActor); void Terminate(); TAsyncOutputCallbacks& GetAsyncOutputCallbacks(); NKikimr::NMiniKQL::THolderFactory& GetHolderFactory(); public: - IDqSourceActor* DqSourceActor = nullptr; + IDqComputeActorAsyncInput* DqAsyncInput = nullptr; IDqComputeActorAsyncOutput* DqAsyncOutput = nullptr; private: STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvExecute, Handle); - hFunc(IDqSourceActor::TEvNewSourceDataArrived, Handle); - hFunc(IDqSourceActor::TEvSourceError, Handle); + hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, Handle); + hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, Handle); ) void Handle(TEvPrivate::TEvExecute::TPtr& ev) { @@ -142,12 +142,12 @@ private: ev->Get()->Promise.SetValue(); } - void Handle(const IDqSourceActor::TEvNewSourceDataArrived::TPtr& ev) { - SourceEvents.OnNewSourceDataArrived(ev->Get()->InputIndex); + void Handle(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { + AsyncInputEvents.OnNewAsyncInputDataArrived(ev->Get()->InputIndex); } - void Handle(const IDqSourceActor::TEvSourceError::TPtr& ev) { - SourceEvents.OnSourceError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal); + void Handle(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) { + AsyncInputEvents.OnAsyncInputError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal); } public: @@ -161,16 +161,16 @@ public: NKikimr::NMiniKQL::TDefaultValueBuilder ValueBuilder; private: - std::optional<NActors::TActorId> DqSourceActorId; - IActor* DqSourceActorAsActor = nullptr; + std::optional<NActors::TActorId> DqAsyncInputActorId; + IActor* DqAsyncInputAsActor = nullptr; std::optional<NActors::TActorId> DqAsyncOutputActorId; IActor* DqAsyncOutputAsActor = nullptr; - TSourceEvents SourceEvents; + TAsyncInputEvents AsyncInputEvents; TAsyncOutputCallbacks AsyncOutputCallbacks; - TSourcePromises& SourcePromises; + TAsyncInputPromises& AsyncInputPromises; TAsyncOutputPromises& AsyncOutputPromises; }; @@ -179,12 +179,12 @@ struct TFakeCASetup { ~TFakeCASetup(); template<typename T> - std::vector<T> SourceRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) { + std::vector<T> AsyncInputRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) { std::vector<T> result; Execute([&result, &parser, freeSpace](TFakeActor& actor) { NKikimr::NMiniKQL::TUnboxedValueVector buffer; bool finished = false; - actor.DqSourceActor->GetSourceData(buffer, finished, freeSpace); + actor.DqAsyncInput->GetAsyncInputData(buffer, finished, freeSpace); for (const auto& uv : buffer) { for (const auto item : parser(uv)) { @@ -197,7 +197,7 @@ struct TFakeCASetup { } template<typename T> - std::vector<T> SourceReadUntil( + std::vector<T> AsyncInputReadUntil( const TReadValueParser<T> parser, ui64 size, i64 eachReadFreeSpace = 1000, @@ -205,13 +205,13 @@ struct TFakeCASetup { { std::vector<T> result; DoWithRetry([&](){ - auto batch = SourceRead<T>(parser, eachReadFreeSpace); + auto batch = AsyncInputRead<T>(parser, eachReadFreeSpace); for (const auto& item : batch) { result.emplace_back(item); } if (result.size() < size) { - SourcePromises.NewSourceDataArrived.GetFuture().Wait(timeout); + AsyncInputPromises.NewAsyncInputDataArrived.GetFuture().Wait(timeout); ythrow yexception() << "Not enough data"; } }, @@ -233,7 +233,7 @@ struct TFakeCASetup { public: TRuntimePtr Runtime; NActors::TActorId FakeActorId; - TSourcePromises SourcePromises; + TAsyncInputPromises AsyncInputPromises; TAsyncOutputPromises AsyncOutputPromises; }; diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index 8ccb7ccd92c..956c39eb061 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -65,7 +65,7 @@ IActor* CreateComputeActor( executerId, operationId, std::move(task), - options.SourceActorFactory, + options.SourceFactory, options.SinkFactory, options.TransformFactory, options.FunctionRegistry, @@ -77,7 +77,7 @@ IActor* CreateComputeActor( executerId, operationId, std::move(task), - options.SourceActorFactory, + options.SourceFactory, options.SinkFactory, options.TransformFactory, options.FunctionRegistry, diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 49860f50e44..22d9bc86682 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -49,7 +49,7 @@ struct TOutputChannel { }; struct TSourceInfo { - IDqSourceActor* SourceActor = nullptr; + IDqComputeActorAsyncInput* Source = nullptr; NActors::IActor* Actor = nullptr; i64 FreeSpace = 1; bool HasData = false; @@ -76,12 +76,12 @@ public: explicit TDqWorker( const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const IDqSourceActorFactory::TPtr& sourceActorFactory, + const IDqSourceFactory::TPtr& sourceFactory, const IDqSinkFactory::TPtr& sinkFactory, TWorkerRuntimeData* runtimeData, const TString& traceId) : TRichActor<TDqWorker>(&TDqWorker::Handler) - , SourceActorFactory(sourceActorFactory) + , SourceFactory(sourceFactory) , SinkFactory(sinkFactory) , TaskRunnerActorFactory(taskRunnerActorFactory) , RuntimeData(runtimeData) @@ -116,7 +116,7 @@ public: Actor->PassAway(); } for (const auto& [_, v] : SourcesMap) { - v.SourceActor->PassAway(); + v.Source->PassAway(); } for (const auto& [_, v] : SinksMap) { v.Sink->PassAway(); @@ -146,8 +146,8 @@ private: HFunc(TEvContinueRun, OnContinueRun); cFunc(TEvents::TEvWakeup::EventType, OnWakeup); - hFunc(IDqSourceActor::TEvNewSourceDataArrived, OnNewSourceDataArrived); - hFunc(IDqSourceActor::TEvSourceError, OnSourceError); + hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived); + hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, OnAsyncInputError); }) void ExtractStats(::Ydb::Issue::IssueMessage* issue) { @@ -275,9 +275,9 @@ private: if (input.HasSource()) { auto& source = SourcesMap[inputId]; source.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv); - std::tie(source.SourceActor, source.Actor) = - SourceActorFactory->CreateDqSourceActor( - IDqSourceActorFactory::TArguments{ + std::tie(source.Source, source.Actor) = + SourceFactory->CreateDqSource( + IDqSourceFactory::TArguments{ .InputDesc = input, .InputIndex = static_cast<ui64>(inputId), .TxId = TraceId, @@ -577,7 +577,7 @@ private: auto guard = source.TypeEnv->BindAllocator(); NKikimr::NMiniKQL::TUnboxedValueVector batch; bool finished = false; - const i64 space = source.SourceActor->GetSourceData(batch, finished, freeSpace); + const i64 space = source.Source->GetAsyncInputData(batch, finished, freeSpace); const ui64 index = inputIndex; if (space <= 0) { continue; @@ -667,8 +667,8 @@ private: } } - /*____________________ SourceActorEvents __________________*/ - void OnNewSourceDataArrived(const IDqSourceActor::TEvNewSourceDataArrived::TPtr& ev) { + /*____________________ SourceEvents __________________*/ + void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { try { if (!TaskRunnerPrepared) { return; @@ -680,7 +680,7 @@ private: SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, CurrentExceptionMessage())); } } - void OnSourceError(const IDqSourceActor::TEvSourceError::TPtr& ev) { + void OnAsyncInputError(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) { Y_UNUSED(ev->Get()->InputIndex); SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString())); } @@ -725,7 +725,7 @@ private: /*_________________________________________________________*/ - IDqSourceActorFactory::TPtr SourceActorFactory; + IDqSourceFactory::TPtr SourceFactory; IDqSinkFactory::TPtr SinkFactory; ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; NTaskRunnerActor::ITaskRunnerActor* Actor = nullptr; @@ -764,14 +764,14 @@ NActors::IActor* CreateWorkerActor( TWorkerRuntimeData* runtimeData, const TString& traceId, const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const IDqSourceActorFactory::TPtr& sourceActorFactory, + const IDqSourceFactory::TPtr& sourceFactory, const IDqSinkFactory::TPtr& sinkFactory) { Y_VERIFY(taskRunnerActorFactory); return new TLogWrapReceive( new TDqWorker( taskRunnerActorFactory, - sourceActorFactory, + sourceFactory, sinkFactory, runtimeData, traceId), traceId); diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h index a9eac60c486..94022f93112 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.h +++ b/ydb/library/yql/providers/dq/actors/worker_actor.h @@ -25,7 +25,7 @@ namespace NYql::NDqs { TWorkerRuntimeData* runtimeData, const TString& traceId, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const NDq::IDqSourceActorFactory::TPtr& sourceActorFactory, + const NDq::IDqSourceFactory::TPtr& sourceFactory, const NDq::IDqSinkFactory::TPtr& sinkFactory); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto index 213b7fb38b9..62e029b30c6 100644 --- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto +++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto @@ -122,7 +122,7 @@ message TGetStatsOutputResponse { message TGetStatsSourceResponse { reserved 1; // NKqpProto.TKqpStatsTask.TSourceStats Stats = 1; - NDqProto.TDqSourceStats Stats = 2; + NDqProto.TDqAsyncInputBufferStats Stats = 2; } message TSinkStatsResponse { diff --git a/ydb/library/yql/providers/dq/counters/counters.h b/ydb/library/yql/providers/dq/counters/counters.h index 4a457d24fc7..ec05baa82da 100644 --- a/ydb/library/yql/providers/dq/counters/counters.h +++ b/ydb/library/yql/providers/dq/counters/counters.h @@ -220,8 +220,8 @@ struct TCounters { } void AddSourceStats( - const NDq::TDqSourceStats& currentStats, - NDq::TDqSourceStats& oldStats, + const NDq::TDqAsyncInputBufferStats& currentStats, + NDq::TDqAsyncInputBufferStats& oldStats, ui64 taskId, ui64 inputIndex) { std::map<TString, TString> labels = { diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index 66f0cd150a2..82092045a22 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -22,7 +22,7 @@ class TLocalServiceHolder { public: TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, - NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) + NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) { ui32 nodeId = 1; @@ -48,7 +48,7 @@ public: NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true); - lwmOptions.SourceActorFactory = std::move(sourceFactory); + lwmOptions.SourceFactory = std::move(sourceFactory); lwmOptions.SinkFactory = std::move(sinkFactory); lwmOptions.TransformFactory = std::move(transformFactory); lwmOptions.FunctionRegistry = functionRegistry; @@ -113,7 +113,7 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, - NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) + NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) { return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory)); } @@ -121,7 +121,7 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, - NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) + NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory) { int startPort = 31337; TRangeWalker<int> portWalker(startPort, startPort+100); diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h index 263e9573491..8afcb036cd6 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h @@ -9,6 +9,6 @@ namespace NYql { TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, - NDq::IDqSourceActorFactory::TPtr = nullptr, NDq::IDqSinkFactory::TPtr = nullptr, NDq::IDqOutputTransformFactory::TPtr = nullptr); + NDq::IDqSourceFactory::TPtr = nullptr, NDq::IDqSinkFactory::TPtr = nullptr, NDq::IDqOutputTransformFactory::TPtr = nullptr); } // namespace NYql diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 2695689b47c..cfbccb9a7a8 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -55,7 +55,7 @@ void ToProto(T* s1, const NDq::TDqInputChannelStats* ss) } template<typename T> -void ToProto(T* s1, const NDq::TDqSourceStats* ss) +void ToProto(T* s1, const NDq::TDqAsyncInputBufferStats* ss) { s1->SetChunks(ss->Chunks); s1->SetBytes(ss->Bytes); @@ -186,7 +186,7 @@ public: auto maybeSourceOldStats = CurrentSourcesStats.find(inputIndex); if (maybeSourceOldStats == CurrentSourcesStats.end()) { maybeSourceOldStats = CurrentSourcesStats.emplace_hint( - maybeSourceOldStats, inputIndex, NDq::TDqSourceStats(inputIndex)); + maybeSourceOldStats, inputIndex, NDq::TDqAsyncInputBufferStats(inputIndex)); } QueryStat.AddSourceStats( *maybeSourceStats->second, @@ -783,7 +783,7 @@ public: NDq::TDqTaskRunnerStats CurrentStats; std::unordered_map<ui64, NDq::TDqInputChannelStats> CurrentInputChannelsStats; - std::unordered_map<ui64, NDq::TDqSourceStats> CurrentSourcesStats; + std::unordered_map<ui64, NDq::TDqAsyncInputBufferStats> CurrentSourcesStats; std::unordered_map<ui64, NDq::TDqOutputChannelStats> CurrentOutputChannelsStats; i64 LastCommand = -1; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 1d360eb6e3d..dd7b70fb165 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -736,7 +736,7 @@ public: return InputIndex; } - const TDqSourceStats* GetStats() const override { + const TDqAsyncInputBufferStats* GetStats() const override { try { NDqProto::TCommandHeader header; header.SetVersion(4); @@ -789,7 +789,7 @@ public: private: ui64 TaskId; ui64 InputIndex; - mutable TDqSourceStats Stats; + mutable TDqAsyncInputBufferStats Stats; IPipeTaskRunner* TaskRunner; IInputStream& Input; diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index ab0f32af1d7..2538334a295 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -261,7 +261,7 @@ private: Options.RuntimeData, traceId, Options.TaskRunnerActorFactory, - Options.SourceActorFactory, + Options.SourceFactory, Options.SinkFactory)); } allocationInfo.WorkerActors.emplace_back(RegisterChild( diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index e5ef846be46..4cb3144e5d3 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -22,7 +22,7 @@ namespace NYql::NDqs { struct TLocalWorkerManagerOptions { TWorkerManagerCounters Counters; NTaskRunnerProxy::IProxyFactory::TPtr Factory; - NDq::IDqSourceActorFactory::TPtr SourceActorFactory; + NDq::IDqSourceFactory::TPtr SourceFactory; NDq::IDqSinkFactory::TPtr SinkFactory; NDq::IDqOutputTransformFactory::TPtr TransformFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 9400ba7b6f4..46be83524a0 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -85,7 +85,7 @@ struct TEvPrivate { } // namespace -class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqSourceActor { +class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput { public: using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id. @@ -223,10 +223,10 @@ private: void Handle(TEvPrivate::TEvSourceDataReady::TPtr&, const TActorContext& ctx) { SubscribedOnEvent = false; - ctx.Send(ComputeActorId, new TEvNewSourceDataArrived(InputIndex)); + ctx.Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } - // IActor & IDqSourceActor + // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor if (ReadSession) { ReadSession->Close(TDuration::Zero()); @@ -236,7 +236,7 @@ private: TActor<TDqPqReadActor>::PassAway(); } - i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool&, i64 freeSpace) override { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool&, i64 freeSpace) override { auto events = GetReadSession().GetEvents(false, TMaybe<size_t>(), static_cast<size_t>(Max<i64>(freeSpace, 0))); ui32 batchSize = 0; @@ -370,7 +370,7 @@ private: bool SubscribedOnEvent = false; }; -std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor( +std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( NPq::NProto::TDqPqTopicSource&& settings, ui64 inputIndex, TTxId txId, @@ -414,7 +414,7 @@ void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver drive factory.Register<NPq::NProto::TDqPqTopicSource>("PqSource", [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode]( NPq::NProto::TDqPqTopicSource&& settings, - IDqSourceActorFactory::TArguments&& args) + IDqSourceFactory::TArguments&& args) { NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER)); return CreateDqPqReadActor( diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h index 74058ee4621..9f82253c8f2 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h @@ -22,7 +22,7 @@ class TDqSourceFactory; const i64 PQReadDefaultFreeSpace = 16_MB; -std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor( +std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( NPq::NProto::TDqPqTopicSource&& settings, ui64 inputIndex, TTxId txId, diff --git a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp index fd64116e9c7..86894dd534e 100644 --- a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp +++ b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp @@ -69,7 +69,7 @@ void TPqIoTestFixture::InitSource( actor.GetHolderFactory(), freeSpace); - actor.InitSource(dqSource, dqSourceAsActor); + actor.InitAsyncInput(dqSource, dqSourceAsActor); }); } diff --git a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h index 51e13d923e4..470123cd7d8 100644 --- a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h +++ b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h @@ -43,7 +43,7 @@ struct TPqIoTestFixture : public NUnitTest::TBaseFixture { template<typename T> std::vector<T> SourceRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) { - return CaSetup->SourceRead(parser, freeSpace); + return CaSetup->AsyncInputRead(parser, freeSpace); } template<typename T> @@ -53,7 +53,7 @@ struct TPqIoTestFixture : public NUnitTest::TBaseFixture { i64 eachReadFreeSpace = 1000, TDuration timeout = TDuration::Seconds(10)) { - return CaSetup->SourceReadUntil(parser, size, eachReadFreeSpace, timeout); + return CaSetup->AsyncInputReadUntil(parser, size, eachReadFreeSpace, timeout); } void SaveSourceState(NDqProto::TCheckpoint checkpoint, NDqProto::TSourceState& state) { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp index d8a888f484d..866a0d7772a 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp @@ -41,7 +41,7 @@ namespace NYql { -NDq::IDqSourceActorFactory::TPtr CreateSourceFactory(const NYdb::TDriver& driver) { +NDq::IDqSourceFactory::TPtr CreateSourceFactory(const NYdb::TDriver& driver) { auto factory = MakeIntrusive<NYql::NDq::TDqSourceFactory>(); RegisterDqPqReadActorFactory(*factory, driver, nullptr); return factory; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index c94180e7c50..8c9b01ccf32 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -137,7 +137,7 @@ private: double Epsilon; }; -class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqSourceActor { +class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput { public: TS3ReadActor(ui64 inputIndex, IHTTPGateway::TPtr gateway, @@ -194,7 +194,7 @@ private: } } - i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { i64 total = 0LL; if (!Blocks.empty()) { buffer.reserve(buffer.size() + Blocks.size()); @@ -218,7 +218,7 @@ private: void Handle(TEvPrivate::TEvReadResult::TPtr& result) { ++IsDoneCounter; Blocks.emplace(std::move(result->Get()->Result)); - Send(ComputeActorId, new TEvNewSourceDataArrived(InputIndex)); + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } void HandleRetry(TEvPrivate::TEvRetryEvent::TPtr& ev) { @@ -236,10 +236,10 @@ private: return; } ++IsDoneCounter; - Send(ComputeActorId, new TEvSourceError(InputIndex, result->Get()->Error, true)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, true)); } - // IActor & IDqSourceActor + // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor TActorBootstrapped<TS3ReadActor>::PassAway(); } @@ -311,11 +311,11 @@ public: Finished = true; return false; case TEvPrivate::TEvReadError::EventType: - Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true)); return false; case TEvPrivate::TEvReadResult::EventType: value = std::move(ev->Get<TEvPrivate::TEvReadResult>()->Result); - Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); return true; default: return false; @@ -331,12 +331,12 @@ private: Send(SourceActorId, new TEvPrivate::TEvReadFinished); } catch (const std::exception& err) { - Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue(err.what())}, true)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(err.what())}, true)); return; } void ProcessUnexpectedEvent(TAutoPtr<IEventHandle>) final { - Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue("Unexpected event")}, true)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue("Unexpected event")}, true)); } private: const ui64 InputIndex; @@ -415,7 +415,7 @@ private: const TRetryStuff::TPtr RetryStuff; }; -class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqSourceActor { +class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqComputeActorAsyncInput { public: TS3StreamReadActor( ui64 inputIndex, @@ -470,7 +470,7 @@ private: void CommitState(const NDqProto::TCheckpoint&) final {} ui64 GetInputIndex() const final { return InputIndex; } - i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& output, bool& finished, i64 free) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& output, bool& finished, i64 free) final { i64 total = 0LL; if (!Blocks.empty()) do { const i64 s = Blocks.front().bytes(); @@ -484,7 +484,7 @@ private: return total; } - // IActor & IDqSourceActor + // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor TActorBootstrapped<TS3StreamReadActor>::PassAway(); } @@ -506,7 +506,7 @@ private: void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) { Blocks.emplace_back(); Blocks.back().swap(next->Get()->Block); - Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); } void HandleReadFinished() { @@ -595,7 +595,7 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type) { using namespace NKikimr::NMiniKQL; -std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor( +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const TTypeEnvironment& typeEnv, const IFunctionRegistry& functionRegistry, IHTTPGateway::TPtr gateway, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 1edfb68ddf5..9c38eda1efb 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -9,7 +9,7 @@ namespace NYql::NDq { -std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateS3ReadActor( +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor( const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, IHTTPGateway::TPtr gateway, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index f7e18783872..25588841c66 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -16,7 +16,7 @@ void RegisterS3ReadActorFactory( #ifdef __linux__ NDB::registerFormats(); factory.Register<NS3::TSource>("S3Source", - [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceActorFactory::TArguments&& args) { + [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceFactory::TArguments&& args) { return CreateS3ReadActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig); }); #else diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp index 5b8f28bdb44..cc702d87dc5 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp @@ -68,7 +68,7 @@ bool RangeFinished(const TString& lastReadKey, const TString& endKey, const TVec } // namespace -class TYdbReadActor : public TActorBootstrapped<TYdbReadActor>, public IDqSourceActor { +class TYdbReadActor : public TActorBootstrapped<TYdbReadActor>, public IDqComputeActorAsyncInput { public: TYdbReadActor( ui64 inputIndex, @@ -120,13 +120,13 @@ private: SendRequest(); } - // IActor & IDqSourceActor + // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor RequestsDone = true; TActorBootstrapped<TYdbReadActor>::PassAway(); } - i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { i64 total = 0LL; if (!Blocks.empty()) { buffer.reserve(buffer.size() + Blocks.size()); @@ -183,7 +183,7 @@ private: RequestsDone = res.IsEos() || RangeFinished(LastReadKey, EndKey, KeyColumnTypes); SendRequest(); if (notify) - Send(ComputeActorId, new TEvNewSourceDataArrived(InputIndex)); + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } void ProcessError(const ::NYdb::NClickhouseInternal::TScanResult& res) { @@ -192,7 +192,7 @@ private: RequestsDone = true; while(!Blocks.empty()) Blocks.pop(); - Send(ComputeActorId, new TEvSourceError(InputIndex, res.GetIssues(), true)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, res.GetIssues(), true)); } else { WakeUpTime = TMonotonic::Now() + Min(TDuration::Seconds(3), TDuration::MilliSeconds(0x30U * (1U << ++Retried))); ActorSystem->Schedule(WakeUpTime, new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvRetryTime)); @@ -229,7 +229,7 @@ private: std::queue<TString> Blocks; }; -std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateYdbReadActor( +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateYdbReadActor( NYql::NYdb::TSource&& params, ui64 inputIndex, const THashMap<TString, TString>& secureParams, diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h index 427e5ed8028..214ffedc0a3 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h @@ -9,7 +9,7 @@ namespace NYql::NDq { -std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateYdbReadActor( +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateYdbReadActor( NYql::NYdb::TSource&& params, ui64 inputIndex, const THashMap<TString, TString>& secureParams, diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp index c42c9f8968f..f0c0f751e32 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp @@ -7,8 +7,8 @@ namespace NYql::NDq { void RegisterYdbReadActorFactory(NYql::NDq::TDqSourceFactory& factory, ::NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { factory.Register<NYql::NYdb::TSource>("YdbSource", - [driver, credentialsFactory](NYql::NYdb::TSource&& settings, IDqSourceActorFactory::TArguments&& args) { - return CreateYdbReadActor(std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, driver, credentialsFactory); + [driver, credentialsFactory](NYql::NYdb::TSource&& settings, IDqSourceFactory::TArguments&& args) { + return CreateYdbReadActor(std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, driver, credentialsFactory); }); } |
