summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <[email protected]>2022-05-24 08:20:02 +0300
committerVasily Gerasimov <[email protected]>2022-05-24 08:20:02 +0300
commit3c0775eaf14f1e1c78bebe437135753ed12eaaf7 (patch)
treec283596a88a85bc6fc2c89d1a99343ddb68d42bb
parent0b208585ec07f732f0a41c7cb65fb6e57858866c (diff)
YQ-1098 Rename common source & transform entities
Factory and string literals GetSourceData -> GetAsyncInputData Source -> AsyncInput Rename IDqSourceActor Rename Source -> Buffer Rename SourceActorFactory -> SourceFactory Rename factory ref:2496a1411c02c3040d9a52059318f29f64c30641
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h4
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp8
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp8
-rw-r--r--ydb/core/yq/libs/init/init.cpp14
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp12
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h16
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h60
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h40
-rw-r--r--ydb/library/yql/dq/actors/dq_events_ids.h4
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto4
-rw-r--r--ydb/library/yql/dq/runtime/dq_source.cpp6
-rw-r--r--ydb/library/yql/dq/runtime/dq_source.h6
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h12
-rw-r--r--ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp13
-rw-r--r--ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h3
-rw-r--r--ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp2
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp34
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h58
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp4
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp32
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.h2
-rw-r--r--ydb/library/yql/providers/dq/api/protos/task_command_executor.proto2
-rw-r--r--ydb/library/yql/providers/dq/counters/counters.h4
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp8
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h2
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp6
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp4
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp2
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp12
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h2
-rw-r--r--ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp2
-rw-r--r--ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h4
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp28
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp2
-rw-r--r--ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp12
-rw-r--r--ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h2
-rw-r--r--ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp4
44 files changed, 231 insertions, 233 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index d70dcc3275a..68d86e72aab 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -16,12 +16,12 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
- NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory,
+ NYql::NDq::IDqSourceFactory::TPtr sourceFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits);
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceActorFactory::TPtr sourceActorFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory,
+ NYql::NDqProto::TDqTask&& task, NYql::NDq::IDqSourceFactory::TPtr sourceFactory, NYql::NDq::IDqSinkFactory::TPtr sinkFactory, NYql::NDq::IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
TIntrusivePtr<TKqpCounters> counters);
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 683cfb600dd..c0afd232d3b 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -33,10 +33,10 @@ public:
}
TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
+ : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true)
, ComputeCtx(settings.StatsMode)
{
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
@@ -318,11 +318,11 @@ private:
} // anonymous namespace
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
{
- return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
+ return new TKqpComputeActor(executerId, txId, std::move(task), std::move(sourceFactory),
std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits);
}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 61233473529..76aa3d711c3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -72,10 +72,10 @@ public:
}
TKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ NDqProto::TDqTask&& task, IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits)
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
, Counters(counters)
@@ -1127,11 +1127,11 @@ private:
} // anonymous namespace
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
- NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ NDqProto::TDqTask&& task, IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters)
{
- return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory),
+ return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory),
functionRegistry, settings, memoryLimits, counters);
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 11cf9146a19..c963406650e 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -115,7 +115,7 @@ void Init(
NYql::CreateYdbDqTaskTransformFactory()
});
- auto sourceActorFactory = MakeIntrusive<NYql::NDq::TDqSourceFactory>();
+ auto sourceFactory = MakeIntrusive<NYql::NDq::TDqSourceFactory>();
auto sinkFactory = MakeIntrusive<NYql::NDq::TDqSinkFactory>();
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory;
@@ -135,11 +135,11 @@ void Init(
}
if (protoConfig.GetPrivateApi().GetEnabled()) {
- RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
- RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
- RegisterS3ReadActorFactory(*sourceActorFactory, credentialsFactory,
+ RegisterDqPqReadActorFactory(*sourceFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
+ RegisterYdbReadActorFactory(*sourceFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
+ RegisterS3ReadActorFactory(*sourceFactory, credentialsFactory,
httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig()));
- RegisterClickHouseReadActorFactory(*sourceActorFactory, credentialsFactory, httpGateway);
+ RegisterClickHouseReadActorFactory(*sourceFactory, credentialsFactory, httpGateway);
RegisterDqPqWriteActorFactory(*sinkFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterDQSolomonWriteActorFactory(*sinkFactory, credentialsFactory);
@@ -160,7 +160,7 @@ void Init(
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Counters = workerManagerCounters;
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false);
- lwmOptions.SourceActorFactory = sourceActorFactory;
+ lwmOptions.SourceFactory = sourceFactory;
lwmOptions.SinkFactory = sinkFactory;
lwmOptions.FunctionRegistry = appData->FunctionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NYql::NDqs::TTaskRunnerInvokerFactory();
@@ -179,7 +179,7 @@ void Init(
::NYql::NCommon::TServiceCounters serviceCounters(appData->Counters);
if (protoConfig.GetNodesManager().GetEnabled() || protoConfig.GetPendingFetcher().GetEnabled()) {
- auto internal = protoConfig.GetPrivateApi().GetLoopback()
+ auto internal = protoConfig.GetPrivateApi().GetLoopback()
? CreateLoopbackServiceActor(clientCounters)
: CreateInternalServiceActor(
yqSharedResources,
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index fc468e163b6..a84f863e72e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -29,11 +29,11 @@ public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false)
+ : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, ReadyToCheckpointFlag(false)
, SentStatsRequest(false)
@@ -378,9 +378,9 @@ private:
// TODO:(whcrc) maybe save Sources before Program?
for (auto& [inputIndex, source] : SourcesMap) {
- YQL_ENSURE(source.SourceActor, "Source[" << inputIndex << "] is not created");
+ YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
NDqProto::TSourceState& sourceState = *state.AddSources();
- source.SourceActor->SaveState(checkpoint, sourceState);
+ source.AsyncInput->SaveState(checkpoint, sourceState);
sourceState.SetInputIndex(inputIndex);
}
}
@@ -573,12 +573,12 @@ private:
IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
{
- return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
+ return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceFactory),
std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerActorFactory);
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
index dae744e3777..662e8085e81 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
@@ -19,7 +19,7 @@ namespace NYql {
namespace NDq {
NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 7f8a2564b80..85e6d3c96e3 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -33,11 +33,11 @@ public:
static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory)
- : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits)
+ : TBase(executerId, txId, std::move(task), std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits)
, TaskRunnerFactory(taskRunnerFactory)
{}
@@ -68,11 +68,11 @@ private:
IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory)
{
- return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
+ return new TDqComputeActor(executerId, txId, std::move(task), std::move(sourceFactory),
std::move(sinkFactory), std::move(transformFactory), functionRegistry, settings, memoryLimits, taskRunnerFactory);
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 49568b7687c..ab4f8d0af49 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -257,7 +257,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
NDqProto::TDqTaskStats* protoTask, bool withProfileStats);
NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
index fdbc0648380..1d22e8a1f96 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
@@ -7,13 +7,13 @@
namespace NYql::NDq {
-std::pair<IDqSourceActor*, NActors::IActor*> TDqSourceFactory::CreateDqSourceActor(IDqSourceActorFactory::TArguments&& args) const
+std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> TDqSourceFactory::CreateDqSource(IDqSourceFactory::TArguments&& args) const
{
const TString& type = args.InputDesc.GetSource().GetType();
- YQL_ENSURE(!type.empty(), "Attempt to create source actor of empty type");
+ YQL_ENSURE(!type.empty(), "Attempt to create source of empty type");
const TCreatorFunction* creatorFunc = CreatorsByType.FindPtr(type);
- YQL_ENSURE(creatorFunc, "Unknown type of source actor: \"" << type << "\"");
- std::pair<IDqSourceActor*, NActors::IActor*> actor = (*creatorFunc)(std::move(args));
+ YQL_ENSURE(creatorFunc, "Unknown type of source: \"" << type << "\"");
+ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> actor = (*creatorFunc)(std::move(args));
Y_VERIFY(actor.first);
Y_VERIFY(actor.second);
return actor;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
index d82f5cdd317..12f5de2a061 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h
@@ -13,23 +13,23 @@
namespace NYql::NDq {
template <class T>
-concept TCastsToSourceActorPair =
- std::is_convertible_v<T, std::pair<IDqSourceActor*, NActors::IActor*>>;
+concept TCastsToAsyncInputPair =
+ std::is_convertible_v<T, std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>>;
template <class T, class TProto>
-concept TSourceActorCreatorFunc = requires(T f, TProto&& settings, IDqSourceActorFactory::TArguments args) {
- { f(std::move(settings), std::move(args)) } -> TCastsToSourceActorPair;
+concept TSourceCreatorFunc = requires(T f, TProto&& settings, IDqSourceFactory::TArguments args) {
+ { f(std::move(settings), std::move(args)) } -> TCastsToAsyncInputPair;
};
-class TDqSourceFactory : public IDqSourceActorFactory {
+class TDqSourceFactory : public IDqSourceFactory {
public:
- using TCreatorFunction = std::function<std::pair<IDqSourceActor*, NActors::IActor*>(TArguments&& args)>;
+ using TCreatorFunction = std::function<std::pair<IDqComputeActorAsyncInput*, NActors::IActor*>(TArguments&& args)>;
- std::pair<IDqSourceActor*, NActors::IActor*> CreateDqSourceActor(TArguments&& args) const override;
+ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TArguments&& args) const override;
void Register(const TString& type, TCreatorFunction creator);
- template <class TProtoMsg, TSourceActorCreatorFunc<TProtoMsg> TCreatorFunc>
+ template <class TProtoMsg, TSourceCreatorFunc<TProtoMsg> TCreatorFunc>
void Register(const TString& type, TCreatorFunc creator) {
Register(type,
[creator = std::move(creator), type](TArguments&& args)
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index aad35f07580..c6ea72a0237 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -142,7 +142,7 @@ public:
protected:
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false)
: ExecuterId(executerId)
@@ -151,7 +151,7 @@ protected:
, RuntimeSettings(settings)
, MemoryLimits(memoryLimits)
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
- , SourceActorFactory(std::move(sourceActorFactory))
+ , SourceFactory(std::move(sourceFactory))
, SinkFactory(std::move(sinkFactory))
, OutputTransformFactory(std::move(transformFactory))
, FunctionRegistry(functionRegistry)
@@ -168,7 +168,7 @@ protected:
}
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task,
- IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
+ IDqSourceFactory::TPtr sourceFactory, IDqSinkFactory::TPtr sinkFactory, IDqOutputTransformFactory::TPtr transformFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
: ExecuterId(executerId)
@@ -177,7 +177,7 @@ protected:
, RuntimeSettings(settings)
, MemoryLimits(memoryLimits)
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
- , SourceActorFactory(std::move(sourceActorFactory))
+ , SourceFactory(std::move(sourceFactory))
, SinkFactory(std::move(sinkFactory))
, OutputTransformFactory(std::move(transformFactory))
, FunctionRegistry(functionRegistry)
@@ -228,8 +228,8 @@ protected:
FFunc(TEvDqCompute::TEvRestoreFromCheckpoint::EventType, Checkpoints->Receive);
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, HandleExecuteBase);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, HandleExecuteBase);
- hFunc(IDqSourceActor::TEvNewSourceDataArrived, OnNewSourceDataArrived);
- hFunc(IDqSourceActor::TEvSourceError, OnSourceError);
+ hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived);
+ hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, OnAsyncInputError);
default: {
CA_LOG_C("TDqComputeActorBase, unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
@@ -443,7 +443,7 @@ protected:
for (auto& [_, source] : SourcesMap) {
if (source.Actor) {
- source.SourceActor->PassAway();
+ source.AsyncInput->PassAway();
}
}
@@ -636,9 +636,9 @@ protected:
data.SetBlob(TaskRunner->Save());
for (auto& [inputIndex, source] : SourcesMap) {
- YQL_ENSURE(source.SourceActor, "Source[" << inputIndex << "] is not created");
+ YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
NDqProto::TSourceState& sourceState = *state.AddSources();
- source.SourceActor->SaveState(checkpoint, sourceState);
+ source.AsyncInput->SaveState(checkpoint, sourceState);
sourceState.SetInputIndex(inputIndex);
}
}
@@ -646,8 +646,8 @@ protected:
void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
CA_LOG_D("Commit state");
for (auto& [inputIndex, source] : SourcesMap) {
- Y_VERIFY(source.SourceActor);
- source.SourceActor->CommitState(checkpoint);
+ Y_VERIFY(source.AsyncInput);
+ source.AsyncInput->CommitState(checkpoint);
}
}
@@ -696,8 +696,8 @@ protected:
for (const NDqProto::TSourceState& sourceState : state.GetSources()) {
TSourceInfo* source = SourcesMap.FindPtr(sourceState.GetInputIndex());
YQL_ENSURE(source, "Failed to load state. Source with input index " << sourceState.GetInputIndex() << " was not found");
- YQL_ENSURE(source->SourceActor, "Source[" << sourceState.GetInputIndex() << "] is not created");
- source->SourceActor->LoadState(sourceState);
+ YQL_ENSURE(source->AsyncInput, "Source[" << sourceState.GetInputIndex() << "] is not created");
+ source->AsyncInput->LoadState(sourceState);
}
for (const NDqProto::TSinkState& sinkState : state.GetSinks()) {
TAsyncOutputInfoBase* sink = SinksMap.FindPtr(sinkState.GetOutputIndex());
@@ -766,8 +766,8 @@ protected:
struct TSourceInfo {
ui64 Index;
- IDqSource::TPtr Source;
- IDqSourceActor* SourceActor = nullptr;
+ IDqSource::TPtr Buffer;
+ IDqComputeActorAsyncInput* AsyncInput = nullptr;
NActors::IActor* Actor = nullptr;
TIssuesBuffer IssuesBuffer;
bool Finished = false;
@@ -843,15 +843,15 @@ protected:
}
virtual void SourcePush(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, TSourceInfo& source, i64 space, bool finished) {
- source.Source->Push(std::move(batch), space);
+ source.Buffer->Push(std::move(batch), space);
if (finished) {
- source.Source->Finish();
+ source.Buffer->Finish();
source.Finished = true;
}
}
virtual i64 SourceFreeSpace(TSourceInfo& source) {
- return source.Source->GetFreeSpace();
+ return source.Buffer->GetFreeSpace();
}
virtual bool SayHelloOnBootstrap() {
@@ -1248,13 +1248,13 @@ protected:
}
}
for (auto& [inputIndex, source] : SourcesMap) {
- if (TaskRunner) { source.Source = TaskRunner->GetSource(inputIndex); Y_VERIFY(source.Source);}
- Y_VERIFY(SourceActorFactory);
+ if (TaskRunner) { source.Buffer = TaskRunner->GetSource(inputIndex); Y_VERIFY(source.Buffer);}
+ Y_VERIFY(SourceFactory);
const auto& inputDesc = Task.GetInputs(inputIndex);
const ui64 i = inputIndex; // Crutch for clang
- CA_LOG_D("Create source actor for input " << i << " " << inputDesc);
- std::tie(source.SourceActor, source.Actor) = SourceActorFactory->CreateDqSourceActor(
- IDqSourceActorFactory::TArguments{
+ CA_LOG_D("Create source for input " << i << " " << inputDesc);
+ std::tie(source.AsyncInput, source.Actor) = SourceFactory->CreateDqSource(
+ IDqSourceFactory::TArguments{
.InputDesc = inputDesc,
.InputIndex = inputIndex,
.TxId = TxId,
@@ -1323,7 +1323,7 @@ protected:
}
for (auto& [inputIndex, source] : SourcesMap) {
- Y_VERIFY(!TaskRunner || source.Source);
+ Y_VERIFY(!TaskRunner || source.Buffer);
if (source.Finished) {
const ui64 indexForLogging = inputIndex; // Crutch for clang
CA_LOG_D("Skip polling source[" << indexForLogging << "]: finished");
@@ -1332,9 +1332,9 @@ protected:
const i64 freeSpace = SourceFreeSpace(source);
if (freeSpace > 0) {
NKikimr::NMiniKQL::TUnboxedValueVector batch;
- Y_VERIFY(source.SourceActor);
+ Y_VERIFY(source.AsyncInput);
bool finished = false;
- const i64 space = source.SourceActor->GetSourceData(batch, finished, freeSpace);
+ const i64 space = source.AsyncInput->GetAsyncInputData(batch, finished, freeSpace);
const ui64 index = inputIndex;
CA_LOG_D("Poll source " << index
<< ". Buffer free space: " << freeSpace
@@ -1343,7 +1343,7 @@ protected:
if (!batch.empty()) {
// If we have read some data, we must run such reading again
- // to process the case when source actor notified us about new data
+ // to process the case when source notified us about new data
// but we haven't read all of it.
ContinueExecute();
}
@@ -1352,12 +1352,12 @@ protected:
}
}
- void OnNewSourceDataArrived(const IDqSourceActor::TEvNewSourceDataArrived::TPtr& ev) {
+ void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
Y_VERIFY(SourcesMap.FindPtr(ev->Get()->InputIndex));
ContinueExecute();
}
- void OnSourceError(const IDqSourceActor::TEvSourceError::TPtr& ev) {
+ void OnAsyncInputError(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) {
if (!ev->Get()->IsFatal) {
SourcesMap.at(ev->Get()->InputIndex).IssuesBuffer.Push(ev->Get()->Issues);
return;
@@ -1576,7 +1576,7 @@ protected:
const TComputeRuntimeSettings RuntimeSettings;
const TComputeMemoryLimits MemoryLimits;
const bool CanAllocateExtraMemory = false;
- const IDqSourceActorFactory::TPtr SourceActorFactory;
+ const IDqSourceFactory::TPtr SourceFactory;
const IDqSinkFactory::TPtr SinkFactory;
const IDqOutputTransformFactory::TPtr OutputTransformFactory;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h
index 672af2b72d2..113120031f3 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h
@@ -21,33 +21,33 @@ class IActor;
namespace NYql::NDq {
-// Source actor.
+// Source/transform.
// Must be IActor.
//
// Protocol:
-// 1. CA starts source actor.
-// 2. CA calls IDqSourceActor::GetSourceData(batch, FreeSpace).
-// 3. Source actor sends TEvNewSourceDataArrived when it has data to process.
-// 4. CA calls IDqSourceActor::GetSourceData(batch, FreeSpace) to get data when it is ready to process it.
+// 1. CA starts source/transform.
+// 2. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace).
+// 3. Source/transform sends TEvNewAsyncInputDataArrived when it has data to process.
+// 4. CA calls IDqComputeActorAsyncInput::GetAsyncInputData(batch, FreeSpace) to get data when it is ready to process it.
//
-// In case of error source actor sends TEvSourceError
+// In case of error source/transform sends TEvAsyncInputError
//
// Checkpointing:
// 1. InjectCheckpoint event arrives to CA.
// 2. ...
-// 3. CA calls IDqSourceActor::SaveState() and IDqTaskRunner::SaveGraphState() and uses this pair as state for CA.
+// 3. CA calls IDqComputeActorAsyncInput::SaveState() and IDqTaskRunner::SaveGraphState() and uses this pair as state for CA.
// 3. ...
-// 5. CA calls IDqSourceActor::CommitState() to apply all side effects.
-struct IDqSourceActor {
- struct TEvNewSourceDataArrived : public NActors::TEventLocal<TEvNewSourceDataArrived, TDqComputeEvents::EvNewSourceDataArrived> {
+// 5. CA calls IDqComputeActorAsyncInput::CommitState() to apply all side effects.
+struct IDqComputeActorAsyncInput {
+ struct TEvNewAsyncInputDataArrived : public NActors::TEventLocal<TEvNewAsyncInputDataArrived, TDqComputeEvents::EvNewAsyncInputDataArrived> {
const ui64 InputIndex;
- explicit TEvNewSourceDataArrived(ui64 inputIndex)
+ explicit TEvNewAsyncInputDataArrived(ui64 inputIndex)
: InputIndex(inputIndex)
{}
};
- struct TEvSourceError : public NActors::TEventLocal<TEvSourceError, TDqComputeEvents::EvSourceError> {
- TEvSourceError(ui64 inputIndex, const TIssues& issues, bool isFatal)
+ struct TEvAsyncInputError : public NActors::TEventLocal<TEvAsyncInputError, TDqComputeEvents::EvAsyncInputError> {
+ TEvAsyncInputError(ui64 inputIndex, const TIssues& issues, bool isFatal)
: InputIndex(inputIndex)
, Issues(issues)
, IsFatal(isFatal)
@@ -63,7 +63,7 @@ struct IDqSourceActor {
// Gets data and returns space used by filled data batch.
// Method should be called under bound mkql allocator.
// Could throw YQL errors.
- virtual i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0;
+ virtual i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0;
// Checkpointing.
virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) = 0;
@@ -72,11 +72,11 @@ struct IDqSourceActor {
virtual void PassAway() = 0; // The same signature as IActor::PassAway()
- virtual ~IDqSourceActor() = default;
+ virtual ~IDqComputeActorAsyncInput() = default;
};
-struct IDqSourceActorFactory : public TThrRefBase {
- using TPtr = TIntrusivePtr<IDqSourceActorFactory>;
+struct IDqSourceFactory : public TThrRefBase {
+ using TPtr = TIntrusivePtr<IDqSourceFactory>;
struct TArguments {
const NDqProto::TTaskInput& InputDesc;
@@ -89,10 +89,10 @@ struct IDqSourceActorFactory : public TThrRefBase {
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
};
- // Creates source actor.
+ // Creates source.
// Could throw YQL errors.
- // IActor* and IDqSourceActor* returned by method must point to the objects with consistent lifetime.
- virtual std::pair<IDqSourceActor*, NActors::IActor*> CreateDqSourceActor(TArguments&& args) const = 0;
+ // IActor* and IDqComputeActorAsyncInput* returned by method must point to the objects with consistent lifetime.
+ virtual std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSource(TArguments&& args) const = 0;
};
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h
index 2049edee75a..11b02252332 100644
--- a/ydb/library/yql/dq/actors/dq_events_ids.h
+++ b/ydb/library/yql/dq/actors/dq_events_ids.h
@@ -48,8 +48,8 @@ struct TDqComputeEvents {
EvGetTaskStateResult,
EvStateRequest,
EvNewCheckpointCoordinatorAck,
- EvNewSourceDataArrived,
- EvSourceError,
+ EvNewAsyncInputDataArrived,
+ EvAsyncInputError,
// place all new events here
EvEnd
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index cf671332c27..4c60332bbf7 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -13,7 +13,7 @@ enum EDqStatsMode {
DQ_STATS_MODE_PROFILE = 30;
}
-message TDqSourceStats {
+message TDqAsyncInputBufferStats {
// basic stats
uint64 InputIndex = 1;
uint64 Chunks = 2;
@@ -136,7 +136,7 @@ message TDqTaskStats {
}
repeated THistBucket ComputeCpuTimeByRun = 106;
- repeated TDqSourceStats Sources = 150;
+ repeated TDqAsyncInputBufferStats Sources = 150;
repeated TDqInputChannelStats InputChannels = 151;
repeated TDqAsyncOutputBufferStats Sinks = 152;
repeated TDqOutputChannelStats OutputChannels = 153;
diff --git a/ydb/library/yql/dq/runtime/dq_source.cpp b/ydb/library/yql/dq/runtime/dq_source.cpp
index b9e75a15e81..dfa3da25fd7 100644
--- a/ydb/library/yql/dq/runtime/dq_source.cpp
+++ b/ydb/library/yql/dq/runtime/dq_source.cpp
@@ -24,14 +24,14 @@ public:
}
}
- const TDqSourceStats* GetStats() const override {
+ const TDqAsyncInputBufferStats* GetStats() const override {
return &BasicStats;
}
private:
const ui64 InputIndex;
- TDqSourceStats BasicStats;
- TDqSourceStats* ProfileStats = nullptr;
+ TDqAsyncInputBufferStats BasicStats;
+ TDqAsyncInputBufferStats* ProfileStats = nullptr;
};
IDqSource::TPtr CreateDqSource(
diff --git a/ydb/library/yql/dq/runtime/dq_source.h b/ydb/library/yql/dq/runtime/dq_source.h
index 3355a4966f9..82f5cb2ade6 100644
--- a/ydb/library/yql/dq/runtime/dq_source.h
+++ b/ydb/library/yql/dq/runtime/dq_source.h
@@ -3,10 +3,10 @@
namespace NYql::NDq {
-struct TDqSourceStats : TDqInputStats {
+struct TDqAsyncInputBufferStats : TDqInputStats {
ui64 InputIndex = 0;
- explicit TDqSourceStats(ui64 inputIndex)
+ explicit TDqAsyncInputBufferStats(ui64 inputIndex)
: InputIndex(inputIndex) {}
template<typename T>
@@ -33,7 +33,7 @@ public:
virtual void Finish() = 0;
- virtual const TDqSourceStats* GetStats() const = 0;
+ virtual const TDqAsyncInputBufferStats* GetStats() const = 0;
};
IDqSource::TPtr CreateDqSource(ui64 inputIndex, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes,
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 5dbefd9f2f5..e3389ab985a 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -81,7 +81,7 @@ struct TTaskRunnerStatsBase {
NMonitoring::IHistogramCollectorPtr ComputeCpuTimeByRun; // in millis
THashMap<ui64, const TDqInputChannelStats*> InputChannels; // Channel id -> Channel stats
- THashMap<ui64, const TDqSourceStats*> Sources; // Input index -> Source stats
+ THashMap<ui64, const TDqAsyncInputBufferStats*> Sources; // Input index -> Source stats
THashMap<ui64, const TDqOutputChannelStats*> OutputChannels; // Channel id -> Channel stats
TVector<TMkqlStat> MkqlStats;
@@ -127,7 +127,7 @@ struct TTaskRunnerStatsBase {
private:
virtual TDqInputChannelStats* MutableInputChannel(ui64 channelId) = 0;
- virtual TDqSourceStats* MutableSource(ui64 sourceId) = 0; // todo: (whcrc) unused, not modified by these pointers
+ virtual TDqAsyncInputBufferStats* MutableSource(ui64 sourceId) = 0; // todo: (whcrc) unused, not modified by these pointers
virtual TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) = 0;
};
@@ -137,8 +137,8 @@ struct TDqTaskRunnerStats : public TTaskRunnerStatsBase {
return const_cast<TDqInputChannelStats*>(InputChannels[channelId]);
}
- TDqSourceStats* MutableSource(ui64 sourceId) override {
- return const_cast<TDqSourceStats*>(Sources[sourceId]);
+ TDqAsyncInputBufferStats* MutableSource(ui64 sourceId) override {
+ return const_cast<TDqAsyncInputBufferStats*>(Sources[sourceId]);
}
TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) override {
@@ -149,7 +149,7 @@ struct TDqTaskRunnerStats : public TTaskRunnerStatsBase {
struct TDqTaskRunnerStatsInplace : public TTaskRunnerStatsBase {
// all stats are owned by this object
TVector<THolder<TDqInputChannelStats>> InputChannelHolder;
- TVector<THolder<TDqSourceStats>> SourceHolder;
+ TVector<THolder<TDqAsyncInputBufferStats>> SourceHolder;
TVector<THolder<TDqOutputChannelStats>> OutputChannelHolder;
template<typename TStat>
@@ -166,7 +166,7 @@ struct TDqTaskRunnerStatsInplace : public TTaskRunnerStatsBase {
return GetOrCreate(InputChannels, InputChannelHolder, channelId);
}
- TDqSourceStats* MutableSource(ui64 sourceId) override {
+ TDqAsyncInputBufferStats* MutableSource(ui64 sourceId) override {
return GetOrCreate(Sources, SourceHolder, sourceId);
}
diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
index d611c91201a..fa9c5221f7f 100644
--- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
+++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
@@ -45,7 +45,7 @@ struct TEvPrivate {
} // namespace
-class TClickHouseReadActor : public TActorBootstrapped<TClickHouseReadActor>, public IDqSourceActor {
+class TClickHouseReadActor : public TActorBootstrapped<TClickHouseReadActor>, public IDqComputeActorAsyncInput {
public:
TClickHouseReadActor(ui64 inputIndex,
IHTTPGateway::TPtr gateway,
@@ -91,7 +91,7 @@ private:
}
}
- i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
if (Result) {
const auto size = Result->size();
buffer.emplace_back(NKikimr::NMiniKQL::MakeString(std::string_view(*Result)));
@@ -106,14 +106,14 @@ private:
void Handle(TEvPrivate::TEvReadResult::TPtr& result) {
Result.emplace(std::move(result->Get()->Result));
- Send(ComputeActorId, new TEvNewSourceDataArrived(InputIndex));
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
void Handle(TEvPrivate::TEvReadError::TPtr& result) {
- Send(ComputeActorId, new TEvSourceError(InputIndex, result->Get()->Error, true));
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, true));
}
- // IActor & IDqSourceActor
+ // IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
TActorBootstrapped<TClickHouseReadActor>::PassAway();
}
@@ -130,7 +130,7 @@ private:
std::optional<IHTTPGateway::TContent> Result;
};
-std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateClickHouseReadActor(
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateClickHouseReadActor(
IHTTPGateway::TPtr gateway,
NCH::TSource&& params,
ui64 inputIndex,
@@ -162,4 +162,3 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateClickHouseReadActor(
}
} // namespace NYql::NDq
-
diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h
index 0f9696851a5..5a965013025 100644
--- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h
+++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.h
@@ -8,7 +8,7 @@
namespace NYql::NDq {
-std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateClickHouseReadActor(
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateClickHouseReadActor(
IHTTPGateway::TPtr gateway,
NCH::TSource&& params,
ui64 inputIndex,
@@ -18,4 +18,3 @@ std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateClickHouseReadActo
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
} // namespace NYql::NDq
-
diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp
index c6053eb64f1..824bdd958b1 100644
--- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp
+++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_source_factory.cpp
@@ -7,7 +7,7 @@ namespace NYql::NDq {
void RegisterClickHouseReadActorFactory(TDqSourceFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway) {
factory.Register<NCH::TSource>("ClickHouseSource",
- [credentialsFactory, gateway](NCH::TSource&& settings, IDqSourceActorFactory::TArguments&& args) {
+ [credentialsFactory, gateway](NCH::TSource&& settings, IDqSourceFactory::TArguments&& args) {
return CreateClickHouseReadActor(gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory);
});
}
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
index 22b93b13ced..f036dee78f4 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
@@ -22,7 +22,7 @@ NYql::NDqProto::TCheckpoint CreateCheckpoint(ui64 id) {
return checkpoint;
}
-TFakeActor::TFakeActor(TSourcePromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises)
+TFakeActor::TFakeActor(TAsyncInputPromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises)
: TActor<TFakeActor>(&TFakeActor::StateFunc)
, MemoryInfo("test")
, HolderFactory(Alloc.Ref(), MemoryInfo)
@@ -30,9 +30,9 @@ TFakeActor::TFakeActor(TSourcePromises& sourcePromises, TAsyncOutputPromises& as
, FunctionRegistry(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry()))
, ProgramBuilder(TypeEnv, *FunctionRegistry)
, ValueBuilder(HolderFactory)
- , SourceEvents(*this)
+ , AsyncInputEvents(*this)
, AsyncOutputCallbacks(*this)
- , SourcePromises(sourcePromises)
+ , AsyncInputPromises(sourcePromises)
, AsyncOutputPromises(asyncOutputPromises)
{
Alloc.Release();
@@ -48,19 +48,19 @@ void TFakeActor::InitAsyncOutput(IDqComputeActorAsyncOutput* dqAsyncOutput, IAct
DqAsyncOutputAsActor = dqAsyncOutputAsActor;
}
-void TFakeActor::InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor) {
- DqSourceActorId = RegisterWithSameMailbox(dqSourceAsActor),
- DqSourceActor = dqSource;
- DqSourceActorAsActor = dqSourceAsActor;
+void TFakeActor::InitAsyncInput(IDqComputeActorAsyncInput* dqAsyncInput, IActor* dqAsyncInputAsActor) {
+ DqAsyncInputActorId = RegisterWithSameMailbox(dqAsyncInputAsActor),
+ DqAsyncInput = dqAsyncInput;
+ DqAsyncInputAsActor = dqAsyncInputAsActor;
}
void TFakeActor::Terminate() {
- if (DqSourceActorId) {
- DqSourceActor->PassAway();
+ if (DqAsyncInputActorId) {
+ DqAsyncInput->PassAway();
- DqSourceActorId = std::nullopt;
- DqSourceActor = nullptr;
- DqSourceActorAsActor = nullptr;
+ DqAsyncInputActorId = std::nullopt;
+ DqAsyncInput = nullptr;
+ DqAsyncInputAsActor = nullptr;
}
if (DqAsyncOutputActorId) {
@@ -87,7 +87,7 @@ TFakeCASetup::TFakeCASetup()
Runtime->AddLocalService(
FakeActorId,
NActors::TActorSetupCmd(
- new TFakeActor(SourcePromises, AsyncOutputPromises),
+ new TFakeActor(AsyncInputPromises, AsyncOutputPromises),
NActors::TMailboxType::Simple,
0));
@@ -111,15 +111,15 @@ void TFakeCASetup::AsyncOutputWrite(const TWriteValueProducer valueProducer, TMa
void TFakeCASetup::SaveSourceState(NDqProto::TCheckpoint checkpoint, NDqProto::TSourceState& state) {
Execute([&state, &checkpoint](TFakeActor& actor) {
- Y_ASSERT(actor.DqSourceActor);
- actor.DqSourceActor->SaveState(checkpoint, state);
+ Y_ASSERT(actor.DqAsyncInput);
+ actor.DqAsyncInput->SaveState(checkpoint, state);
});
}
void TFakeCASetup::LoadSource(const NDqProto::TSourceState& state) {
Execute([&state](TFakeActor& actor) {
- Y_ASSERT(actor.DqSourceActor);
- actor.DqSourceActor->LoadState(state);
+ Y_ASSERT(actor.DqAsyncInput);
+ actor.DqAsyncInput->LoadState(state);
});
}
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
index 2a7aa44f5f0..a0a460e46ea 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
@@ -56,8 +56,8 @@ namespace {
};
}
-struct TSourcePromises {
- NThreading::TPromise<void> NewSourceDataArrived = NThreading::NewPromise();
+struct TAsyncInputPromises {
+ NThreading::TPromise<void> NewAsyncInputDataArrived = NThreading::NewPromise();
NThreading::TPromise<TIssues> FatalError = NThreading::NewPromise<TIssues>();
};
@@ -70,18 +70,18 @@ struct TAsyncOutputPromises {
NYql::NDqProto::TCheckpoint CreateCheckpoint(ui64 id = 0);
class TFakeActor : public NActors::TActor<TFakeActor> {
- struct TSourceEvents {
- explicit TSourceEvents(TFakeActor& parent) : Parent(parent) {}
+ struct TAsyncInputEvents {
+ explicit TAsyncInputEvents(TFakeActor& parent) : Parent(parent) {}
- void OnNewSourceDataArrived(ui64) {
- Parent.SourcePromises.NewSourceDataArrived.SetValue();
- Parent.SourcePromises.NewSourceDataArrived = NThreading::NewPromise();
+ void OnNewAsyncInputDataArrived(ui64) {
+ Parent.AsyncInputPromises.NewAsyncInputDataArrived.SetValue();
+ Parent.AsyncInputPromises.NewAsyncInputDataArrived = NThreading::NewPromise();
}
- void OnSourceError(ui64, const TIssues& issues, bool isFatal) {
+ void OnAsyncInputError(ui64, const TIssues& issues, bool isFatal) {
Y_UNUSED(isFatal);
- Parent.SourcePromises.FatalError.SetValue(issues);
- Parent.SourcePromises.FatalError = NThreading::NewPromise<TIssues>();
+ Parent.AsyncInputPromises.FatalError.SetValue(issues);
+ Parent.AsyncInputPromises.FatalError = NThreading::NewPromise<TIssues>();
}
TFakeActor& Parent;
@@ -111,25 +111,25 @@ class TFakeActor : public NActors::TActor<TFakeActor> {
};
public:
- TFakeActor(TSourcePromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises);
+ TFakeActor(TAsyncInputPromises& sourcePromises, TAsyncOutputPromises& asyncOutputPromises);
~TFakeActor();
void InitAsyncOutput(IDqComputeActorAsyncOutput* dqAsyncOutput, IActor* dqAsyncOutputAsActor);
- void InitSource(IDqSourceActor* dqSource, IActor* dqSourceAsActor);
+ void InitAsyncInput(IDqComputeActorAsyncInput* dqAsyncInput, IActor* dqAsyncInputAsActor);
void Terminate();
TAsyncOutputCallbacks& GetAsyncOutputCallbacks();
NKikimr::NMiniKQL::THolderFactory& GetHolderFactory();
public:
- IDqSourceActor* DqSourceActor = nullptr;
+ IDqComputeActorAsyncInput* DqAsyncInput = nullptr;
IDqComputeActorAsyncOutput* DqAsyncOutput = nullptr;
private:
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvExecute, Handle);
- hFunc(IDqSourceActor::TEvNewSourceDataArrived, Handle);
- hFunc(IDqSourceActor::TEvSourceError, Handle);
+ hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, Handle);
+ hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, Handle);
)
void Handle(TEvPrivate::TEvExecute::TPtr& ev) {
@@ -142,12 +142,12 @@ private:
ev->Get()->Promise.SetValue();
}
- void Handle(const IDqSourceActor::TEvNewSourceDataArrived::TPtr& ev) {
- SourceEvents.OnNewSourceDataArrived(ev->Get()->InputIndex);
+ void Handle(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
+ AsyncInputEvents.OnNewAsyncInputDataArrived(ev->Get()->InputIndex);
}
- void Handle(const IDqSourceActor::TEvSourceError::TPtr& ev) {
- SourceEvents.OnSourceError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal);
+ void Handle(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) {
+ AsyncInputEvents.OnAsyncInputError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal);
}
public:
@@ -161,16 +161,16 @@ public:
NKikimr::NMiniKQL::TDefaultValueBuilder ValueBuilder;
private:
- std::optional<NActors::TActorId> DqSourceActorId;
- IActor* DqSourceActorAsActor = nullptr;
+ std::optional<NActors::TActorId> DqAsyncInputActorId;
+ IActor* DqAsyncInputAsActor = nullptr;
std::optional<NActors::TActorId> DqAsyncOutputActorId;
IActor* DqAsyncOutputAsActor = nullptr;
- TSourceEvents SourceEvents;
+ TAsyncInputEvents AsyncInputEvents;
TAsyncOutputCallbacks AsyncOutputCallbacks;
- TSourcePromises& SourcePromises;
+ TAsyncInputPromises& AsyncInputPromises;
TAsyncOutputPromises& AsyncOutputPromises;
};
@@ -179,12 +179,12 @@ struct TFakeCASetup {
~TFakeCASetup();
template<typename T>
- std::vector<T> SourceRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) {
+ std::vector<T> AsyncInputRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) {
std::vector<T> result;
Execute([&result, &parser, freeSpace](TFakeActor& actor) {
NKikimr::NMiniKQL::TUnboxedValueVector buffer;
bool finished = false;
- actor.DqSourceActor->GetSourceData(buffer, finished, freeSpace);
+ actor.DqAsyncInput->GetAsyncInputData(buffer, finished, freeSpace);
for (const auto& uv : buffer) {
for (const auto item : parser(uv)) {
@@ -197,7 +197,7 @@ struct TFakeCASetup {
}
template<typename T>
- std::vector<T> SourceReadUntil(
+ std::vector<T> AsyncInputReadUntil(
const TReadValueParser<T> parser,
ui64 size,
i64 eachReadFreeSpace = 1000,
@@ -205,13 +205,13 @@ struct TFakeCASetup {
{
std::vector<T> result;
DoWithRetry([&](){
- auto batch = SourceRead<T>(parser, eachReadFreeSpace);
+ auto batch = AsyncInputRead<T>(parser, eachReadFreeSpace);
for (const auto& item : batch) {
result.emplace_back(item);
}
if (result.size() < size) {
- SourcePromises.NewSourceDataArrived.GetFuture().Wait(timeout);
+ AsyncInputPromises.NewAsyncInputDataArrived.GetFuture().Wait(timeout);
ythrow yexception() << "Not enough data";
}
},
@@ -233,7 +233,7 @@ struct TFakeCASetup {
public:
TRuntimePtr Runtime;
NActors::TActorId FakeActorId;
- TSourcePromises SourcePromises;
+ TAsyncInputPromises AsyncInputPromises;
TAsyncOutputPromises AsyncOutputPromises;
};
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index 8ccb7ccd92c..956c39eb061 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -65,7 +65,7 @@ IActor* CreateComputeActor(
executerId,
operationId,
std::move(task),
- options.SourceActorFactory,
+ options.SourceFactory,
options.SinkFactory,
options.TransformFactory,
options.FunctionRegistry,
@@ -77,7 +77,7 @@ IActor* CreateComputeActor(
executerId,
operationId,
std::move(task),
- options.SourceActorFactory,
+ options.SourceFactory,
options.SinkFactory,
options.TransformFactory,
options.FunctionRegistry,
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 49860f50e44..22d9bc86682 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -49,7 +49,7 @@ struct TOutputChannel {
};
struct TSourceInfo {
- IDqSourceActor* SourceActor = nullptr;
+ IDqComputeActorAsyncInput* Source = nullptr;
NActors::IActor* Actor = nullptr;
i64 FreeSpace = 1;
bool HasData = false;
@@ -76,12 +76,12 @@ public:
explicit TDqWorker(
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const IDqSourceActorFactory::TPtr& sourceActorFactory,
+ const IDqSourceFactory::TPtr& sourceFactory,
const IDqSinkFactory::TPtr& sinkFactory,
TWorkerRuntimeData* runtimeData,
const TString& traceId)
: TRichActor<TDqWorker>(&TDqWorker::Handler)
- , SourceActorFactory(sourceActorFactory)
+ , SourceFactory(sourceFactory)
, SinkFactory(sinkFactory)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, RuntimeData(runtimeData)
@@ -116,7 +116,7 @@ public:
Actor->PassAway();
}
for (const auto& [_, v] : SourcesMap) {
- v.SourceActor->PassAway();
+ v.Source->PassAway();
}
for (const auto& [_, v] : SinksMap) {
v.Sink->PassAway();
@@ -146,8 +146,8 @@ private:
HFunc(TEvContinueRun, OnContinueRun);
cFunc(TEvents::TEvWakeup::EventType, OnWakeup);
- hFunc(IDqSourceActor::TEvNewSourceDataArrived, OnNewSourceDataArrived);
- hFunc(IDqSourceActor::TEvSourceError, OnSourceError);
+ hFunc(IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived, OnNewAsyncInputDataArrived);
+ hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, OnAsyncInputError);
})
void ExtractStats(::Ydb::Issue::IssueMessage* issue) {
@@ -275,9 +275,9 @@ private:
if (input.HasSource()) {
auto& source = SourcesMap[inputId];
source.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
- std::tie(source.SourceActor, source.Actor) =
- SourceActorFactory->CreateDqSourceActor(
- IDqSourceActorFactory::TArguments{
+ std::tie(source.Source, source.Actor) =
+ SourceFactory->CreateDqSource(
+ IDqSourceFactory::TArguments{
.InputDesc = input,
.InputIndex = static_cast<ui64>(inputId),
.TxId = TraceId,
@@ -577,7 +577,7 @@ private:
auto guard = source.TypeEnv->BindAllocator();
NKikimr::NMiniKQL::TUnboxedValueVector batch;
bool finished = false;
- const i64 space = source.SourceActor->GetSourceData(batch, finished, freeSpace);
+ const i64 space = source.Source->GetAsyncInputData(batch, finished, freeSpace);
const ui64 index = inputIndex;
if (space <= 0) {
continue;
@@ -667,8 +667,8 @@ private:
}
}
- /*____________________ SourceActorEvents __________________*/
- void OnNewSourceDataArrived(const IDqSourceActor::TEvNewSourceDataArrived::TPtr& ev) {
+ /*____________________ SourceEvents __________________*/
+ void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
try {
if (!TaskRunnerPrepared) {
return;
@@ -680,7 +680,7 @@ private:
SendFailure(MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::UNSPECIFIED, CurrentExceptionMessage()));
}
}
- void OnSourceError(const IDqSourceActor::TEvSourceError::TPtr& ev) {
+ void OnAsyncInputError(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) {
Y_UNUSED(ev->Get()->InputIndex);
SendFailure(MakeHolder<TEvDqFailure>(ev->Get()->IsFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, ev->Get()->Issues.ToString()));
}
@@ -725,7 +725,7 @@ private:
/*_________________________________________________________*/
- IDqSourceActorFactory::TPtr SourceActorFactory;
+ IDqSourceFactory::TPtr SourceFactory;
IDqSinkFactory::TPtr SinkFactory;
ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
NTaskRunnerActor::ITaskRunnerActor* Actor = nullptr;
@@ -764,14 +764,14 @@ NActors::IActor* CreateWorkerActor(
TWorkerRuntimeData* runtimeData,
const TString& traceId,
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const IDqSourceActorFactory::TPtr& sourceActorFactory,
+ const IDqSourceFactory::TPtr& sourceFactory,
const IDqSinkFactory::TPtr& sinkFactory)
{
Y_VERIFY(taskRunnerActorFactory);
return new TLogWrapReceive(
new TDqWorker(
taskRunnerActorFactory,
- sourceActorFactory,
+ sourceFactory,
sinkFactory,
runtimeData,
traceId), traceId);
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h
index a9eac60c486..94022f93112 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.h
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.h
@@ -25,7 +25,7 @@ namespace NYql::NDqs {
TWorkerRuntimeData* runtimeData,
const TString& traceId,
const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const NDq::IDqSourceActorFactory::TPtr& sourceActorFactory,
+ const NDq::IDqSourceFactory::TPtr& sourceFactory,
const NDq::IDqSinkFactory::TPtr& sinkFactory);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
index 213b7fb38b9..62e029b30c6 100644
--- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
+++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
@@ -122,7 +122,7 @@ message TGetStatsOutputResponse {
message TGetStatsSourceResponse {
reserved 1; // NKqpProto.TKqpStatsTask.TSourceStats Stats = 1;
- NDqProto.TDqSourceStats Stats = 2;
+ NDqProto.TDqAsyncInputBufferStats Stats = 2;
}
message TSinkStatsResponse {
diff --git a/ydb/library/yql/providers/dq/counters/counters.h b/ydb/library/yql/providers/dq/counters/counters.h
index 4a457d24fc7..ec05baa82da 100644
--- a/ydb/library/yql/providers/dq/counters/counters.h
+++ b/ydb/library/yql/providers/dq/counters/counters.h
@@ -220,8 +220,8 @@ struct TCounters {
}
void AddSourceStats(
- const NDq::TDqSourceStats& currentStats,
- NDq::TDqSourceStats& oldStats,
+ const NDq::TDqAsyncInputBufferStats& currentStats,
+ NDq::TDqAsyncInputBufferStats& oldStats,
ui64 taskId, ui64 inputIndex)
{
std::map<TString, TString> labels = {
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index 66f0cd150a2..82092045a22 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -22,7 +22,7 @@ class TLocalServiceHolder {
public:
TLocalServiceHolder(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
- NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
+ NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
{
ui32 nodeId = 1;
@@ -48,7 +48,7 @@ public:
NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, true);
- lwmOptions.SourceActorFactory = std::move(sourceFactory);
+ lwmOptions.SourceFactory = std::move(sourceFactory);
lwmOptions.SinkFactory = std::move(sinkFactory);
lwmOptions.TransformFactory = std::move(transformFactory);
lwmOptions.FunctionRegistry = functionRegistry;
@@ -113,7 +113,7 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
- NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
+ NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
{
return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, taskTransformFactory, dqTaskPreprocessorFactories, interconnectPort, grpcPort, std::move(sourceFactory), std::move(sinkFactory), std::move(transformFactory));
}
@@ -121,7 +121,7 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- NDq::IDqSourceActorFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
+ NDq::IDqSourceFactory::TPtr sourceFactory, NDq::IDqSinkFactory::TPtr sinkFactory, NDq::IDqOutputTransformFactory::TPtr transformFactory)
{
int startPort = 31337;
TRangeWalker<int> portWalker(startPort, startPort+100);
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
index 263e9573491..8afcb036cd6 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
@@ -9,6 +9,6 @@ namespace NYql {
TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- NDq::IDqSourceActorFactory::TPtr = nullptr, NDq::IDqSinkFactory::TPtr = nullptr, NDq::IDqOutputTransformFactory::TPtr = nullptr);
+ NDq::IDqSourceFactory::TPtr = nullptr, NDq::IDqSinkFactory::TPtr = nullptr, NDq::IDqOutputTransformFactory::TPtr = nullptr);
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 2695689b47c..cfbccb9a7a8 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -55,7 +55,7 @@ void ToProto(T* s1, const NDq::TDqInputChannelStats* ss)
}
template<typename T>
-void ToProto(T* s1, const NDq::TDqSourceStats* ss)
+void ToProto(T* s1, const NDq::TDqAsyncInputBufferStats* ss)
{
s1->SetChunks(ss->Chunks);
s1->SetBytes(ss->Bytes);
@@ -186,7 +186,7 @@ public:
auto maybeSourceOldStats = CurrentSourcesStats.find(inputIndex);
if (maybeSourceOldStats == CurrentSourcesStats.end()) {
maybeSourceOldStats = CurrentSourcesStats.emplace_hint(
- maybeSourceOldStats, inputIndex, NDq::TDqSourceStats(inputIndex));
+ maybeSourceOldStats, inputIndex, NDq::TDqAsyncInputBufferStats(inputIndex));
}
QueryStat.AddSourceStats(
*maybeSourceStats->second,
@@ -783,7 +783,7 @@ public:
NDq::TDqTaskRunnerStats CurrentStats;
std::unordered_map<ui64, NDq::TDqInputChannelStats> CurrentInputChannelsStats;
- std::unordered_map<ui64, NDq::TDqSourceStats> CurrentSourcesStats;
+ std::unordered_map<ui64, NDq::TDqAsyncInputBufferStats> CurrentSourcesStats;
std::unordered_map<ui64, NDq::TDqOutputChannelStats> CurrentOutputChannelsStats;
i64 LastCommand = -1;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 1d360eb6e3d..dd7b70fb165 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -736,7 +736,7 @@ public:
return InputIndex;
}
- const TDqSourceStats* GetStats() const override {
+ const TDqAsyncInputBufferStats* GetStats() const override {
try {
NDqProto::TCommandHeader header;
header.SetVersion(4);
@@ -789,7 +789,7 @@ public:
private:
ui64 TaskId;
ui64 InputIndex;
- mutable TDqSourceStats Stats;
+ mutable TDqAsyncInputBufferStats Stats;
IPipeTaskRunner* TaskRunner;
IInputStream& Input;
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index ab0f32af1d7..2538334a295 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -261,7 +261,7 @@ private:
Options.RuntimeData,
traceId,
Options.TaskRunnerActorFactory,
- Options.SourceActorFactory,
+ Options.SourceFactory,
Options.SinkFactory));
}
allocationInfo.WorkerActors.emplace_back(RegisterChild(
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index e5ef846be46..4cb3144e5d3 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -22,7 +22,7 @@ namespace NYql::NDqs {
struct TLocalWorkerManagerOptions {
TWorkerManagerCounters Counters;
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
- NDq::IDqSourceActorFactory::TPtr SourceActorFactory;
+ NDq::IDqSourceFactory::TPtr SourceFactory;
NDq::IDqSinkFactory::TPtr SinkFactory;
NDq::IDqOutputTransformFactory::TPtr TransformFactory;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index 9400ba7b6f4..46be83524a0 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -85,7 +85,7 @@ struct TEvPrivate {
} // namespace
-class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqSourceActor {
+class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput {
public:
using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.
@@ -223,10 +223,10 @@ private:
void Handle(TEvPrivate::TEvSourceDataReady::TPtr&, const TActorContext& ctx) {
SubscribedOnEvent = false;
- ctx.Send(ComputeActorId, new TEvNewSourceDataArrived(InputIndex));
+ ctx.Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
- // IActor & IDqSourceActor
+ // IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
if (ReadSession) {
ReadSession->Close(TDuration::Zero());
@@ -236,7 +236,7 @@ private:
TActor<TDqPqReadActor>::PassAway();
}
- i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool&, i64 freeSpace) override {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool&, i64 freeSpace) override {
auto events = GetReadSession().GetEvents(false, TMaybe<size_t>(), static_cast<size_t>(Max<i64>(freeSpace, 0)));
ui32 batchSize = 0;
@@ -370,7 +370,7 @@ private:
bool SubscribedOnEvent = false;
};
-std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor(
+std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
NPq::NProto::TDqPqTopicSource&& settings,
ui64 inputIndex,
TTxId txId,
@@ -414,7 +414,7 @@ void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver drive
factory.Register<NPq::NProto::TDqPqTopicSource>("PqSource",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode](
NPq::NProto::TDqPqTopicSource&& settings,
- IDqSourceActorFactory::TArguments&& args)
+ IDqSourceFactory::TArguments&& args)
{
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER));
return CreateDqPqReadActor(
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
index 74058ee4621..9f82253c8f2 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
@@ -22,7 +22,7 @@ class TDqSourceFactory;
const i64 PQReadDefaultFreeSpace = 16_MB;
-std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor(
+std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
NPq::NProto::TDqPqTopicSource&& settings,
ui64 inputIndex,
TTxId txId,
diff --git a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp
index fd64116e9c7..86894dd534e 100644
--- a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp
+++ b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.cpp
@@ -69,7 +69,7 @@ void TPqIoTestFixture::InitSource(
actor.GetHolderFactory(),
freeSpace);
- actor.InitSource(dqSource, dqSourceAsActor);
+ actor.InitAsyncInput(dqSource, dqSourceAsActor);
});
}
diff --git a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
index 51e13d923e4..470123cd7d8 100644
--- a/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
+++ b/ydb/library/yql/providers/pq/async_io/ut/ut_helpers.h
@@ -43,7 +43,7 @@ struct TPqIoTestFixture : public NUnitTest::TBaseFixture {
template<typename T>
std::vector<T> SourceRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) {
- return CaSetup->SourceRead(parser, freeSpace);
+ return CaSetup->AsyncInputRead(parser, freeSpace);
}
template<typename T>
@@ -53,7 +53,7 @@ struct TPqIoTestFixture : public NUnitTest::TBaseFixture {
i64 eachReadFreeSpace = 1000,
TDuration timeout = TDuration::Seconds(10))
{
- return CaSetup->SourceReadUntil(parser, size, eachReadFreeSpace, timeout);
+ return CaSetup->AsyncInputReadUntil(parser, size, eachReadFreeSpace, timeout);
}
void SaveSourceState(NDqProto::TCheckpoint checkpoint, NDqProto::TSourceState& state) {
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
index d8a888f484d..866a0d7772a 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_ut.cpp
@@ -41,7 +41,7 @@
namespace NYql {
-NDq::IDqSourceActorFactory::TPtr CreateSourceFactory(const NYdb::TDriver& driver) {
+NDq::IDqSourceFactory::TPtr CreateSourceFactory(const NYdb::TDriver& driver) {
auto factory = MakeIntrusive<NYql::NDq::TDqSourceFactory>();
RegisterDqPqReadActorFactory(*factory, driver, nullptr);
return factory;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index c94180e7c50..8c9b01ccf32 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -137,7 +137,7 @@ private:
double Epsilon;
};
-class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqSourceActor {
+class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput {
public:
TS3ReadActor(ui64 inputIndex,
IHTTPGateway::TPtr gateway,
@@ -194,7 +194,7 @@ private:
}
}
- i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
i64 total = 0LL;
if (!Blocks.empty()) {
buffer.reserve(buffer.size() + Blocks.size());
@@ -218,7 +218,7 @@ private:
void Handle(TEvPrivate::TEvReadResult::TPtr& result) {
++IsDoneCounter;
Blocks.emplace(std::move(result->Get()->Result));
- Send(ComputeActorId, new TEvNewSourceDataArrived(InputIndex));
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
void HandleRetry(TEvPrivate::TEvRetryEvent::TPtr& ev) {
@@ -236,10 +236,10 @@ private:
return;
}
++IsDoneCounter;
- Send(ComputeActorId, new TEvSourceError(InputIndex, result->Get()->Error, true));
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, result->Get()->Error, true));
}
- // IActor & IDqSourceActor
+ // IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
TActorBootstrapped<TS3ReadActor>::PassAway();
}
@@ -311,11 +311,11 @@ public:
Finished = true;
return false;
case TEvPrivate::TEvReadError::EventType:
- Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
return false;
case TEvPrivate::TEvReadResult::EventType:
value = std::move(ev->Get<TEvPrivate::TEvReadResult>()->Result);
- Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex));
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
return true;
default:
return false;
@@ -331,12 +331,12 @@ private:
Send(SourceActorId, new TEvPrivate::TEvReadFinished);
} catch (const std::exception& err) {
- Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue(err.what())}, true));
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(err.what())}, true));
return;
}
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle>) final {
- Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue("Unexpected event")}, true));
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue("Unexpected event")}, true));
}
private:
const ui64 InputIndex;
@@ -415,7 +415,7 @@ private:
const TRetryStuff::TPtr RetryStuff;
};
-class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqSourceActor {
+class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqComputeActorAsyncInput {
public:
TS3StreamReadActor(
ui64 inputIndex,
@@ -470,7 +470,7 @@ private:
void CommitState(const NDqProto::TCheckpoint&) final {}
ui64 GetInputIndex() const final { return InputIndex; }
- i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& output, bool& finished, i64 free) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& output, bool& finished, i64 free) final {
i64 total = 0LL;
if (!Blocks.empty()) do {
const i64 s = Blocks.front().bytes();
@@ -484,7 +484,7 @@ private:
return total;
}
- // IActor & IDqSourceActor
+ // IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
TActorBootstrapped<TS3StreamReadActor>::PassAway();
}
@@ -506,7 +506,7 @@ private:
void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) {
Blocks.emplace_back();
Blocks.back().swap(next->Get()->Block);
- Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex));
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
}
void HandleReadFinished() {
@@ -595,7 +595,7 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type) {
using namespace NKikimr::NMiniKQL;
-std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const TTypeEnvironment& typeEnv,
const IFunctionRegistry& functionRegistry,
IHTTPGateway::TPtr gateway,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index 1edfb68ddf5..9c38eda1efb 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -9,7 +9,7 @@
namespace NYql::NDq {
-std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateS3ReadActor(
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor(
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
IHTTPGateway::TPtr gateway,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index f7e18783872..25588841c66 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -16,7 +16,7 @@ void RegisterS3ReadActorFactory(
#ifdef __linux__
NDB::registerFormats();
factory.Register<NS3::TSource>("S3Source",
- [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceActorFactory::TArguments&& args) {
+ [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceFactory::TArguments&& args) {
return CreateS3ReadActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig);
});
#else
diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
index 5b8f28bdb44..cc702d87dc5 100644
--- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
+++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
@@ -68,7 +68,7 @@ bool RangeFinished(const TString& lastReadKey, const TString& endKey, const TVec
} // namespace
-class TYdbReadActor : public TActorBootstrapped<TYdbReadActor>, public IDqSourceActor {
+class TYdbReadActor : public TActorBootstrapped<TYdbReadActor>, public IDqComputeActorAsyncInput {
public:
TYdbReadActor(
ui64 inputIndex,
@@ -120,13 +120,13 @@ private:
SendRequest();
}
- // IActor & IDqSourceActor
+ // IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
RequestsDone = true;
TActorBootstrapped<TYdbReadActor>::PassAway();
}
- i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
i64 total = 0LL;
if (!Blocks.empty()) {
buffer.reserve(buffer.size() + Blocks.size());
@@ -183,7 +183,7 @@ private:
RequestsDone = res.IsEos() || RangeFinished(LastReadKey, EndKey, KeyColumnTypes);
SendRequest();
if (notify)
- Send(ComputeActorId, new TEvNewSourceDataArrived(InputIndex));
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
void ProcessError(const ::NYdb::NClickhouseInternal::TScanResult& res) {
@@ -192,7 +192,7 @@ private:
RequestsDone = true;
while(!Blocks.empty())
Blocks.pop();
- Send(ComputeActorId, new TEvSourceError(InputIndex, res.GetIssues(), true));
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, res.GetIssues(), true));
} else {
WakeUpTime = TMonotonic::Now() + Min(TDuration::Seconds(3), TDuration::MilliSeconds(0x30U * (1U << ++Retried)));
ActorSystem->Schedule(WakeUpTime, new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvRetryTime));
@@ -229,7 +229,7 @@ private:
std::queue<TString> Blocks;
};
-std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateYdbReadActor(
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateYdbReadActor(
NYql::NYdb::TSource&& params,
ui64 inputIndex,
const THashMap<TString, TString>& secureParams,
diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h
index 427e5ed8028..214ffedc0a3 100644
--- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h
+++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.h
@@ -9,7 +9,7 @@
namespace NYql::NDq {
-std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateYdbReadActor(
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateYdbReadActor(
NYql::NYdb::TSource&& params,
ui64 inputIndex,
const THashMap<TString, TString>& secureParams,
diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp
index c42c9f8968f..f0c0f751e32 100644
--- a/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp
+++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_source_factory.cpp
@@ -7,8 +7,8 @@ namespace NYql::NDq {
void RegisterYdbReadActorFactory(NYql::NDq::TDqSourceFactory& factory, ::NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
factory.Register<NYql::NYdb::TSource>("YdbSource",
- [driver, credentialsFactory](NYql::NYdb::TSource&& settings, IDqSourceActorFactory::TArguments&& args) {
- return CreateYdbReadActor(std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, driver, credentialsFactory);
+ [driver, credentialsFactory](NYql::NYdb::TSource&& settings, IDqSourceFactory::TArguments&& args) {
+ return CreateYdbReadActor(std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, driver, credentialsFactory);
});
}