diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-06 11:49:47 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-06 11:49:47 +0300 |
commit | a71c6e60fb06d32db0a73dcd1fc58e57b0d1dfc4 (patch) | |
tree | 6ad2a19032b847e8b35aa0bc4c112c8d7efa7737 | |
parent | 1e2646c24f77e641a759006867b0b60d17747f97 (diff) | |
download | ydb-a71c6e60fb06d32db0a73dcd1fc58e57b0d1dfc4.tar.gz |
[kqp] allow to schedule multiple shards in the single scan task
ref:0504d350becab9b719ba76a65dfac235827b4675
-rw-r--r-- | ydb/core/kqp/common/kqp_yql.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_tasks_validate.cpp | 2 |
3 files changed, 7 insertions, 7 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index e98b1ea63c1..7f76bfd8b84 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -147,6 +147,7 @@ TKqpReadTableSettings ParseInternal(const TKqlReadOperation& node) { settings.Reverse = true; } else if (name == TKqpReadTableSettings::SortedSettingName) { YQL_ENSURE(tuple.Ref().ChildrenSize() == 1); + settings.Sorted = true; } else { YQL_ENSURE(false, "Unknown KqpReadTable setting name '" << name << "'"); } diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 02c5db30640..01d1d911bb7 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -417,10 +417,11 @@ private: TTask& AssignTaskToShard( TStageInfo& stageInfo, ui64 shardId, THashMap<ui64, std::vector<ui64>>& nodeTasks, - THashMap<ui64, ui64>& assignedShardsCount) + THashMap<ui64, ui64>& assignedShardsCount, + bool sorted) { ui64 nodeId = ShardIdToNodeId.at(shardId); - if (stageInfo.Meta.IsOlap()) { + if (stageInfo.Meta.IsOlap() && sorted) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.NodeId = nodeId; return task; @@ -462,7 +463,7 @@ private: bool reverse = false; ui64 itemsLimit = 0; - bool sortedScanFlag = true; + bool sorted = true; TString itemsLimitParamName; NDqProto::TData itemsLimitBytes; NKikimr::NMiniKQL::TType* itemsLimitType = nullptr; @@ -478,7 +479,7 @@ private: stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(), op.GetReadRange().GetSkipNullKeys().end()); } else if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { - sortedScanFlag = op.GetReadOlapRange().GetSorted(); + sorted = op.GetReadOlapRange().GetSorted(); reverse = op.GetReadOlapRange().GetReverse(); ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); @@ -487,7 +488,7 @@ private: for (auto& [shardId, shardInfo] : partitions) { YQL_ENSURE(!shardInfo.KeyWriteRanges); - auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount); + auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted); for (auto& [name, value] : shardInfo.Params) { auto ret = task.Meta.Params.emplace(name, std::move(value)); diff --git a/ydb/core/kqp/executer/kqp_tasks_validate.cpp b/ydb/core/kqp/executer/kqp_tasks_validate.cpp index dea7056ff09..955c68ef5b4 100644 --- a/ydb/core/kqp/executer/kqp_tasks_validate.cpp +++ b/ydb/core/kqp/executer/kqp_tasks_validate.cpp @@ -76,8 +76,6 @@ private: const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); if (stageInfo.Meta.TableKind == ETableKind::Olap) { - YQL_ENSURE(task.Meta.Reads->size() == 1, - "OLAP scan task must read exactly 1 range from 1 columnshard"); YQL_ENSURE(!task.Meta.Writes, "OLAP writes are not supported yet"); } } |