about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Vitalii Gridnev <gridnevvvit@gmail.com> 2022-06-01 23:47:26 +0300
committer Vitalii Gridnev <gridnevvvit@gmail.com> 2022-06-01 23:47:26 +0300
commit 746d166a18d80407431565a212317cfc377a3895 (patch)
tree eda7a2e3b264b0ca7d6dcfc625c90f94f1b859fc
parent 2a8720240c26b5040c641c301213406b49320a3b (diff)
download ydb-746d166a18d80407431565a212317cfc377a3895.tar.gz
[kqp] cleanup scan executor code: reuse code of scan tasks builder KIKIMR-14818
ref:b7c91810385d24b1b882f41c4f840df59a246c5e
-rw-r--r-- ydb/core/kqp/executer/kqp_scan_executer.cpp 171
1 files changed, 56 insertions, 115 deletions
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index bd5499c395..a3964f788f 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -401,26 +401,53 @@ private:
}
};
- void BuildDatashardScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
- const NMiniKQL::TTypeEnvironment& typeEnv)
+ static ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo) { // Returns the per-node task cap used when assigning shards to scan tasks for this stage.
+ // TODO: take into account the number of scans already active on the node.
+ const auto& stage = GetStage(stageInfo);
+ bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() || // "heavy" = the program settings report a sort ...
+ stage.GetProgram().GetSettings().GetHasMapJoin(); // ... or a map join
+
+ if (heavyProgram) {
+ return 4; // heavy programs get the smaller cap; NOTE(review): presumably to bound per-node resource use - confirm
+ } else {
+ return 16; // cap for every other program
+ }
+ }
+
+ TTask& AssignTaskToShard( // Returns the task that should read `shardId`: a newly added task, or an existing one on the same node.
+ TStageInfo& stageInfo, ui64 shardId,
+ THashMap<ui64, std::vector<ui64>>& nodeTasks, // nodeId -> ids of tasks created on that node; only touched on the non-OLAP path
+ THashMap<ui64, ui64>& assignedShardsCount) // nodeId -> number of shards assigned so far; only touched on the non-OLAP path
{
- Y_VERIFY_DEBUG(stageInfo.Meta.IsDatashard());
+ ui64 nodeId = ShardIdToNodeId.at(shardId); // node id mapped to this shard in ShardIdToNodeId
+ if (stageInfo.Meta.IsOlap()) { // OLAP stage: every call adds a fresh task; the per-node maps are not consulted
+ auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.NodeId = nodeId;
+ return task;
+ }
- THashMap<ui64, TVector<ui64>> nodeTasks; // nodeId -> [taskId]
+ auto& tasks = nodeTasks[nodeId]; // operator[] creates an empty entry the first time a node is seen
+ auto& cnt = assignedShardsCount[nodeId]; // likewise starts at 0 for a new node
- auto getNodeTask = [&](ui64 nodeId, ui32 taskIdx) -> TTask& {
- auto& tasks = nodeTasks[nodeId];
+ const ui32 maxScansPerNode = GetMaxTasksPerNodeEstimate(stageInfo); // per-node task cap for this stage
+ if (cnt < maxScansPerNode) { // below the cap: add a new task on this node and remember its id
+ auto& task = TasksGraph.AddTask(stageInfo);
+ task.Meta.NodeId = nodeId;
+ tasks.push_back(task.Id);
+ ++cnt;
+ return task;
+ } else { // cap reached: hand the shard to an already created task
+ ui64 taskIdx = cnt % maxScansPerNode; // cycles through the node's tasks in creation order as cnt grows
+ ++cnt;
+ return TasksGraph.GetTask(tasks[taskIdx]);
+ }
+ }
- if (taskIdx < tasks.size()) {
- return TasksGraph.GetTask(tasks[taskIdx]);
- } else {
- Y_ASSERT(taskIdx == tasks.size());
- auto& task = TasksGraph.AddTask(stageInfo);
- task.Meta.NodeId = nodeId;
- tasks.emplace_back(task.Id);
- return task;
- }
- };
+ void BuildScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
+ const NMiniKQL::TTypeEnvironment& typeEnv)
+ {
+ THashMap<ui64, std::vector<ui64>> nodeTasks;
+ THashMap<ui64, ui64> assignedShardsCount;
auto& stage = GetStage(stageInfo);
@@ -449,20 +476,16 @@ private:
stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(),
op.GetReadRange().GetSkipNullKeys().end());
+ } else if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
+ reverse = op.GetReadOlapRange().GetReverse();
+ ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv,
+ itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
}
- // TODO: take into account number of active scans on node
- bool heavyProgram = stage.GetProgram().GetSettings().GetHasSort() ||
- stage.GetProgram().GetSettings().GetHasMapJoin();
- const ui32 maxScansPerNode = heavyProgram ? 4 : 16;
- THashMap<ui64, ui32> nTasksOnNodes; // nodeId -> tasks count
-
for (auto& [shardId, shardInfo] : partitions) {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
- ui64 nodeId = ShardIdToNodeId.at(shardId);
- ui32 nTasksOnNode = nTasksOnNodes[nodeId]++;
- auto& task = getNodeTask(nodeId, nTasksOnNode % maxScansPerNode);
+ auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount);
for (auto& [name, value] : shardInfo.Params) {
auto ret = task.Meta.Params.emplace(name, std::move(value));
@@ -484,7 +507,12 @@ private:
task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType);
}
- FillReadInfo(task.Meta, itemsLimit, reverse, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>());
+ if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
+ const auto& readRange = op.GetReadOlapRange();
+ FillReadInfo(task.Meta, itemsLimit, reverse, readRange);
+ } else {
+ FillReadInfo(task.Meta, itemsLimit, reverse, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>());
+ }
if (!task.Meta.Reads) {
task.Meta.Reads.ConstructInPlace();
@@ -532,91 +560,6 @@ private:
}
}
- // Creates scan tasks for reading OLAP table range
- void BuildColumnshardScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
- const NMiniKQL::TTypeEnvironment& typeEnv)
- {
- YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap);
- auto& stage = GetStage(stageInfo);
-
- const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId);
- const auto& keyTypes = table.KeyColumnTypes;
-
- TMap<ui64, TKeyDesc::TPartitionRangeInfo> shard2range;
- for (const auto& part: stageInfo.Meta.ShardKey->Partitions) {
- shard2range[part.ShardId] = part.Range.GetRef();
- }
-
- ui64 taskCount = 0;
-
- for (auto& op : stage.GetTableOps()) {
- Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
-
- auto columns = BuildKqpColumns(op, table);
-
- YQL_ENSURE(
- op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange,
- "Unexpected OLAP table scan operation: " << (ui32) op.GetTypeCase()
- );
-
- const auto& readRange = op.GetReadOlapRange();
-
- auto allShards = PrunePartitions(TableKeys, op, stageInfo, holderFactory, typeEnv);
-
- bool reverse = readRange.GetReverse();
- ui64 itemsLimit = 0;
- TString itemsLimitParamName;
- NDqProto::TData itemsLimitBytes;
- NKikimr::NMiniKQL::TType* itemsLimitType;
-
- ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv,
- itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
-
- for (auto& [shardId, shardInfo] : allShards) {
- YQL_ENSURE(!shardInfo.KeyWriteRanges);
-
- if (shardInfo.KeyReadRanges->GetRanges().empty()) {
- continue;
- }
-
- ui64 nodeId = ShardIdToNodeId.at(shardId);
- auto& task = TasksGraph.AddTask(stageInfo);
- task.Meta.NodeId = nodeId;
- ++taskCount;
-
- for (auto& [name, value] : shardInfo.Params) {
- auto ret = task.Meta.Params.emplace(name, std::move(value));
- YQL_ENSURE(ret.second);
- auto typeIterator = shardInfo.ParamTypes.find(name);
- YQL_ENSURE(typeIterator != shardInfo.ParamTypes.end());
- auto retType = task.Meta.ParamTypes.emplace(name, typeIterator->second);
- YQL_ENSURE(retType.second);
- }
-
- TTaskMeta::TShardReadInfo readInfo = {
- .Ranges = std::move(*shardInfo.KeyReadRanges),
- .Columns = columns,
- .ShardId = shardId,
- };
-
- FillReadInfo(task.Meta, itemsLimit, reverse, readRange);
-
- if (itemsLimitParamName) {
- task.Meta.Params.emplace(itemsLimitParamName, itemsLimitBytes);
- task.Meta.ParamTypes.emplace(itemsLimitParamName, itemsLimitType);
- }
-
- task.Meta.Reads.ConstructInPlace();
- task.Meta.Reads->emplace_back(std::move(readInfo));
-
- LOG_D("Stage " << stageInfo.Id << " create columnshard scan task at node: " << nodeId
- << ", meta: " << task.Meta.ToString(keyTypes, *AppData()->TypeRegistry));
- }
- }
-
- LOG_D("Stage " << stageInfo.Id << " will be executed using " << taskCount << " tasks.");
- }
-
void BuildComputeTasks(TStageInfo& stageInfo) {
auto& stage = GetStage(stageInfo);
@@ -694,10 +637,8 @@ private:
BuildComputeTasks(stageInfo);
} else if (stageInfo.Meta.IsSysView()) {
BuildSysViewScanTasks(stageInfo, holderFactory, typeEnv);
- } else if (stageInfo.Meta.IsOlap()) {
- BuildColumnshardScanTasks(stageInfo, holderFactory, typeEnv);
- } else if (stageInfo.Meta.IsDatashard()) {
- BuildDatashardScanTasks(stageInfo, holderFactory, typeEnv);
+ } else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) {
+ BuildScanTasks(stageInfo, holderFactory, typeEnv);
} else {
YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind);
}