diff options
author | gvit <gvit@ydb.tech> | 2023-04-06 15:21:11 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-04-06 15:21:11 +0300 |
commit | 7ac34b2f3251a82281b1c3a81e70029ea9de7305 (patch) | |
tree | 01826f39e919c28d533b79173d30b98dbf7b4b96 | |
parent | 867680d32dbb79d3502e1e42d46bdc2fe6a3f2bf (diff) | |
download | ydb-7ac34b2f3251a82281b1c3a81e70029ea9de7305.tar.gz |
continue refactoring task serialization process
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 142 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 35 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 46 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 144 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 18 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_query_data.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/tasks/dq_connection_builder.h | 4 | ||||
-rw-r--r-- | ydb/library/yql/dq/tasks/dq_tasks_graph.h | 11 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/planner/dqs_task_graph.h | 4 |
13 files changed, 193 insertions, 243 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 1daefa0ff7d..d4f99134dfb 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -138,7 +138,7 @@ public: YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE); } - if (Request.Snapshot.IsValid()) { + if (GetSnapshot().IsValid()) { YQL_ENSURE(Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE); } @@ -1048,8 +1048,8 @@ private: if (!res->Record.GetTxLocks().empty()) { auto& lock = res->Record.GetTxLocks(0); auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId()); - auto it = FindIf(TableKeys.Get(), [tableId](const auto& x){ return x.first.HasSamePath(tableId); }); - if (it != TableKeys.Get().end()) { + auto it = FindIf(GetTableKeys().Get(), [tableId](const auto& x){ return x.first.HasSamePath(tableId); }); + if (it != GetTableKeys().Get().end()) { tableName = it->second.Path; } } @@ -1329,7 +1329,7 @@ private: auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId); + const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId); const auto& keyTypes = table.KeyColumnTypes;; for (auto& op : stage.GetTableOps()) { @@ -1339,7 +1339,7 @@ private: case NKqpProto::TKqpPhyTableOperation::kReadRanges: case NKqpProto::TKqpPhyTableOperation::kReadRange: case NKqpProto::TKqpPhyTableOperation::kLookup: { - auto partitions = PrunePartitions(TableKeys, op, stageInfo, HolderFactory(), TypeEnv()); + auto partitions = PrunePartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv()); auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv()); for (auto& [shardId, shardInfo] : partitions) { @@ -1401,7 +1401,7 @@ private: ShardsWithEffects.insert(task.Meta.ShardId); } } else { - auto result = PruneEffectPartitions(TableKeys, op, stageInfo, HolderFactory(), TypeEnv()); + auto result = PruneEffectPartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv()); for (auto& [shardId, shardInfo] : result) { YQL_ENSURE(!shardInfo.KeyReadRanges); YQL_ENSURE(shardInfo.KeyWriteRanges); @@ -1580,14 +1580,14 @@ private: const ui32 flags = (ImmediateTx ? NTxDataShard::TTxFlags::Immediate : 0) | (VolatileTx ? NTxDataShard::TTxFlags::VolatilePrepare : 0); - if (Snapshot.IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) { + if (GetSnapshot().IsValid() && (ReadOnlyTx || Request.UseImmediateEffects)) { ev.reset(new TEvDataShard::TEvProposeTransaction( NKikimrTxDataShard::TX_KIND_DATA, SelfId(), TxId, dataTransaction.SerializeAsString(), - Snapshot.Step, - Snapshot.TxId, + GetSnapshot().Step, + GetSnapshot().TxId, flags)); } else { ev.reset(new TEvDataShard::TEvProposeTransaction( @@ -1650,7 +1650,6 @@ private: if (LockTxId.Defined() && *LockTxId == 0) { LockTxId = TxId; } - Snapshot = Request.Snapshot; NWilson::TSpan prepareTasksSpan(TWilsonKqp::DataExecuterPrepateTasks, ExecuterStateSpan.GetTraceId(), "PrepateTasks", NWilson::EFlags::AUTO_END); LWTRACK(KqpDataExecuterStartExecute, ResponseEv->Orbit, TxId); @@ -1699,7 +1698,7 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - readActors += BuildScanTasksFromSource(stageInfo, Request.Snapshot, LockTxId); + readActors += BuildScanTasksFromSource(stageInfo, LockTxId); break; case NKqpProto::TKqpSource::kExternalSource: BuildReadTasksFromSource(stageInfo); @@ -1719,7 +1718,7 @@ private: YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage"); } - BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, /* enableSpilling */ false); + BuildKqpStageChannels(TasksGraph, GetTableKeys(), stageInfo, TxId, /* enableSpilling */ false); } ResponseEv->InitTxResult(tx.Body); @@ -1733,8 +1732,8 @@ private: } THashMap<ui64, TVector<NDqProto::TDqTask>> datashardTasks; // shardId -> [task] - THashMap<ui64, TVector<NDqProto::TDqTask>> remoteComputeTasks; // shardId -> [task] - TVector<NDqProto::TDqTask> computeTasks; + THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task] + TVector<ui64> computeTasks; if (StreamResult) { InitializeChannelProxies(); @@ -1742,17 +1741,16 @@ private: for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task, TableKeys, ResultChannelProxies, TypeEnv()); - if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) { - datashardTasks[task.Meta.ShardId].emplace_back(std::move(taskDesc)); + auto protoTask = SerializeTaskToProto(TasksGraph, task); + datashardTasks[task.Meta.ShardId].emplace_back(std::move(protoTask)); } else if (stageInfo.Meta.IsSysView()) { - computeTasks.emplace_back(std::move(taskDesc)); + computeTasks.emplace_back(task.Id); } else { if (task.Meta.ShardId) { - remoteComputeTasks[task.Meta.ShardId].emplace_back(std::move(taskDesc)); + remoteComputeTasks[task.Meta.ShardId].emplace_back(task.Id); } else { - computeTasks.emplace_back(std::move(taskDesc)); + computeTasks.emplace_back(task.Id); } } } @@ -1760,6 +1758,7 @@ private: for(const auto& channel: TasksGraph.GetChannels()) { if (IsCrossShardChannel(TasksGraph, channel)) { HasPersistentChannels = true; + break; } } @@ -1817,11 +1816,10 @@ private: break; } - if ((ReadOnlyTx || Request.UseImmediateEffects) && Request.Snapshot.IsValid()) { + if ((ReadOnlyTx || Request.UseImmediateEffects) && GetSnapshot().IsValid()) { // Snapshot reads are always immediate // Uncommitted writes are executed without coordinators, so they can be immediate YQL_ENSURE(!VolatileTx); - Snapshot = Request.Snapshot; ImmediateTx = true; } @@ -1907,7 +1905,7 @@ private: return; } - Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId()); + TasksGraph.GetMeta().Snapshot = TKqpSnapshot(record.GetSnapshotStep(), record.GetSnapshotTxId()); ImmediateTx = true; ContinueExecute(); @@ -1924,7 +1922,7 @@ private: // (legacy behaviour, for compatibility with current execution engine) UseFollowers = false; } - if (Snapshot.IsValid()) { + if (GetSnapshot().IsValid()) { // TODO: KIKIMR-11912 UseFollowers = false; } @@ -2127,11 +2125,10 @@ private: LWTRACK(KqpDataExecuterStartTasksAndTxs, ResponseEv->Orbit, TxId, ComputeTasks.size(), DatashardTxs.size()); // first, start compute tasks - TVector<ui64> computeTaskIds{Reserve(ComputeTasks.size())}; bool shareMailbox = (ComputeTasks.size() <= 1); - for (auto&& taskDesc : ComputeTasks) { - computeTaskIds.emplace_back(taskDesc.GetId()); - FillInputSettings(taskDesc); + for (ui64 taskId : ComputeTasks) { + const auto& task = TasksGraph.GetTask(taskId); + auto taskDesc = SerializeTaskToProto(TasksGraph, task); ExecuteDataComputeTask(std::move(taskDesc), shareMailbox); } @@ -2141,37 +2138,16 @@ private: auto it = ShardIdToNodeId.find(shardId); YQL_ENSURE(it != ShardIdToNodeId.end()); - for (auto& taskDesc : tasks) { - ui64 taskId = taskDesc.GetId(); - auto& task = TasksGraph.GetTask(taskId); - for (ui64 outputIndex = 0; outputIndex < task.Outputs.size(); ++outputIndex) { - auto& output = task.Outputs[outputIndex]; - auto* protoOutput = taskDesc.MutableOutputs(outputIndex); - - for (ui64 outputChannelIndex = 0; outputChannelIndex < output.Channels.size(); ++outputChannelIndex) { - ui64 outputChannelId = output.Channels[outputChannelIndex]; - auto* protoChannel = protoOutput->MutableChannels(outputChannelIndex); - - ui64 dstTaskId = TasksGraph.GetChannel(outputChannelId).DstTask; - if (dstTaskId == 0) { - continue; - } - - auto& dstTask = TasksGraph.GetTask(dstTaskId); - if (dstTask.ComputeActorId) { - protoChannel->MutableDstEndpoint()->Clear(); - ActorIdToProto(dstTask.ComputeActorId, protoChannel->MutableDstEndpoint()->MutableActorId()); - } - } - } + for (ui64 taskId : tasks) { + const auto& task = TasksGraph.GetTask(taskId); remoteComputeTasksCnt += 1; - FillInputSettings(taskDesc); - PendingComputeTasks.insert(taskDesc.GetId()); + PendingComputeTasks.insert(taskId); + auto taskDesc = SerializeTaskToProto(TasksGraph, task); tasksPerNode[it->second].emplace_back(std::move(taskDesc)); } } - Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), Request.Snapshot, + Planner = CreateKqpPlanner(TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), ExecuterSpan, {}, ExecuterRetriesConfig); @@ -2246,7 +2222,7 @@ private: LOG_T("Updating channels after the creation of compute actors"); THashMap<TActorId, THashSet<ui64>> updates; - for (ui64 taskId : computeTaskIds) { + for (ui64 taskId : ComputeTasks) { auto& task = TasksGraph.GetTask(taskId); CollectTaskChannelsUpdates(task, updates); } @@ -2335,54 +2311,6 @@ private: } } - void FillInputSettings(NYql::NDqProto::TDqTask& task) { - for (auto& input : *task.MutableInputs()) { - if (input.HasTransform()) { - auto transform = input.MutableTransform(); - YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer", - "Unexpected input transform type: " << transform->GetType()); - - const google::protobuf::Any& settingsAny = transform->GetSettings(); - YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: " - << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name() - << " , but got: " << settingsAny.type_url()); - - NKikimrKqp::TKqpStreamLookupSettings settings; - YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); - - if (Snapshot.IsValid()) { - settings.MutableSnapshot()->SetStep(Snapshot.Step); - settings.MutableSnapshot()->SetTxId(Snapshot.TxId); - } - - if (LockTxId.Defined()) { - settings.SetLockTxId(*LockTxId); - } - - settings.SetImmediateTx(ImmediateTx); - transform->MutableSettings()->PackFrom(settings); - } - if (input.HasSource() && Snapshot != Request.Snapshot && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) { - auto source = input.MutableSource(); - const google::protobuf::Any& settingsAny = source->GetSettings(); - - YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: " - << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name() - << " , but got: " << settingsAny.type_url()); - - NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; - YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); - - if (Snapshot.IsValid()) { - settings.MutableSnapshot()->SetStep(Snapshot.Step); - settings.MutableSnapshot()->SetTxId(Snapshot.TxId); - } - - source->MutableSettings()->PackFrom(settings); - } - } - } - static bool HasMissingSnapshotError(const NKikimrTxDataShard::TEvProposeTransactionResult& result) { for (const auto& err : result.GetError()) { if (err.GetKind() == NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST) { @@ -2437,14 +2365,10 @@ private: THashSet<ui64> ShardsWithEffects; bool HasPersistentChannels = false; - // Either requested or temporarily acquired snapshot - TKqpSnapshot Snapshot; - THashSet<ui64> SubscribedNodes; + THashMap<ui64, TVector<ui64>> RemoteComputeTasks; - THashMap<ui64, TVector<NDqProto::TDqTask>> RemoteComputeTasks; - - TVector<NDqProto::TDqTask> ComputeTasks; + TVector<ui64> ComputeTasks; TDatashardTxs DatashardTxs; TTopicTabletTxs TopicTxs; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index c31ea277956..08bb56cf661 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -120,6 +120,7 @@ public: , Planner(nullptr) , ExecuterRetriesConfig(executerRetriesConfig) { + TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId); ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc); ResponseEv->Orbit = std::move(Request.Orbit); Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, @@ -322,7 +323,7 @@ protected: } auto kqpTableResolver = CreateKqpTableResolver(this->SelfId(), TxId, UserToken, Request.Transactions, - TableKeys, TasksGraph); + GetTableKeysRef(), TasksGraph); KqpTableResolverId = this->RegisterWithSameMailbox(kqpTableResolver); LOG_T("Got request, become WaitResolveState"); @@ -571,7 +572,7 @@ protected: auto& record = channelsInfoEv->Record; for (auto& channelId : channelIds) { - FillChannelDesc(TasksGraph, ResultChannelProxies, *record.AddUpdate(), TasksGraph.GetChannel(channelId)); + FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId)); } LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size()); @@ -634,7 +635,7 @@ protected: auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId); + const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId); const auto& keyTypes = table.KeyColumnTypes; for (auto& op : stage.GetTableOps()) { @@ -695,7 +696,7 @@ protected: } } - size_t BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) { + size_t BuildScanTasksFromSource(TStageInfo& stageInfo, const TMaybe<ui64> lockTxId = {}) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; @@ -706,13 +707,13 @@ protected: auto& source = stage.GetSources(0).GetReadRangesSource(); - const auto& table = TableKeys.GetTable(MakeTableId(source.GetTable())); + const auto& table = GetTableKeys().GetTable(MakeTableId(source.GetTable())); const auto& keyTypes = table.KeyColumnTypes; YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap); auto columns = BuildKqpColumns(source, table); - auto partitions = PrunePartitions(TableKeys, source, stageInfo, HolderFactory(), TypeEnv()); + auto partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); ui64 itemsLimit = 0; @@ -720,6 +721,8 @@ protected: NYql::NDqProto::TData itemsLimitBytes; NKikimr::NMiniKQL::TType* itemsLimitType = nullptr; + const auto& snapshot = GetSnapshot(); + for (auto& [shardId, shardInfo] : partitions) { YQL_ENSURE(!shardInfo.KeyWriteRanges); @@ -945,6 +948,10 @@ protected: } } + const IKqpGateway::TKqpSnapshot& GetSnapshot() const { + return TasksGraph.GetMeta().Snapshot; + } + IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) { IActor* proxy; @@ -970,6 +977,7 @@ protected: this->RegisterWithSameMailbox(proxy); ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy)); + TasksGraph.GetMeta().ResultChannelProxies.emplace(channel.Id, proxy->SelfId()); return proxy; } @@ -1012,6 +1020,18 @@ protected: } } + const TKqpTableKeys& GetTableKeys() const { + return TasksGraph.GetMeta().TableKeys; + } + + TKqpTableKeys& GetTableKeysRef() { + return TasksGraph.GetMeta().TableKeys; + } + + std::unordered_map<ui64, IActor*>& GetResultChannelProxies() { + return ResultChannelProxies; + } + TString DebugString() const { TStringBuilder sb; sb << "[KqpExecuter], type: " << (ExecType == EExecType::Data ? "Data" : "Scan") @@ -1036,19 +1056,18 @@ protected: ui64 TxId = 0; TKqpTasksGraph TasksGraph; - TKqpTableKeys TableKeys; TActorId KqpTableResolverId; TActorId KqpShardsResolverId; THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS) THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData; - std::unordered_map<ui64, IActor*> ResultChannelProxies; TVector<TProgressStat> LastStats; TInstant StartResolveTime; TInstant LastResourceUsageUpdate; + std::unordered_map<ui64, IActor*> ResultChannelProxies; std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv; NWilson::TSpan ExecuterSpan; NWilson::TSpan ExecuterStateSpan; diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index d609608dd45..f76b7113215 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -667,7 +667,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, return shardInfoMap; } -THashMap<ui64, TShardInfo> PrunePartitions(TKqpTableKeys& tableKeys, +THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { @@ -908,7 +908,7 @@ THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, return PruneEffectPartitionsImpl(tableKeys, effect, stageInfo, holderFactory, typeEnv); } -THashMap<ui64, TShardInfo> PruneEffectPartitions(TKqpTableKeys& tableKeys, +THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h index 64537bc65b9..a50b901abd0 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h @@ -77,7 +77,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PrunePartitions(TKqpTableKeys& tableKeys, +THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); @@ -89,7 +89,7 @@ THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PruneEffectPartitions(TKqpTableKeys& tableKeys, +THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 0080a0f0908..c8270bb628a 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -341,7 +341,6 @@ void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& req if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } - AddSnapshotInfoToTaskInputs(taskDesc); request.AddTasks()->Swap(&taskDesc); } } @@ -351,7 +350,6 @@ void TKqpPlanner::PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& req if (DisableLlvmForUdfStages && taskDesc.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } - AddSnapshotInfoToTaskInputs(taskDesc); request.AddTasks()->Swap(&taskDesc); } } @@ -396,7 +394,6 @@ void TKqpPlanner::AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& if (DisableLlvmForUdfStages && task.GetProgram().GetSettings().GetHasUdf()) { withLLVM = false; } - AddSnapshotInfoToTaskInputs(task); request.AddTasks()->Swap(&task); } MainTasksPerNode.erase(nodeId); @@ -415,49 +412,6 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { return flags; } -void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) { - YQL_ENSURE(Snapshot.IsValid()); - - for (auto& input : *task.MutableInputs()) { - if (input.HasTransform()) { - auto transform = input.MutableTransform(); - YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer", - "Unexpected input transform type: " << transform->GetType()); - - const google::protobuf::Any& settingsAny = transform->GetSettings(); - YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: " - << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name() - << " , but got: " << settingsAny.type_url()); - - NKikimrKqp::TKqpStreamLookupSettings settings; - YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); - - settings.MutableSnapshot()->SetStep(Snapshot.Step); - settings.MutableSnapshot()->SetTxId(Snapshot.TxId); - - transform->MutableSettings()->PackFrom(settings); - } - if (input.HasSource() && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) { - auto source = input.MutableSource(); - const google::protobuf::Any& settingsAny = source->GetSettings(); - - YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: " - << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name() - << " , but got: " << settingsAny.type_url()); - - NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; - YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); - - if (Snapshot.IsValid()) { - settings.MutableSnapshot()->SetStep(Snapshot.Step); - settings.MutableSnapshot()->SetTxId(Snapshot.TxId); - } - - source->MutableSettings()->PackFrom(settings); - } - } -} - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// std::unique_ptr<TKqpPlanner> CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NYql::NDqProto::TDqTask>&& tasks, THashMap<ui64, TVector<NYql::NDqProto::TDqTask>>&& mainTasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index a7e452c0afa..1149c67901d 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -49,7 +49,6 @@ private: void PrepareKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, THashSet<ui64> taskIds); void AddScansToKqpNodeRequest(NKikimrKqp::TEvStartKqpTasksRequest& request, ui64 nodeId); - void AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task); ui32 CalcSendMessageFlagsForNode(ui32 nodeId); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 189c1efca66..e135a156500 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -130,14 +130,15 @@ private: << ", enough: " << ev->Get()->Record.GetEnough() << ", from: " << ev->Sender); - if (ResultChannelProxies.empty()) { + auto& resultChannelProxies = GetResultChannelProxies(); + if (resultChannelProxies.empty()) { return; } // Forward only for stream results, data results acks event theirselves. YQL_ENSURE(!ResponseEv->TxResults.empty() && ResponseEv->TxResults[0].IsStream); - auto channelIt = ResultChannelProxies.begin(); + auto channelIt = resultChannelProxies.begin(); auto handle = ev->Forward(channelIt->second->SelfId()); channelIt->second->Receive(handle, TlsActivationContext->AsActorContext()); } @@ -268,14 +269,14 @@ private: THashMap<ui64, ui64> assignedShardsCount; auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId); + const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId); const auto& keyTypes = table.KeyColumnTypes; for (auto& op : stage.GetTableOps()) { Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath()); auto columns = BuildKqpColumns(op, table); - auto partitions = PrunePartitions(TableKeys, op, stageInfo, HolderFactory(), TypeEnv()); + auto partitions = PrunePartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv()); const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange); auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv()); @@ -473,7 +474,7 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - BuildScanTasksFromSource(stageInfo, Request.Snapshot); + BuildScanTasksFromSource(stageInfo); break; default: YQL_ENSURE(false, "unknown source type"); @@ -492,7 +493,7 @@ private: YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage"); } - BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling); + BuildKqpStageChannels(TasksGraph, GetTableKeys(), stageInfo, TxId, AppData()->EnableKqpSpilling); } ResponseEv->InitTxResult(tx.Body); @@ -515,8 +516,7 @@ private: for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - - NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task, TableKeys, ResultChannelProxies, TypeEnv()); + NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task); if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) { // Task with source @@ -567,7 +567,7 @@ private: LOG_D("Total tasks: " << TasksGraph.GetTasks().size() << ", readonly: true" << ", " << nScanTasks << " scan tasks on " << scanTasks.size() << " nodes" << ", totalShardScans: " << nShardScans << ", execType: Scan" - << ", snapshot: {" << Request.Snapshot.TxId << ", " << Request.Snapshot.Step << "}"); + << ", snapshot: {" << GetSnapshot().TxId << ", " << GetSnapshot().Step << "}"); ExecuteScanTx(std::move(computeTasks), std::move(scanTasks), std::move(snapshot)); @@ -623,7 +623,7 @@ private: } Planner = CreateKqpPlanner(TxId, SelfId(), std::move(computeTasks), - std::move(scanTasks), Request.Snapshot, + std::move(scanTasks), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, Request.DisableLlvmForUdfStages, Request.LlvmEnabled, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig); @@ -650,7 +650,7 @@ private: } void PassAway() override { - for (auto channelPair: ResultChannelProxies) { + for (auto channelPair: GetResultChannelProxies()) { LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId()); TAutoPtr<IEventHandle> ev = new IEventHandle( diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 5b3cee3a0af..dde112a4843 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -644,6 +644,48 @@ std::pair<const TSerializedCellVec*, bool> TShardKeyRanges::GetRightBorder() con return !lastRange.Point ? std::make_pair(&lastRange.To, lastRange.ToInclusive) : std::make_pair(&lastRange.From, true); } +void AddSnapshotInfoToTaskInputs(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TDqTask& task) { + const auto& snapshot = tasksGraph.GetMeta().Snapshot; + for (auto& input : *task.MutableInputs()) { + if (input.HasTransform()) { + YQL_ENSURE(snapshot.IsValid()); + auto transform = input.MutableTransform(); + YQL_ENSURE(transform->GetType() == "StreamLookupInputTransformer", + "Unexpected input transform type: " << transform->GetType()); + + const google::protobuf::Any& settingsAny = transform->GetSettings(); + YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: " + << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name() + << " , but got: " << settingsAny.type_url()); + + NKikimrKqp::TKqpStreamLookupSettings settings; + YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); + + settings.MutableSnapshot()->SetStep(snapshot.Step); + settings.MutableSnapshot()->SetTxId(snapshot.TxId); + + transform->MutableSettings()->PackFrom(settings); + } + if (input.HasSource() && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) { + auto source = input.MutableSource(); + const google::protobuf::Any& settingsAny = source->GetSettings(); + + YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: " + << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name() + << " , but got: " << settingsAny.type_url()); + + NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; + YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); + + if (snapshot.IsValid()) { + settings.MutableSnapshot()->SetStep(snapshot.Step); + settings.MutableSnapshot()->SetTxId(snapshot.TxId); + } + + source->MutableSettings()->PackFrom(settings); + } + } +} void FillEndpointDesc(NDqProto::TEndpoint& endpoint, const TTask& task) { if (task.ComputeActorId) { @@ -653,11 +695,13 @@ void FillEndpointDesc(NDqProto::TEndpoint& endpoint, const TTask& task) { } } -void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NDqProto::TChannel& channelDesc, const TChannel& channel) { +void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NDqProto::TChannel& channelDesc, const TChannel& channel) { channelDesc.SetId(channel.Id); channelDesc.SetSrcTaskId(channel.SrcTask); channelDesc.SetDstTaskId(channel.DstTask); + const auto& resultChannelProxies = tasksGraph.GetMeta().ResultChannelProxies; + YQL_ENSURE(channel.SrcTask); const auto& srcTask = tasksGraph.GetTask(channel.SrcTask); FillEndpointDesc(*channelDesc.MutableSrcEndpoint(), srcTask); @@ -667,7 +711,7 @@ void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map< } else if (!resultChannelProxies.empty()) { auto it = resultChannelProxies.find(channel.Id); YQL_ENSURE(it != resultChannelProxies.end()); - ActorIdToProto(it->second->SelfId(), channelDesc.MutableDstEndpoint()->MutableActorId()); + ActorIdToProto(it->second, channelDesc.MutableDstEndpoint()->MutableActorId()); } else { // For non-stream execution, collect results in executer and forward with response. ActorIdToProto(srcTask.Meta.ExecuterId, channelDesc.MutableDstEndpoint()->MutableActorId()); @@ -831,54 +875,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl } } - -NYql::NDqProto::TDqTask SerializeTaskToProto( - const TKqpTasksGraph& tasksGraph, - const TTask& task, - const TKqpTableKeys& tableKeys, - const std::unordered_map<ui64, IActor*>& resultChannelProxies, - const NMiniKQL::TTypeEnvironment& typeEnv) -{ - auto& stageInfo = tasksGraph.GetStageInfo(task.StageId); - NYql::NDqProto::TDqTask result; - ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId()); - result.SetId(task.Id); - result.SetStageId(stageInfo.Id.StageId); - - for (const auto& [paramName, paramValue] : task.Meta.DqTaskParams) { - (*result.MutableTaskParams())[paramName] = paramValue; - } - - for (const auto& [paramName, paramValue] : task.Meta.DqSecureParams) { - (*result.MutableSecureParams())[paramName] = paramValue; - } - - for (const auto& input : task.Inputs) { - FillInputDesc(tasksGraph, resultChannelProxies, *result.AddInputs(), input); - } - - for (const auto& output : task.Outputs) { - FillOutputDesc(tasksGraph, resultChannelProxies, *result.AddOutputs(), output); - } - - const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); - result.MutableProgram()->CopyFrom(stage.GetProgram()); - auto g = typeEnv.BindAllocator(); - for (auto& paramName : stage.GetProgramParameters()) { - auto& dqParams = *result.MutableParameters(); - if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) { - dqParams[paramName] = *taskParam; - } else { - dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName); - } - } - - FillTaskMeta(stageInfo, task, tableKeys, result); - - return result; -} - -void FillOutputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) { +void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) { switch (output.Type) { case TTaskOutputType::Map: YQL_ENSURE(output.Channels.size() == 1); @@ -930,11 +927,11 @@ void FillOutputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<u for (auto& channel : output.Channels) { auto& channelDesc = *outputDesc.AddChannels(); - FillChannelDesc(tasksGraph, resultChannelProxies, channelDesc, tasksGraph.GetChannel(channel)); + FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel)); } } -void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input) { +void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input) { switch (input.Type()) { case NYql::NDq::TTaskInputType::Source: inputDesc.MutableSource()->SetType(input.SourceType); @@ -962,7 +959,7 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui for (ui64 channel : input.Channels) { auto& channelDesc = *inputDesc.AddChannels(); - FillChannelDesc(tasksGraph, resultChannelProxies, channelDesc, tasksGraph.GetChannel(channel)); + FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel)); } if (input.Transform) { @@ -974,6 +971,47 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui } } +NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task) +{ + auto& stageInfo = tasksGraph.GetStageInfo(task.StageId); + NYql::NDqProto::TDqTask result; + ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId()); + result.SetId(task.Id); + result.SetStageId(stageInfo.Id.StageId); + + for (const auto& [paramName, paramValue] : task.Meta.DqTaskParams) { + (*result.MutableTaskParams())[paramName] = paramValue; + } + + for (const auto& [paramName, paramValue] : task.Meta.DqSecureParams) { + (*result.MutableSecureParams())[paramName] = paramValue; + } + + for (const auto& input : task.Inputs) { + FillInputDesc(tasksGraph, *result.AddInputs(), input); + } + + for (const auto& output : task.Outputs) { + FillOutputDesc(tasksGraph, *result.AddOutputs(), output); + } + + const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); + result.MutableProgram()->CopyFrom(stage.GetProgram()); + for (auto& paramName : stage.GetProgramParameters()) { + auto& dqParams = *result.MutableParameters(); + if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) { + dqParams[paramName] = *taskParam; + } else { + dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName); + } + } + + AddSnapshotInfoToTaskInputs(tasksGraph, result); + FillTaskMeta(stageInfo, task, tasksGraph.GetMeta().TableKeys, result); + + return result; +} + TString TTaskMeta::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const NScheme::TTypeRegistry& typeRegistry) const { TStringBuilder sb; diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index f81c0f85104..ad3777350a7 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -83,6 +83,14 @@ struct TStageInfoMeta { }; +// things which are common for all tasks in the graph. +struct TGraphMeta { + TKqpTableKeys TableKeys; + IKqpGateway::TKqpSnapshot Snapshot; + std::unordered_map<ui64, TActorId> ResultChannelProxies; + TActorId ExecuterId; +}; + struct TTaskInputMeta {}; struct TTaskOutputMeta { @@ -196,20 +204,16 @@ using TTaskOutput = NYql::NDq::TTaskOutput<TTaskOutputMeta>; using TTaskOutputType = NYql::NDq::TTaskOutputType; using TTaskInput = NYql::NDq::TTaskInput<TTaskInputMeta>; using TTask = NYql::NDq::TTask<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; -using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; +using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs); void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const TKqpPhyTxHolder::TConstPtr& tx, ui64 txIdx); void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, ui64 txId, bool enableSpilling); -NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, - const TKqpTableKeys& tableKeys, - const std::unordered_map<ui64, IActor*>& resultChannelProxies, const NMiniKQL::TTypeEnvironment& typeEnv); +NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task); void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta); -void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel); -void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input); -void FillOutputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output); +void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel); template<typename Proto> TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, const TKqpTableKeys::TTable& table) { diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index 925c7ac6f3f..9af65d75a6d 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -335,6 +335,7 @@ bool TQueryData::MaterializeParamValue(bool ensure, const NKqpProto::TKqpPhyPara } NDqProto::TData TQueryData::SerializeParamValue(const TString& name) { + auto guard = TypeEnv().BindAllocator(); const auto& [type, value] = GetParameterUnboxedValue(name); return NDq::TDqDataSerializer::SerializeParamValue(type, value); } diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index 9ab5f580432..14c4f871f31 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -8,8 +8,8 @@ namespace NYql::NDq { using TChannelLogFunc = std::function<void(ui64 channel, ui64 from, ui64 to, TStringBuf type, bool enableSpilling)>; -template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> -void CommonBuildTasks(double hashShuffleTasksRatio, ui32 maxHashShuffleTasks, TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) { +template <class TGraphMeta, class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> +void CommonBuildTasks(double hashShuffleTasksRatio, ui32 maxHashShuffleTasks, TDqTasksGraph<TGraphMeta, TStageInfoMeta, TTaskMeta, TInputMeta, TOutputMeta>& graph, const NNodes::TDqPhyStage& stage) { ui32 partitionsCount = 1; diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index 4f63059d8fa..1271821b8e7 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -185,7 +185,7 @@ struct TTask { NDqProto::EWatermarksMode WatermarksMode = NDqProto::WATERMARKS_MODE_DISABLED; }; -template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> +template <class TGraphMeta, class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> class TDqTasksGraph : private TMoveOnly { public: using TStageInfoType = TStageInfo<TStageInfoMeta>; @@ -194,6 +194,14 @@ public: public: TDqTasksGraph() = default; + const TGraphMeta& GetMeta() const { + return Meta; + } + + TGraphMeta& GetMeta() { + return Meta; + } + const TChannel& GetChannel(ui64 id) const { YQL_ENSURE(id <= Channels.size()); return Channels[id - 1]; @@ -292,6 +300,7 @@ private: THashMap<TStageId, TStageInfoType> StagesInfo; TVector<TTaskType> Tasks; TVector<TChannel> Channels; + TGraphMeta Meta; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/planner/dqs_task_graph.h b/ydb/library/yql/providers/dq/planner/dqs_task_graph.h index 0b62dfce4a9..c40ad0f8d3c 100644 --- a/ydb/library/yql/providers/dq/planner/dqs_task_graph.h +++ b/ydb/library/yql/providers/dq/planner/dqs_task_graph.h @@ -10,6 +10,8 @@ namespace NYql::NDqs { NNodes::TDqPhyStage Stage; }; + struct TGraphMeta {}; + struct TTaskInputMeta { }; @@ -25,5 +27,5 @@ namespace NYql::NDqs { using TTaskOutputType = NYql::NDq::TTaskOutputType; using TTaskInput = NYql::NDq::TTaskInput<TTaskInputMeta>; using TTask = NYql::NDq::TTask<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; - using TDqsTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; + using TDqsTasksGraph = NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMeta, TTaskMeta, TTaskInputMeta, TTaskOutputMeta>; } |