diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-18 16:16:53 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-18 16:16:53 +0300 |
commit | 62e03f13d707a48ecdbbf7dd4bbd0702511fbc58 (patch) | |
tree | 0ec11e7f2a7ddc3f97be6060586475e45fa6c3b0 | |
parent | 6954d0e0b8d4f8b2afa47e7c18a44d95f4f0ccbc (diff) | |
download | ydb-62e03f13d707a48ecdbbf7dd4bbd0702511fbc58.tar.gz |
metaId for same tasks meta glueing
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 89 | ||||
-rw-r--r-- | ydb/library/yql/dq/proto/dq_tasks.proto | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/tasks/dq_tasks_graph.h | 16 |
6 files changed, 76 insertions, 41 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 5724f7f0168..44fbf2508e0 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -99,7 +99,7 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b } void TKqpScanComputeActor::HandleEvWakeup(EEvWakeupTag tag) { - ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "HandleEvWakeup for " << SelfId(); + AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "HandleEvWakeup")("self_id", SelfId()); switch (tag) { case RlSendAllowedTag: DoExecute(); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 02402e5cf0e..84321327a16 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -333,7 +333,7 @@ private: const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId); const auto& keyTypes = table.KeyColumnTypes; - + ui32 metaId = 0; for (auto& op : stage.GetTableOps()) { Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath()); @@ -380,6 +380,7 @@ private: for (auto&& pair : nodeShards) { const auto nodeId = pair.first; auto& shardsInfo = pair.second; + const ui32 metaGlueingId = ++metaId; TTaskMeta meta; { for (auto&& shardInfo : shardsInfo) { @@ -395,6 +396,7 @@ private: task.Meta.ExecuterId = SelfId(); task.Meta.NodeId = nodeId; task.Meta.ScanTask = true; + task.SetMetaId(metaGlueingId); } } } else { @@ -404,6 +406,7 @@ private: for (auto&& shardInfo : shardsInfo) { YQL_ENSURE(!shardInfo.KeyWriteRanges); TTaskMeta meta; + const ui32 metaGlueingId = ++metaId; MergeToTaskMeta(meta, shardInfo, readSettings, columns, op); PrepareMetaForUsage(meta, keyTypes); @@ -415,6 +418,7 @@ private: task.Meta.ExecuterId = SelfId(); task.Meta.NodeId = nodeId; task.Meta.ScanTask = true; + task.SetMetaId(metaGlueingId); } } } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index dde112a4843..226813877be 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -978,6 +978,9 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, c ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId()); result.SetId(task.Id); result.SetStageId(stageInfo.Id.StageId); + if (task.HasMetaId()) { + result.SetMetaId(task.GetMetaIdUnsafe()); + } for (const auto& [paramName, paramValue] : task.Meta.DqTaskParams) { (*result.MutableTaskParams())[paramName] = paramValue; diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index b53a582987c..173bc638144 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -148,34 +148,33 @@ private: class TComputeStageInfo { private: - YDB_ACCESSOR_DEF(std::optional<TMetaScan>, MetaNonSorted); - YDB_ACCESSOR_DEF(std::deque<TMetaScan>, MetaSorted); - std::map<ui64, TMetaScan*> SortedReadyShardIds; + YDB_ACCESSOR_DEF(std::deque<TMetaScan>, MetaInfo); + std::map<ui32, TMetaScan*> MetaWithIds; public: TComputeStageInfo() = default; - TMetaScan& MergeMetaReads(const NYql::NDqProto::TDqTask& task, const bool forceOneToMany) { - NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta; - YQL_ENSURE(task.GetMeta().UnpackTo(&meta), "Invalid task meta for merge: " << task.GetMeta().DebugString()); + bool GetMetaById(const ui32 metaId, NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& result) const { + auto it = MetaWithIds.find(metaId); + if (it == MetaWithIds.end()) { + return false; + } + result = it->second->GetMeta(); + return true; + } + + TMetaScan& MergeMetaReads(const NYql::NDqProto::TDqTask& task, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const bool forceOneToMany) { YQL_ENSURE(meta.ReadsSize(), "unexpected merge with no reads"); - if (forceOneToMany) { - MetaSorted.emplace_back(TMetaScan(meta)); - return MetaSorted.back(); - } else if (meta.GetSorted()) { - Y_VERIFY(meta.GetReads().size() == 1); - auto it = SortedReadyShardIds.find(meta.GetReads()[0].GetShardId()); - if (it == SortedReadyShardIds.end()) { - MetaSorted.emplace_back(TMetaScan(meta)); - it = SortedReadyShardIds.emplace(meta.GetReads()[0].GetShardId(), &MetaSorted.back()).first; - } - return *it->second; + if (forceOneToMany || !task.HasMetaId()) { + MetaInfo.emplace_back(TMetaScan(meta)); + return MetaInfo.back(); } else { - if (!MetaNonSorted) { - MetaNonSorted = TMetaScan(meta); + auto it = MetaWithIds.find(task.GetMetaId()); + if (it == MetaWithIds.end()) { + MetaInfo.emplace_back(TMetaScan(meta)); + return *MetaWithIds.emplace(task.GetMetaId(), &MetaInfo.back()).first->second; } else { - Y_VERIFY(meta.GetReads().size() == MetaNonSorted->GetMeta().GetReads().size()); + return *it->second; } - return *MetaNonSorted; } } }; @@ -192,12 +191,24 @@ private: return Stages.end(); } - TMetaScan& UpsertTaskWithScan(const NYql::NDqProto::TDqTask& dqTask, const bool forceOneToMany) { + bool GetMetaById(const NYql::NDqProto::TDqTask& dqTask, NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& result) const { + if (!dqTask.HasMetaId()) { + return false; + } + auto it = Stages.find(dqTask.GetStageId()); + if (it == Stages.end()) { + return false; + } else { + return it->second.GetMetaById(dqTask.GetMetaId(), result); + } + } + + TMetaScan& UpsertTaskWithScan(const NYql::NDqProto::TDqTask& dqTask, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const bool forceOneToMany) { auto it = Stages.find(dqTask.GetStageId()); if (it == Stages.end()) { it = Stages.emplace(dqTask.GetStageId(), TComputeStageInfo()).first; } - return it->second.MergeMetaReads(dqTask, forceOneToMany); + return it->second.MergeMetaReads(dqTask, meta, forceOneToMany); } }; @@ -380,21 +391,26 @@ private: FinishKqpTask(txId, taskId, success, issues, bucket); }; - ETableKind tableKind = ETableKind::Unknown; - { - NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta; - if (dqTask.GetMeta().UnpackTo(&meta)) { - tableKind = (ETableKind)meta.GetTable().GetTableKind(); - if (tableKind == ETableKind::Unknown) { - // For backward compatibility - tableKind = meta.GetTable().GetSysViewInfo().empty() ? ETableKind::Datashard : ETableKind::SysView; - } + NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta; + const auto tableKindExtract = [](const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) { + ETableKind result = (ETableKind)meta.GetTable().GetTableKind(); + if (result == ETableKind::Unknown) { + // For backward compatibility + result = meta.GetTable().GetSysViewInfo().empty() ? ETableKind::Datashard : ETableKind::SysView; } + return result; + }; + ETableKind tableKind = ETableKind::Unknown; + if (dqTask.HasMetaId()) { + YQL_ENSURE(computesByStage.GetMetaById(dqTask, meta) || dqTask.GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks"); + tableKind = tableKindExtract(meta); + } else if (dqTask.GetMeta().UnpackTo(&meta)) { + tableKind = tableKindExtract(meta); } IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { - auto& info = computesByStage.UpsertTaskWithScan(dqTask, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()); + auto& info = computesByStage.UpsertTaskWithScan(dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()); computeActor = CreateKqpScanComputeActor(request.Executer, txId, std::move(dqTask), AsyncIoFactory, AppData()->FunctionRegistry, runtimeSettings, memoryLimits, NWilson::TTraceId(ev->TraceId)); @@ -420,12 +436,7 @@ private: } for (auto&& i : computesByStage) { - for (auto&& m : i.second.MutableMetaSorted()) { - Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()), - m.GetMeta(), runtimeSettingsBase, txId, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId))); - } - if (i.second.GetMetaNonSorted()) { - auto& m = *i.second.MutableMetaNonSorted(); + for (auto&& m : i.second.MutableMetaInfo()) { Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()), m.GetMeta(), runtimeSettingsBase, txId, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId))); } diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index d04939a2a7c..1930f20d094 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -177,4 +177,5 @@ message TDqTask { uint64 InitialTaskMemoryLimit = 12; map<string, bytes> TaskParams = 13; map<string, string> SecureParams = 14; + optional uint32 MetaId = 15; } diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index 1271821b8e7..dbddcb874db 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -166,6 +166,9 @@ struct TTaskOutput { template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> struct TTask { +private: + std::optional<ui32> MetaId; +public: using TInputType = TTaskInput<TInputMeta>; using TOutputType = TTaskOutput<TOutputMeta>; @@ -175,6 +178,19 @@ struct TTask { , Outputs(stageInfo.OutputsCount) { } + bool HasMetaId() const { + return !!MetaId; + } + + void SetMetaId(const ui32 value) { + MetaId = value; + } + + ui32 GetMetaIdUnsafe() const { + Y_VERIFY(MetaId); + return *MetaId; + } + ui64 Id = 0; TStageId StageId; TVector<TInputType> Inputs; |