aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-18 16:16:53 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-18 16:16:53 +0300
commit62e03f13d707a48ecdbbf7dd4bbd0702511fbc58 (patch)
tree0ec11e7f2a7ddc3f97be6060586475e45fa6c3b0
parent6954d0e0b8d4f8b2afa47e7c18a44d95f4f0ccbc (diff)
downloadydb-62e03f13d707a48ecdbbf7dd4bbd0702511fbc58.tar.gz
metaId for same tasks meta glueing
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp3
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp89
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto1
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h16
6 files changed, 76 insertions, 41 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 5724f7f0168..44fbf2508e0 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -99,7 +99,7 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
}
void TKqpScanComputeActor::HandleEvWakeup(EEvWakeupTag tag) {
- ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "HandleEvWakeup for " << SelfId();
+ AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "HandleEvWakeup")("self_id", SelfId());
switch (tag) {
case RlSendAllowedTag:
DoExecute();
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 02402e5cf0e..84321327a16 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -333,7 +333,7 @@ private:
const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId);
const auto& keyTypes = table.KeyColumnTypes;
-
+ ui32 metaId = 0;
for (auto& op : stage.GetTableOps()) {
Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
@@ -380,6 +380,7 @@ private:
for (auto&& pair : nodeShards) {
const auto nodeId = pair.first;
auto& shardsInfo = pair.second;
+ const ui32 metaGlueingId = ++metaId;
TTaskMeta meta;
{
for (auto&& shardInfo : shardsInfo) {
@@ -395,6 +396,7 @@ private:
task.Meta.ExecuterId = SelfId();
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
+ task.SetMetaId(metaGlueingId);
}
}
} else {
@@ -404,6 +406,7 @@ private:
for (auto&& shardInfo : shardsInfo) {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
TTaskMeta meta;
+ const ui32 metaGlueingId = ++metaId;
MergeToTaskMeta(meta, shardInfo, readSettings, columns, op);
PrepareMetaForUsage(meta, keyTypes);
@@ -415,6 +418,7 @@ private:
task.Meta.ExecuterId = SelfId();
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
+ task.SetMetaId(metaGlueingId);
}
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index dde112a4843..226813877be 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -978,6 +978,9 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, c
ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId());
result.SetId(task.Id);
result.SetStageId(stageInfo.Id.StageId);
+ if (task.HasMetaId()) {
+ result.SetMetaId(task.GetMetaIdUnsafe());
+ }
for (const auto& [paramName, paramValue] : task.Meta.DqTaskParams) {
(*result.MutableTaskParams())[paramName] = paramValue;
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index b53a582987c..173bc638144 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -148,34 +148,33 @@ private:
class TComputeStageInfo {
private:
- YDB_ACCESSOR_DEF(std::optional<TMetaScan>, MetaNonSorted);
- YDB_ACCESSOR_DEF(std::deque<TMetaScan>, MetaSorted);
- std::map<ui64, TMetaScan*> SortedReadyShardIds;
+ YDB_ACCESSOR_DEF(std::deque<TMetaScan>, MetaInfo);
+ std::map<ui32, TMetaScan*> MetaWithIds;
public:
TComputeStageInfo() = default;
- TMetaScan& MergeMetaReads(const NYql::NDqProto::TDqTask& task, const bool forceOneToMany) {
- NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta;
- YQL_ENSURE(task.GetMeta().UnpackTo(&meta), "Invalid task meta for merge: " << task.GetMeta().DebugString());
+ bool GetMetaById(const ui32 metaId, NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& result) const {
+ auto it = MetaWithIds.find(metaId);
+ if (it == MetaWithIds.end()) {
+ return false;
+ }
+ result = it->second->GetMeta();
+ return true;
+ }
+
+ TMetaScan& MergeMetaReads(const NYql::NDqProto::TDqTask& task, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const bool forceOneToMany) {
YQL_ENSURE(meta.ReadsSize(), "unexpected merge with no reads");
- if (forceOneToMany) {
- MetaSorted.emplace_back(TMetaScan(meta));
- return MetaSorted.back();
- } else if (meta.GetSorted()) {
- Y_VERIFY(meta.GetReads().size() == 1);
- auto it = SortedReadyShardIds.find(meta.GetReads()[0].GetShardId());
- if (it == SortedReadyShardIds.end()) {
- MetaSorted.emplace_back(TMetaScan(meta));
- it = SortedReadyShardIds.emplace(meta.GetReads()[0].GetShardId(), &MetaSorted.back()).first;
- }
- return *it->second;
+ if (forceOneToMany || !task.HasMetaId()) {
+ MetaInfo.emplace_back(TMetaScan(meta));
+ return MetaInfo.back();
} else {
- if (!MetaNonSorted) {
- MetaNonSorted = TMetaScan(meta);
+ auto it = MetaWithIds.find(task.GetMetaId());
+ if (it == MetaWithIds.end()) {
+ MetaInfo.emplace_back(TMetaScan(meta));
+ return *MetaWithIds.emplace(task.GetMetaId(), &MetaInfo.back()).first->second;
} else {
- Y_VERIFY(meta.GetReads().size() == MetaNonSorted->GetMeta().GetReads().size());
+ return *it->second;
}
- return *MetaNonSorted;
}
}
};
@@ -192,12 +191,24 @@ private:
return Stages.end();
}
- TMetaScan& UpsertTaskWithScan(const NYql::NDqProto::TDqTask& dqTask, const bool forceOneToMany) {
+ bool GetMetaById(const NYql::NDqProto::TDqTask& dqTask, NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& result) const {
+ if (!dqTask.HasMetaId()) {
+ return false;
+ }
+ auto it = Stages.find(dqTask.GetStageId());
+ if (it == Stages.end()) {
+ return false;
+ } else {
+ return it->second.GetMetaById(dqTask.GetMetaId(), result);
+ }
+ }
+
+ TMetaScan& UpsertTaskWithScan(const NYql::NDqProto::TDqTask& dqTask, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const bool forceOneToMany) {
auto it = Stages.find(dqTask.GetStageId());
if (it == Stages.end()) {
it = Stages.emplace(dqTask.GetStageId(), TComputeStageInfo()).first;
}
- return it->second.MergeMetaReads(dqTask, forceOneToMany);
+ return it->second.MergeMetaReads(dqTask, meta, forceOneToMany);
}
};
@@ -380,21 +391,26 @@ private:
FinishKqpTask(txId, taskId, success, issues, bucket);
};
- ETableKind tableKind = ETableKind::Unknown;
- {
- NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta;
- if (dqTask.GetMeta().UnpackTo(&meta)) {
- tableKind = (ETableKind)meta.GetTable().GetTableKind();
- if (tableKind == ETableKind::Unknown) {
- // For backward compatibility
- tableKind = meta.GetTable().GetSysViewInfo().empty() ? ETableKind::Datashard : ETableKind::SysView;
- }
+ NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta meta;
+ const auto tableKindExtract = [](const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) {
+ ETableKind result = (ETableKind)meta.GetTable().GetTableKind();
+ if (result == ETableKind::Unknown) {
+ // For backward compatibility
+ result = meta.GetTable().GetSysViewInfo().empty() ? ETableKind::Datashard : ETableKind::SysView;
}
+ return result;
+ };
+ ETableKind tableKind = ETableKind::Unknown;
+ if (dqTask.HasMetaId()) {
+ YQL_ENSURE(computesByStage.GetMetaById(dqTask, meta) || dqTask.GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
+ tableKind = tableKindExtract(meta);
+ } else if (dqTask.GetMeta().UnpackTo(&meta)) {
+ tableKind = tableKindExtract(meta);
}
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
- auto& info = computesByStage.UpsertTaskWithScan(dqTask, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
+ auto& info = computesByStage.UpsertTaskWithScan(dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
computeActor = CreateKqpScanComputeActor(request.Executer, txId, std::move(dqTask),
AsyncIoFactory, AppData()->FunctionRegistry, runtimeSettings, memoryLimits,
NWilson::TTraceId(ev->TraceId));
@@ -420,12 +436,7 @@ private:
}
for (auto&& i : computesByStage) {
- for (auto&& m : i.second.MutableMetaSorted()) {
- Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()),
- m.GetMeta(), runtimeSettingsBase, txId, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)));
- }
- if (i.second.GetMetaNonSorted()) {
- auto& m = *i.second.MutableMetaNonSorted();
+ for (auto&& m : i.second.MutableMetaInfo()) {
Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()),
m.GetMeta(), runtimeSettingsBase, txId, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId)));
}
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index d04939a2a7c..1930f20d094 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -177,4 +177,5 @@ message TDqTask {
uint64 InitialTaskMemoryLimit = 12;
map<string, bytes> TaskParams = 13;
map<string, string> SecureParams = 14;
+ optional uint32 MetaId = 15;
}
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index 1271821b8e7..dbddcb874db 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -166,6 +166,9 @@ struct TTaskOutput {
template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
struct TTask {
+private:
+ std::optional<ui32> MetaId;
+public:
using TInputType = TTaskInput<TInputMeta>;
using TOutputType = TTaskOutput<TOutputMeta>;
@@ -175,6 +178,19 @@ struct TTask {
, Outputs(stageInfo.OutputsCount) {
}
+ bool HasMetaId() const {
+ return !!MetaId;
+ }
+
+ void SetMetaId(const ui32 value) {
+ MetaId = value;
+ }
+
+ ui32 GetMetaIdUnsafe() const {
+ Y_VERIFY(MetaId);
+ return *MetaId;
+ }
+
ui64 Id = 0;
TStageId StageId;
TVector<TInputType> Inputs;