diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-01 23:47:26 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-01 23:47:26 +0300 |
commit | 746d166a18d80407431565a212317cfc377a3895 (patch) | |
tree | eda7a2e3b264b0ca7d6dcfc625c90f94f1b859fc | |
parent | 2a8720240c26b5040c641c301213406b49320a3b (diff) | |
download | ydb-746d166a18d80407431565a212317cfc377a3895.tar.gz |
[kqp] cleanup scan executor code: reuse code of scan tasks builder KIKIMR-14818
ref:b7c91810385d24b1b882f41c4f840df59a246c5e
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 171 |
1 files changed, 56 insertions, 115 deletions
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index bd5499c395..a3964f788f 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -401,26 +401,53 @@ private: } }; - void BuildDatashardScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, - const NMiniKQL::TTypeEnvironment& typeEnv) + static ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo) { + // TODO: take into account number of active scans on node + const auto& stage = GetStage(stageInfo); + bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() || + stage.GetProgram().GetSettings().GetHasMapJoin(); + + if (heavyProgram) { + return 4; + } else { + return 16; + } + } + + TTask& AssignTaskToShard( + TStageInfo& stageInfo, ui64 shardId, + THashMap<ui64, std::vector<ui64>>& nodeTasks, + THashMap<ui64, ui64>& assignedShardsCount) { - Y_VERIFY_DEBUG(stageInfo.Meta.IsDatashard()); + ui64 nodeId = ShardIdToNodeId.at(shardId); + if (stageInfo.Meta.IsOlap()) { + auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.NodeId = nodeId; + return task; + } - THashMap<ui64, TVector<ui64>> nodeTasks; // nodeId -> [taskId] + auto& tasks = nodeTasks[nodeId]; + auto& cnt = assignedShardsCount[nodeId]; - auto getNodeTask = [&](ui64 nodeId, ui32 taskIdx) -> TTask& { - auto& tasks = nodeTasks[nodeId]; + const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo); + if (cnt < maxScansPerNode) { + auto& task = TasksGraph.AddTask(stageInfo); + task.Meta.NodeId = nodeId; + tasks.push_back(task.Id); + ++cnt; + return task; + } else { + ui64 taskIdx = cnt % maxScansPerNode; + ++cnt; + return TasksGraph.GetTask(tasks[taskIdx]); + } + } - if (taskIdx < tasks.size()) { - return TasksGraph.GetTask(tasks[taskIdx]); - } else { - Y_ASSERT(taskIdx == tasks.size()); - auto& task = TasksGraph.AddTask(stageInfo); - task.Meta.NodeId = nodeId; - tasks.emplace_back(task.Id); - return task; - } - }; + void BuildScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, + const NMiniKQL::TTypeEnvironment& typeEnv) + { + THashMap<ui64, std::vector<ui64>> nodeTasks; + THashMap<ui64, ui64> assignedShardsCount; auto& stage = GetStage(stageInfo); @@ -449,20 +476,16 @@ private: stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(), op.GetReadRange().GetSkipNullKeys().end()); + } else if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { + reverse = op.GetReadOlapRange().GetReverse(); + ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv, + itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); } - // TODO: take into account number of active scans on node - bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() || - stage.GetProgram().GetSettings().GetHasMapJoin(); - const ui32 maxScansPerNode = heavyProgram ? 4 : 16; - THashMap<ui64, ui32> nTasksOnNodes; // nodeId -> tasks count - for (auto& [shardId, shardInfo] : partitions) { YQL_ENSURE(!shardInfo.KeyWriteRanges); - ui64 nodeId = ShardIdToNodeId.at(shardId); - ui32 nTasksOnNode = nTasksOnNodes[nodeId]++; - auto& task = getNodeTask(nodeId, nTasksOnNode % maxScansPerNode); + auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount); for (auto& [name, value] : shardInfo.Params) { auto ret = task.Meta.Params.emplace(name, std::move(value)); @@ -484,7 +507,12 @@ private: task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType); } - FillReadInfo(task.Meta, itemsLimit, reverse, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>()); + if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { + const auto& readRange = op.GetReadOlapRange(); + FillReadInfo(task.Meta, itemsLimit, reverse, readRange); + } else { + FillReadInfo(task.Meta, itemsLimit, reverse, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>()); + } if (!task.Meta.Reads) { task.Meta.Reads.ConstructInPlace(); @@ -532,91 +560,6 @@ private: } } - // Creates scan tasks for reading OLAP table range - void BuildColumnshardScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, - const NMiniKQL::TTypeEnvironment& typeEnv) - { - YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap); - auto& stage = GetStage(stageInfo); - - const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId); - const auto& keyTypes = table.KeyColumnTypes; - - TMap<ui64, TKeyDesc::TPartitionRangeInfo> shard2range; - for (const auto& part: stageInfo.Meta.ShardKey->Partitions) { - shard2range[part.ShardId] = part.Range.GetRef(); - } - - ui64 taskCount = 0; - - for (auto& op : stage.GetTableOps()) { - Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath()); - - auto columns = BuildKqpColumns(op, table); - - YQL_ENSURE( - op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange, - "Unexpected OLAP table scan operation: " << (ui32) op.GetTypeCase() - ); - - const auto& readRange = op.GetReadOlapRange(); - - auto allShards = PrunePartitions(TableKeys, op, stageInfo, holderFactory, typeEnv); - - bool reverse = readRange.GetReverse(); - ui64 itemsLimit = 0; - TString itemsLimitParamName; - NDqProto::TData itemsLimitBytes; - NKikimr::NMiniKQL::TType* itemsLimitType; - - ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv, - itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); - - for (auto& [shardId, shardInfo] : allShards) { - YQL_ENSURE(!shardInfo.KeyWriteRanges); - - if (shardInfo.KeyReadRanges->GetRanges().empty()) { - continue; - } - - ui64 nodeId = ShardIdToNodeId.at(shardId); - auto& task = TasksGraph.AddTask(stageInfo); - task.Meta.NodeId = nodeId; - ++taskCount; - - for (auto& [name, value] : shardInfo.Params) { - auto ret = task.Meta.Params.emplace(name, std::move(value)); - YQL_ENSURE(ret.second); - auto typeIterator = shardInfo.ParamTypes.find(name); - YQL_ENSURE(typeIterator != shardInfo.ParamTypes.end()); - auto retType = task.Meta.ParamTypes.emplace(name, typeIterator->second); - YQL_ENSURE(retType.second); - } - - TTaskMeta::TShardReadInfo readInfo = { - .Ranges = std::move(*shardInfo.KeyReadRanges), - .Columns = columns, - .ShardId = shardId, - }; - - FillReadInfo(task.Meta, itemsLimit, reverse, readRange); - - if (itemsLimitParamName) { - task.Meta.Params.emplace(itemsLimitParamName, itemsLimitBytes); - task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType); - } - - task.Meta.Reads.ConstructInPlace(); - task.Meta.Reads->emplace_back(std::move(readInfo)); - - LOG_D("Stage " << stageInfo.Id << " create columnshard scan task at node: " << nodeId - << ", meta: " << task.Meta.ToString(keyTypes, *AppData()->TypeRegistry)); - } - } - - LOG_D("Stage " << stageInfo.Id << " will be executed using " << taskCount << " tasks."); - } - void BuildComputeTasks(TStageInfo& stageInfo) { auto& stage = GetStage(stageInfo); @@ -694,10 +637,8 @@ private: BuildComputeTasks(stageInfo); } else if (stageInfo.Meta.IsSysView()) { BuildSysViewScanTasks(stageInfo, holderFactory, typeEnv); - } else if (stageInfo.Meta.IsOlap()) { - BuildColumnshardScanTasks(stageInfo, holderFactory, typeEnv); - } else if (stageInfo.Meta.IsDatashard()) { - BuildDatashardScanTasks(stageInfo, holderFactory, typeEnv); + } else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) { + BuildScanTasks(stageInfo, holderFactory, typeEnv); } else { YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind); } |