aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-06 11:49:47 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-06 11:49:47 +0300
commita71c6e60fb06d32db0a73dcd1fc58e57b0d1dfc4 (patch)
tree6ad2a19032b847e8b35aa0bc4c112c8d7efa7737
parent1e2646c24f77e641a759006867b0b60d17747f97 (diff)
downloadydb-a71c6e60fb06d32db0a73dcd1fc58e57b0d1dfc4.tar.gz
[kqp] allow to schedule multiple shards in the single scan task
ref:0504d350becab9b719ba76a65dfac235827b4675
-rw-r--r--ydb/core/kqp/common/kqp_yql.cpp1
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp11
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_validate.cpp2
3 files changed, 7 insertions, 7 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp
index e98b1ea63c1..7f76bfd8b84 100644
--- a/ydb/core/kqp/common/kqp_yql.cpp
+++ b/ydb/core/kqp/common/kqp_yql.cpp
@@ -147,6 +147,7 @@ TKqpReadTableSettings ParseInternal(const TKqlReadOperation& node) {
settings.Reverse = true;
} else if (name == TKqpReadTableSettings::SortedSettingName) {
YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
+ settings.Sorted = true;
} else {
YQL_ENSURE(false, "Unknown KqpReadTable setting name '" << name << "'");
}
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 02c5db30640..01d1d911bb7 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -417,10 +417,11 @@ private:
TTask& AssignTaskToShard(
TStageInfo& stageInfo, ui64 shardId,
THashMap<ui64, std::vector<ui64>>& nodeTasks,
- THashMap<ui64, ui64>& assignedShardsCount)
+ THashMap<ui64, ui64>& assignedShardsCount,
+ bool sorted)
{
ui64 nodeId = ShardIdToNodeId.at(shardId);
- if (stageInfo.Meta.IsOlap()) {
+ if (stageInfo.Meta.IsOlap() && sorted) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.NodeId = nodeId;
return task;
@@ -462,7 +463,7 @@ private:
bool reverse = false;
ui64 itemsLimit = 0;
- bool sortedScanFlag = true;
+ bool sorted = true;
TString itemsLimitParamName;
NDqProto::TData itemsLimitBytes;
NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
@@ -478,7 +479,7 @@ private:
stageInfo.Meta.SkipNullKeys.assign(op.GetReadRange().GetSkipNullKeys().begin(),
op.GetReadRange().GetSkipNullKeys().end());
} else if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
- sortedScanFlag = op.GetReadOlapRange().GetSorted();
+ sorted = op.GetReadOlapRange().GetSorted();
reverse = op.GetReadOlapRange().GetReverse();
ExtractItemsLimit(stageInfo, op.GetReadOlapRange().GetItemsLimit(), holderFactory, typeEnv,
itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
@@ -487,7 +488,7 @@ private:
for (auto& [shardId, shardInfo] : partitions) {
YQL_ENSURE(!shardInfo.KeyWriteRanges);
- auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount);
+ auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted);
for (auto& [name, value] : shardInfo.Params) {
auto ret = task.Meta.Params.emplace(name, std::move(value));
diff --git a/ydb/core/kqp/executer/kqp_tasks_validate.cpp b/ydb/core/kqp/executer/kqp_tasks_validate.cpp
index dea7056ff09..955c68ef5b4 100644
--- a/ydb/core/kqp/executer/kqp_tasks_validate.cpp
+++ b/ydb/core/kqp/executer/kqp_tasks_validate.cpp
@@ -76,8 +76,6 @@ private:
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
if (stageInfo.Meta.TableKind == ETableKind::Olap) {
- YQL_ENSURE(task.Meta.Reads->size() == 1,
- "OLAP scan task must read exactly 1 range from 1 columnshard");
YQL_ENSURE(!task.Meta.Writes, "OLAP writes are not supported yet");
}
}