aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-04-06 15:21:11 +0300
committergvit <gvit@ydb.tech>2023-04-06 15:21:11 +0300
commit7ac34b2f3251a82281b1c3a81e70029ea9de7305 (patch)
tree01826f39e919c28d533b79173d30b98dbf7b4b96
parent867680d32dbb79d3502e1e42d46bdc2fe6a3f2bf (diff)
downloadydb-7ac34b2f3251a82281b1c3a81e70029ea9de7305.tar.gz
continue refactoring task serialization process
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp142
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h35
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.cpp4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp46
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.h1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp22
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp144
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h18
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.cpp1
-rw-r--r--ydb/library/yql/dq/tasks/dq_connection_builder.h4
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h11
-rw-r--r--ydb/library/yql/providers/dq/planner/dqs_task_graph.h4
13 files changed, 193 insertions, 243 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 1daefa0ff7d..d4f99134dfb 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -138,7 +138,7 @@ public:
YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE);
}
- if (Request.Snapshot.IsValid()) {
+ if (GetSnapshot().IsValid()) {
YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE);
}
@@ -1048,8 +1048,8 @@ private:
if (!res->Record.GetTxLocks().empty()) {
auto& lock = res->Record.GetTxLocks(0);
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
- auto it = FindIf(TableKeys.Get(), [tableId](const auto& x){ return x.first.HasSamePath(tableId); });
- if (it != TableKeys.Get().end()) {
+ auto it = FindIf(GetTableKeys().Get(), [tableId](const auto& x){ return x.first.HasSamePath(tableId); });
+ if (it != GetTableKeys().Get().end()) {
tableName = it->second.Path;
}
}
@@ -1329,7 +1329,7 @@ private:
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId);
+ const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId);
const auto& keyTypes = table.KeyColumnTypes;;
for (auto& op : stage.GetTableOps()) {
@@ -1339,7 +1339,7 @@ private:
case NKqpProto::TKqpPhyTableOperation::kReadRanges:
case NKqpProto::TKqpPhyTableOperation::kReadRange:
case NKqpProto::TKqpPhyTableOperation::kLookup: {
- auto partitions = PrunePartitions(TableKeys, op, stageInfo, HolderFactory(), TypeEnv());
+ auto partitions = PrunePartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv());
auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv());
for (auto& [shardId, shardInfo] : partitions) {
@@ -1401,7 +1401,7 @@ private:
ShardsWithEffects.insert(task.Meta.ShardId);
}
} else {
- auto result = PruneEffectPartitions(TableKeys, op, stageInfo, HolderFactory(), TypeEnv());
+ auto result = PruneEffectPartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv());
for (auto& [shardId, shardInfo] : result) {
YQL_ENSURE(!shardInfo.KeyReadRanges);
YQL_ENSURE(shardInfo.KeyWriteRanges);
@@ -1580,14 +1580,14 @@ private:
const ui32 flags =
(ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) |
(VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0);
- if (Snapshot.IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) {
+ if (GetSnapshot().IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) {
ev.reset(new TEvDataShard::TEvProposeTransaction(
NKikimrTxDataShard::TX_KIND_DATA,
SelfId(),
TxId,
dataTransaction.SerializeAsString(),
- Snapshot.Step,
- Snapshot.TxId,
+ GetSnapshot().Step,
+ GetSnapshot().TxId,
flags));
} else {
ev.reset(new TEvDataShard::TEvProposeTransaction(
@@ -1650,7 +1650,6 @@ private:
if (LockTxId.Defined() && *LockTxId == 0) {
LockTxId = TxId;
}
- Snapshot = Request.Snapshot;
NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END);
LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId);
@@ -1699,7 +1698,7 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- readActors += BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId);
+ readActors += BuildScanTasksFromSource(stageInfo, LockTxId);
break;
case NKqpProto::TKqpSource::kExternalSource:
BuildReadTasksFromSource(stageInfo);
@@ -1719,7 +1718,7 @@ private:
YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage");
}
- BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, /* enableSpilling */ false);
+ BuildKqpStageChannels(TasksGraph, GetTableKeys(), stageInfo, TxId, /* enableSpilling */ false);
}
ResponseEv->InitTxResult(tx.Body);
@@ -1733,8 +1732,8 @@ private:
}
THashMap<ui64, TVector<NDqProto::TDqTask>> datashardTasks; // shardId -> [task]
- THashMap<ui64, TVector<NDqProto::TDqTask>> remoteComputeTasks; // shardId -> [task]
- TVector<NDqProto::TDqTask> computeTasks;
+ THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task]
+ TVector<ui64> computeTasks;
if (StreamResult) {
InitializeChannelProxies();
@@ -1742,17 +1741,16 @@ private:
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
- NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task, TableKeys, ResultChannelProxies, TypeEnv());
-
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
- datashardTasks[task.Meta.ShardId].emplace_back(std::move(taskDesc));
+ auto protoTask = SerializeTaskToProto(TasksGraph, task);
+ datashardTasks[task.Meta.ShardId].emplace_back(std::move(protoTask));
} else if (stageInfo.Meta.IsSysView()) {
- computeTasks.emplace_back(std::move(taskDesc));
+ computeTasks.emplace_back(task.Id);
} else {
if (task.Meta.ShardId) {
- remoteComputeTasks[task.Meta.ShardId].emplace_back(std::move(taskDesc));
+ remoteComputeTasks[task.Meta.ShardId].emplace_back(task.Id);
} else {
- computeTasks.emplace_back(std::move(taskDesc));
+ computeTasks.emplace_back(task.Id);
}
}
}
@@ -1760,6 +1758,7 @@ private:
for(const auto& channel: TasksGraph.GetChannels()) {
if (IsCrossShardChannel(TasksGraph, channel)) {
HasPersistentChannels = true;
+ break;
}
}
@@ -1817,11 +1816,10 @@ private:
break;
}
- if ((ReadOnlyTx || Request.UseImmediateEffects) && Request.Snapshot.IsValid()) {
+ if ((ReadOnlyTx || Request.UseImmediateEffects) && GetSnapshot().IsValid()) {
// Snapshot reads are always immediate
// Uncommitted writes are executed without coordinators, so they can be immediate
YQL_ENSURE(!VolatileTx);
- Snapshot = Request.Snapshot;
ImmediateTx = true;
}
@@ -1907,7 +1905,7 @@ private:
return;
}
- Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
+ TasksGraph.GetMeta().Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId());
ImmediateTx = true;
ContinueExecute();
@@ -1924,7 +1922,7 @@ private:
// (legacy behaviour, for compatibility with current execution engine)
UseFollowers = false;
}
- if (Snapshot.IsValid()) {
+ if (GetSnapshot().IsValid()) {
// TODO: KIKIMR-11912
UseFollowers = false;
}
@@ -2127,11 +2125,10 @@ private:
LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, ComputeTasks.size(), DatashardTxs.size());
// first, start compute tasks
- TVector<ui64> computeTaskIds{Reserve(ComputeTasks.size())};
bool shareMailbox = (ComputeTasks.size() <= 1);
- for (auto&& taskDesc : ComputeTasks) {
- computeTaskIds.emplace_back(taskDesc.GetId());
- FillInputSettings(taskDesc);
+ for (ui64 taskId : ComputeTasks) {
+ const auto& task = TasksGraph.GetTask(taskId);
+ auto taskDesc = SerializeTaskToProto(TasksGraph, task);
ExecuteDataComputeTask(std::move(taskDesc), shareMailbox);
}
@@ -2141,37 +2138,16 @@ private:
auto it = ShardIdToNodeId.find(shardId);
YQL_ENSURE(it != ShardIdToNodeId.end());
- for (auto& taskDesc : tasks) {
- ui64 taskId = taskDesc.GetId();
- auto& task = TasksGraph.GetTask(taskId);
- for (ui64 outputIndex = 0; outputIndex < task.Outputs.size(); ++outputIndex) {
- auto& output = task.Outputs[outputIndex];
- auto* protoOutput = taskDesc.MutableOutputs(outputIndex);
-
- for (ui64 outputChannelIndex = 0; outputChannelIndex < output.Channels.size(); ++outputChannelIndex) {
- ui64 outputChannelId = output.Channels[outputChannelIndex];
- auto* protoChannel = protoOutput->MutableChannels(outputChannelIndex);
-
- ui64 dstTaskId = TasksGraph.GetChannel(outputChannelId).DstTask;
- if (dstTaskId == 0) {
- continue;
- }
-
- auto& dstTask = TasksGraph.GetTask(dstTaskId);
- if (dstTask.ComputeActorId) {
- protoChannel->MutableDstEndpoint()->Clear();
- ActorIdToProto(dstTask.ComputeActorId, protoChannel->MutableDstEndpoint()->MutableActorId());
- }
- }
- }
+ for (ui64 taskId : tasks) {
+ const auto& task = TasksGraph.GetTask(taskId);
remoteComputeTasksCnt += 1;
- FillInputSettings(taskDesc);
- PendingComputeTasks.insert(taskDesc.GetId());
+ PendingComputeTasks.insert(taskId);
+ auto taskDesc = SerializeTaskToProto(TasksGraph, task);
tasksPerNode[it->second].emplace_back(std::move(taskDesc));
}
}
- Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), Request.Snapshot,
+ Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(),
ExecuterSpan, {}, ExecuterRetriesConfig);
@@ -2246,7 +2222,7 @@ private:
LOG_T("Updating channels after the creation of compute actors");
THashMap<TActorId, THashSet<ui64>> updates;
- for (ui64 taskId : computeTaskIds) {
+ for (ui64 taskId : ComputeTasks) {
auto& task = TasksGraph.GetTask(taskId);
CollectTaskChannelsUpdates(task, updates);
}
@@ -2335,54 +2311,6 @@ private:
}
}
- void FillInputSettings(NYql::NDqProto::TDqTask& task) {
- for (auto& input : *task.MutableInputs()) {
- if (input.HasTransform()) {
- auto transform = input.MutableTransform();
- YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer",
- "Unexpected input transform type: " << transform->GetType());
-
- const google::protobuf::Any& settingsAny = transform->GetSettings();
- YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: "
- << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name()
- << " , but got: " << settingsAny.type_url());
-
- NKikimrKqp::TKqpStreamLookupSettings settings;
- YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
-
- if (Snapshot.IsValid()) {
- settings.MutableSnapshot()->SetStep(Snapshot.Step);
- settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
- }
-
- if (LockTxId.Defined()) {
- settings.SetLockTxId(*LockTxId);
- }
-
- settings.SetImmediateTx(ImmediateTx);
- transform->MutableSettings()->PackFrom(settings);
- }
- if (input.HasSource() && Snapshot != Request.Snapshot && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) {
- auto source = input.MutableSource();
- const google::protobuf::Any& settingsAny = source->GetSettings();
-
- YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: "
- << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name()
- << " , but got: " << settingsAny.type_url());
-
- NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
- YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
-
- if (Snapshot.IsValid()) {
- settings.MutableSnapshot()->SetStep(Snapshot.Step);
- settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
- }
-
- source->MutableSettings()->PackFrom(settings);
- }
- }
- }
-
static bool HasMissingSnapshotError(const NKikimrTxDataShard::TEvProposeTransactionResult& result) {
for (const auto& err : result.GetError()) {
if (err.GetKind() == NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST) {
@@ -2437,14 +2365,10 @@ private:
THashSet<ui64> ShardsWithEffects;
bool HasPersistentChannels = false;
- // Either requested or temporarily acquired snapshot
- TKqpSnapshot Snapshot;
-
THashSet<ui64> SubscribedNodes;
+ THashMap<ui64, TVector<ui64>> RemoteComputeTasks;
- THashMap<ui64, TVector<NDqProto::TDqTask>> RemoteComputeTasks;
-
- TVector<NDqProto::TDqTask> ComputeTasks;
+ TVector<ui64> ComputeTasks;
TDatashardTxs DatashardTxs;
TTopicTabletTxs TopicTxs;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index c31ea277956..08bb56cf661 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -120,6 +120,7 @@ public:
, Planner(nullptr)
, ExecuterRetriesConfig(executerRetriesConfig)
{
+ TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
@@ -322,7 +323,7 @@ protected:
}
auto kqpTableResolver = CreateKqpTableResolver(this->SelfId(), TxId, UserToken, Request.Transactions,
- TableKeys, TasksGraph);
+ GetTableKeysRef(), TasksGraph);
KqpTableResolverId = this->RegisterWithSameMailbox(kqpTableResolver);
LOG_T("Got request, become WaitResolveState");
@@ -571,7 +572,7 @@ protected:
auto& record = channelsInfoEv->Record;
for (auto& channelId : channelIds) {
- FillChannelDesc(TasksGraph, ResultChannelProxies, *record.AddUpdate(), TasksGraph.GetChannel(channelId));
+ FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId));
}
LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size());
@@ -634,7 +635,7 @@ protected:
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId);
+ const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId);
const auto& keyTypes = table.KeyColumnTypes;
for (auto& op : stage.GetTableOps()) {
@@ -695,7 +696,7 @@ protected:
}
}
- size_t BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) {
+ size_t BuildScanTasksFromSource(TStageInfo& stageInfo, const TMaybe<ui64> lockTxId = {}) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
@@ -706,13 +707,13 @@ protected:
auto& source = stage.GetSources(0).GetReadRangesSource();
- const auto& table = TableKeys.GetTable(MakeTableId(source.GetTable()));
+ const auto& table = GetTableKeys().GetTable(MakeTableId(source.GetTable()));
const auto& keyTypes = table.KeyColumnTypes;
YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap);
auto columns = BuildKqpColumns(source, table);
- auto partitions = PrunePartitions(TableKeys, source, stageInfo, HolderFactory(), TypeEnv());
+ auto partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
ui64 itemsLimit = 0;
@@ -720,6 +721,8 @@ protected:
NYql::NDqProto::TData itemsLimitBytes;
NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
+ const auto& snapshot = GetSnapshot();
+
for (auto& [shardId, shardInfo] : partitions) {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
@@ -945,6 +948,10 @@ protected:
}
}
+ const IKqpGateway::TKqpSnapshot& GetSnapshot() const {
+ return TasksGraph.GetMeta().Snapshot;
+ }
+
IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) {
IActor* proxy;
@@ -970,6 +977,7 @@ protected:
this->RegisterWithSameMailbox(proxy);
ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy));
+ TasksGraph.GetMeta().ResultChannelProxies.emplace(channel.Id, proxy->SelfId());
return proxy;
}
@@ -1012,6 +1020,18 @@ protected:
}
}
+ const TKqpTableKeys& GetTableKeys() const {
+ return TasksGraph.GetMeta().TableKeys;
+ }
+
+ TKqpTableKeys& GetTableKeysRef() {
+ return TasksGraph.GetMeta().TableKeys;
+ }
+
+ std::unordered_map<ui64, IActor*>& GetResultChannelProxies() {
+ return ResultChannelProxies;
+ }
+
TString DebugString() const {
TStringBuilder sb;
sb << "[KqpExecuter], type: " << (ExecType == EExecType::Data ? "Data" : "Scan")
@@ -1036,19 +1056,18 @@ protected:
ui64 TxId = 0;
TKqpTasksGraph TasksGraph;
- TKqpTableKeys TableKeys;
TActorId KqpTableResolverId;
TActorId KqpShardsResolverId;
THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS)
THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;
- std::unordered_map<ui64, IActor*> ResultChannelProxies;
TVector<TProgressStat> LastStats;
TInstant StartResolveTime;
TInstant LastResourceUsageUpdate;
+ std::unordered_map<ui64, IActor*> ResultChannelProxies;
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv;
NWilson::TSpan ExecuterSpan;
NWilson::TSpan ExecuterStateSpan;
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index d609608dd45..f76b7113215 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -667,7 +667,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
return shardInfoMap;
}
-THashMap<ui64, TShardInfo> PrunePartitions(TKqpTableKeys& tableKeys,
+THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
@@ -908,7 +908,7 @@ THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
return PruneEffectPartitionsImpl(tableKeys, effect, stageInfo, holderFactory, typeEnv);
}
-THashMap<ui64, TShardInfo> PruneEffectPartitions(TKqpTableKeys& tableKeys,
+THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
index 64537bc65b9..a50b901abd0 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
@@ -77,7 +77,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PrunePartitions(TKqpTableKeys& tableKeys,
+THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
@@ -89,7 +89,7 @@ THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PruneEffectPartitions(TKqpTableKeys& tableKeys,
+THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 0080a0f0908..c8270bb628a 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -341,7 +341,6 @@ void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& req
if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
- AddSnapshotInfoToTaskInputs(taskDesc);
request.AddTasks()->Swap(&taskDesc);
}
}
@@ -351,7 +350,6 @@ void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& req
if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
- AddSnapshotInfoToTaskInputs(taskDesc);
request.AddTasks()->Swap(&taskDesc);
}
}
@@ -396,7 +394,6 @@ void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest&
if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) {
withLLVM = false;
}
- AddSnapshotInfoToTaskInputs(task);
request.AddTasks()->Swap(&task);
}
MainTasksPerNode.erase(nodeId);
@@ -415,49 +412,6 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
return flags;
}
-void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) {
- YQL_ENSURE(Snapshot.IsValid());
-
- for (auto& input : *task.MutableInputs()) {
- if (input.HasTransform()) {
- auto transform = input.MutableTransform();
- YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer",
- "Unexpected input transform type: " << transform->GetType());
-
- const google::protobuf::Any& settingsAny = transform->GetSettings();
- YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: "
- << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name()
- << " , but got: " << settingsAny.type_url());
-
- NKikimrKqp::TKqpStreamLookupSettings settings;
- YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
-
- settings.MutableSnapshot()->SetStep(Snapshot.Step);
- settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
-
- transform->MutableSettings()->PackFrom(settings);
- }
- if (input.HasSource() && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) {
- auto source = input.MutableSource();
- const google::protobuf::Any& settingsAny = source->GetSettings();
-
- YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: "
- << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name()
- << " , but got: " << settingsAny.type_url());
-
- NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
- YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
-
- if (Snapshot.IsValid()) {
- settings.MutableSnapshot()->SetStep(Snapshot.Step);
- settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
- }
-
- source->MutableSettings()->PackFrom(settings);
- }
- }
-}
-
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks,
THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index a7e452c0afa..1149c67901d 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -49,7 +49,6 @@ private:
void PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds);
void AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId);
- void AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task);
ui32 CalcSendMessageFlagsForNode(ui32 nodeId);
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 189c1efca66..e135a156500 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -130,14 +130,15 @@ private:
<< ", enough: " << ev->Get()->Record.GetEnough()
<< ", from: " << ev->Sender);
- if (ResultChannelProxies.empty()) {
+ auto& resultChannelProxies = GetResultChannelProxies();
+ if (resultChannelProxies.empty()) {
return;
}
// Forward only for stream results, data results acks event theirselves.
YQL_ENSURE(!ResponseEv->TxResults.empty() && ResponseEv->TxResults[0].IsStream);
- auto channelIt = ResultChannelProxies.begin();
+ auto channelIt = resultChannelProxies.begin();
auto handle = ev->Forward(channelIt->second->SelfId());
channelIt->second->Receive(handle, TlsActivationContext->AsActorContext());
}
@@ -268,14 +269,14 @@ private:
THashMap<ui64, ui64> assignedShardsCount;
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId);
+ const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId);
const auto& keyTypes = table.KeyColumnTypes;
for (auto& op : stage.GetTableOps()) {
Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
auto columns = BuildKqpColumns(op, table);
- auto partitions = PrunePartitions(TableKeys, op, stageInfo, HolderFactory(), TypeEnv());
+ auto partitions = PrunePartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv());
const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange);
auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv());
@@ -473,7 +474,7 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- BuildScanTasksFromSource(stageInfo, Request.Snapshot);
+ BuildScanTasksFromSource(stageInfo);
break;
default:
YQL_ENSURE(false, "unknown source type");
@@ -492,7 +493,7 @@ private:
YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage");
}
- BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling);
+ BuildKqpStageChannels(TasksGraph, GetTableKeys(), stageInfo, TxId, AppData()->EnableKqpSpilling);
}
ResponseEv->InitTxResult(tx.Body);
@@ -515,8 +516,7 @@ private:
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
-
- NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task, TableKeys, ResultChannelProxies, TypeEnv());
+ NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task);
if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) {
// Task with source
@@ -567,7 +567,7 @@ private:
LOG_D("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: true"
<< ", " << nScanTasks << " scan tasks on " << scanTasks.size() << " nodes"
<< ", totalShardScans: " << nShardScans << ", execType: Scan"
- << ", snapshot: {" << Request.Snapshot.TxId << ", " << Request.Snapshot.Step << "}");
+ << ", snapshot: {" << GetSnapshot().TxId << ", " << GetSnapshot().Step << "}");
ExecuteScanTx(std::move(computeTasks), std::move(scanTasks), std::move(snapshot));
@@ -623,7 +623,7 @@ private:
}
Planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks),
- std::move(scanTasks), Request.Snapshot,
+ std::move(scanTasks), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling,
Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig);
@@ -650,7 +650,7 @@ private:
}
void PassAway() override {
- for (auto channelPair: ResultChannelProxies) {
+ for (auto channelPair: GetResultChannelProxies()) {
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
TAutoPtr<IEventHandle> ev = new IEventHandle(
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 5b3cee3a0af..dde112a4843 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -644,6 +644,48 @@ std::pair<const TSerializedCellVec*, bool> TShardKeyRanges::GetRightBorder() con
return !lastRange.Point ? std::make_pair(&lastRange.To, lastRange.ToInclusive) : std::make_pair(&lastRange.From, true);
}
+void AddSnapshotInfoToTaskInputs(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TDqTask& task) {
+ const auto& snapshot = tasksGraph.GetMeta().Snapshot;
+ for (auto& input : *task.MutableInputs()) {
+ if (input.HasTransform()) {
+ YQL_ENSURE(snapshot.IsValid());
+ auto transform = input.MutableTransform();
+ YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer",
+ "Unexpected input transform type: " << transform->GetType());
+
+ const google::protobuf::Any& settingsAny = transform->GetSettings();
+ YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: "
+ << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name()
+ << " , but got: " << settingsAny.type_url());
+
+ NKikimrKqp::TKqpStreamLookupSettings settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
+
+ settings.MutableSnapshot()->SetStep(snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(snapshot.TxId);
+
+ transform->MutableSettings()->PackFrom(settings);
+ }
+ if (input.HasSource() && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) {
+ auto source = input.MutableSource();
+ const google::protobuf::Any& settingsAny = source->GetSettings();
+
+ YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: "
+ << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name()
+ << " , but got: " << settingsAny.type_url());
+
+ NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
+
+ if (snapshot.IsValid()) {
+ settings.MutableSnapshot()->SetStep(snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(snapshot.TxId);
+ }
+
+ source->MutableSettings()->PackFrom(settings);
+ }
+ }
+}
void FillEndpointDesc(NDqProto::TEndpoint& endpoint, const TTask& task) {
if (task.ComputeActorId) {
@@ -653,11 +695,13 @@ void FillEndpointDesc(NDqProto::TEndpoint& endpoint, const TTask& task) {
}
}
-void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NDqProto::TChannel& channelDesc, const TChannel& channel) {
+void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NDqProto::TChannel& channelDesc, const TChannel& channel) {
channelDesc.SetId(channel.Id);
channelDesc.SetSrcTaskId(channel.SrcTask);
channelDesc.SetDstTaskId(channel.DstTask);
+ const auto& resultChannelProxies = tasksGraph.GetMeta().ResultChannelProxies;
+
YQL_ENSURE(channel.SrcTask);
const auto& srcTask = tasksGraph.GetTask(channel.SrcTask);
FillEndpointDesc(*channelDesc.MutableSrcEndpoint(), srcTask);
@@ -667,7 +711,7 @@ void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<
} else if (!resultChannelProxies.empty()) {
auto it = resultChannelProxies.find(channel.Id);
YQL_ENSURE(it != resultChannelProxies.end());
- ActorIdToProto(it->second->SelfId(), channelDesc.MutableDstEndpoint()->MutableActorId());
+ ActorIdToProto(it->second, channelDesc.MutableDstEndpoint()->MutableActorId());
} else {
// For non-stream execution, collect results in executer and forward with response.
ActorIdToProto(srcTask.Meta.ExecuterId, channelDesc.MutableDstEndpoint()->MutableActorId());
@@ -831,54 +875,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl
}
}
-
-NYql::NDqProto::TDqTask SerializeTaskToProto(
- const TKqpTasksGraph& tasksGraph,
- const TTask& task,
- const TKqpTableKeys& tableKeys,
- const std::unordered_map<ui64, IActor*>& resultChannelProxies,
- const NMiniKQL::TTypeEnvironment& typeEnv)
-{
- auto& stageInfo = tasksGraph.GetStageInfo(task.StageId);
- NYql::NDqProto::TDqTask result;
- ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId());
- result.SetId(task.Id);
- result.SetStageId(stageInfo.Id.StageId);
-
- for (const auto& [paramName, paramValue] : task.Meta.DqTaskParams) {
- (*result.MutableTaskParams())[paramName] = paramValue;
- }
-
- for (const auto& [paramName, paramValue] : task.Meta.DqSecureParams) {
- (*result.MutableSecureParams())[paramName] = paramValue;
- }
-
- for (const auto& input : task.Inputs) {
- FillInputDesc(tasksGraph, resultChannelProxies, *result.AddInputs(), input);
- }
-
- for (const auto& output : task.Outputs) {
- FillOutputDesc(tasksGraph, resultChannelProxies, *result.AddOutputs(), output);
- }
-
- const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- result.MutableProgram()->CopyFrom(stage.GetProgram());
- auto g = typeEnv.BindAllocator();
- for (auto& paramName : stage.GetProgramParameters()) {
- auto& dqParams = *result.MutableParameters();
- if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) {
- dqParams[paramName] = *taskParam;
- } else {
- dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName);
- }
- }
-
- FillTaskMeta(stageInfo, task, tableKeys, result);
-
- return result;
-}
-
-void FillOutputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) {
+void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) {
switch (output.Type) {
case TTaskOutputType::Map:
YQL_ENSURE(output.Channels.size() == 1);
@@ -930,11 +927,11 @@ void FillOutputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<u
for (auto& channel : output.Channels) {
auto& channelDesc = *outputDesc.AddChannels();
- FillChannelDesc(tasksGraph, resultChannelProxies, channelDesc, tasksGraph.GetChannel(channel));
+ FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel));
}
}
-void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input) {
+void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input) {
switch (input.Type()) {
case NYql::NDq::TTaskInputType::Source:
inputDesc.MutableSource()->SetType(input.SourceType);
@@ -962,7 +959,7 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui
for (ui64 channel : input.Channels) {
auto& channelDesc = *inputDesc.AddChannels();
- FillChannelDesc(tasksGraph, resultChannelProxies, channelDesc, tasksGraph.GetChannel(channel));
+ FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel));
}
if (input.Transform) {
@@ -974,6 +971,47 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui
}
}
+NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task)
+{
+ auto& stageInfo = tasksGraph.GetStageInfo(task.StageId);
+ NYql::NDqProto::TDqTask result;
+ ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId());
+ result.SetId(task.Id);
+ result.SetStageId(stageInfo.Id.StageId);
+
+ for (const auto& [paramName, paramValue] : task.Meta.DqTaskParams) {
+ (*result.MutableTaskParams())[paramName] = paramValue;
+ }
+
+ for (const auto& [paramName, paramValue] : task.Meta.DqSecureParams) {
+ (*result.MutableSecureParams())[paramName] = paramValue;
+ }
+
+ for (const auto& input : task.Inputs) {
+ FillInputDesc(tasksGraph, *result.AddInputs(), input);
+ }
+
+ for (const auto& output : task.Outputs) {
+ FillOutputDesc(tasksGraph, *result.AddOutputs(), output);
+ }
+
+ const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+ result.MutableProgram()->CopyFrom(stage.GetProgram());
+ for (auto& paramName : stage.GetProgramParameters()) {
+ auto& dqParams = *result.MutableParameters();
+ if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) {
+ dqParams[paramName] = *taskParam;
+ } else {
+ dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName);
+ }
+ }
+
+ AddSnapshotInfoToTaskInputs(tasksGraph, result);
+ FillTaskMeta(stageInfo, task, tasksGraph.GetMeta().TableKeys, result);
+
+ return result;
+}
+
TString TTaskMeta::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const NScheme::TTypeRegistry& typeRegistry) const
{
TStringBuilder sb;
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index f81c0f85104..ad3777350a7 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -83,6 +83,14 @@ struct TStageInfoMeta {
};
+// things which are common for all tasks in the graph.
+struct TGraphMeta {
+ TKqpTableKeys TableKeys;
+ IKqpGateway::TKqpSnapshot Snapshot;
+ std::unordered_map<ui64, TActorId> ResultChannelProxies;
+ TActorId ExecuterId;
+};
+
struct TTaskInputMeta {};
struct TTaskOutputMeta {
@@ -196,20 +204,16 @@ using TTaskOutput = NYql::NDq::TTaskOutput<TTaskOutputMeta>;
using TTaskOutputType = NYql::NDq::TTaskOutputType;
using TTaskInput = NYql::NDq::TTaskInput<TTaskInputMeta>;
using TTask = NYql::NDq::TTask<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
-using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
+using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs);
void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const TKqpPhyTxHolder::TConstPtr& tx, ui64 txIdx);
void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
ui64 txId, bool enableSpilling);
-NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task,
- const TKqpTableKeys& tableKeys,
- const std::unordered_map<ui64, IActor*>& resultChannelProxies, const NMiniKQL::TTypeEnvironment& typeEnv);
+NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task);
void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta);
-void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel);
-void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input);
-void FillOutputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output);
+void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel);
template<typename Proto>
TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, const TKqpTableKeys::TTable& table) {
diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp
index 925c7ac6f3f..9af65d75a6d 100644
--- a/ydb/core/kqp/query_data/kqp_query_data.cpp
+++ b/ydb/core/kqp/query_data/kqp_query_data.cpp
@@ -335,6 +335,7 @@ bool TQueryData::MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyPara
}
NDqProto::TData TQueryData::SerializeParamValue(const TString& name) {
+ auto guard = TypeEnv().BindAllocator();
const auto& [type, value] = GetParameterUnboxedValue(name);
return NDq::TDqDataSerializer::SerializeParamValue(type, value);
}
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h
index 9ab5f580432..14c4f871f31 100644
--- a/ydb/library/yql/dq/tasks/dq_connection_builder.h
+++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h
@@ -8,8 +8,8 @@ namespace NYql::NDq {
using TChannelLogFunc = std::function<void(ui64 channel, ui64 from, ui64 to, TStringBuf type, bool enableSpilling)>;
-template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
-void CommonBuildTasks(double hashShuffleTasksRatio, ui32 maxHashShuffleTasks, TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) {
+template <class TGraphMeta, class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
+void CommonBuildTasks(double hashShuffleTasksRatio, ui32 maxHashShuffleTasks, TDqTasksGraph<TGraphMeta, TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) {
ui32 partitionsCount = 1;
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index 4f63059d8fa..1271821b8e7 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -185,7 +185,7 @@ struct TTask {
NDqProto::EWatermarksMode WatermarksMode = NDqProto::WATERMARKS_MODE_DISABLED;
};
-template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
+template <class TGraphMeta, class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
class TDqTasksGraph : private TMoveOnly {
public:
using TStageInfoType = TStageInfo<TStageInfoMeta>;
@@ -194,6 +194,14 @@ public:
public:
TDqTasksGraph() = default;
+ const TGraphMeta& GetMeta() const {
+ return Meta;
+ }
+
+ TGraphMeta& GetMeta() {
+ return Meta;
+ }
+
const TChannel& GetChannel(ui64 id) const {
YQL_ENSURE(id <= Channels.size());
return Channels[id - 1];
@@ -292,6 +300,7 @@ private:
THashMap<TStageId, TStageInfoType> StagesInfo;
TVector<TTaskType> Tasks;
TVector<TChannel> Channels;
+ TGraphMeta Meta;
};
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/dq/planner/dqs_task_graph.h b/ydb/library/yql/providers/dq/planner/dqs_task_graph.h
index 0b62dfce4a9..c40ad0f8d3c 100644
--- a/ydb/library/yql/providers/dq/planner/dqs_task_graph.h
+++ b/ydb/library/yql/providers/dq/planner/dqs_task_graph.h
@@ -10,6 +10,8 @@ namespace NYql::NDqs {
NNodes::TDqPhyStage Stage;
};
+ struct TGraphMeta {};
+
struct TTaskInputMeta {
};
@@ -25,5 +27,5 @@ namespace NYql::NDqs {
using TTaskOutputType = NYql::NDq::TTaskOutputType;
using TTaskInput = NYql::NDq::TTaskInput<TTaskInputMeta>;
using TTask = NYql::NDq::TTask<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
- using TDqsTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
+ using TDqsTasksGraph = NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>;
}