aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-04-19 16:35:05 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-04-19 16:35:05 +0300
commit 555965601de2e981b0afe718d56ea5704d5bbd44 (patch)
tree 5d7ffe844657b034e607fc83c61aec51c2c5e873
parent bd27a4544747a6b44dfaa9e41b84239975db9b7e (diff)
download ydb-555965601de2e981b0afe718d56ea5704d5bbd44.tar.gz
Activate independent usage of compute tasks and scan tasks for column shards in case of primary key sorting
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor.cpp 4
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_scan_executer.cpp 40
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp 4
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_tasks_graph.h 3
-rw-r--r-- ydb/core/protos/tx_datashard.proto 2
5 files changed, 18 insertions, 35 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 272d01f8cf5..0ecc4df768a 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -65,7 +65,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCou
void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta,
ui32& maxInFlight, bool& isAggregationRequest) const {
- const bool isSorted = (meta.HasSorted() ? meta.GetSorted() : true);
+ const bool enableShardsSequentialScan = (meta.HasEnableShardsSequentialScan() ? meta.GetEnableShardsSequentialScan() : true);
isAggregationRequest = false;
maxInFlight = 1;
@@ -87,7 +87,7 @@ void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TK
}
}
isAggregationRequest = hasGroupByWithFields || hasGroupByWithNoFields;
- if (isSorted) {
+ if (enableShardsSequentialScan) {
maxInFlight = 1;
} else if (hasGroupByWithFields) {
maxInFlight = ProtoConfig.GetAggregationGroupByLimit();
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 84321327a16..438354e7b9f 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -191,7 +191,6 @@ private:
if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) {
return;
}
-
taskMeta.ReadInfo.ReadType = ReadTypeFromProto(readOlapRange->GetReadType());
taskMeta.ReadInfo.OlapProgram.Program = readOlapRange->GetOlapProgram();
for (auto& name: readOlapRange->GetOlapProgramParameterNames()) {
@@ -199,14 +198,14 @@ private:
}
};
- ui32 GetTasksPerNode(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/, const ui32 shardsCountCompute, const ui32 shardsCountNode) const {
+ ui32 GetTasksPerNode(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const {
ui32 result = 0;
if (isOlapScan) {
if (AggregationSettings.HasCSScanThreadsPerNode()) {
- result = AggregationSettings.GetCSScanThreadsPerNode() * 1.0 * shardsCountCompute / shardsCountNode;
+ result = AggregationSettings.GetCSScanThreadsPerNode();
} else {
const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId);
- result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}) * 1.0 * shardsCountCompute / shardsCountNode;
+ result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {});
}
} else {
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -249,7 +248,7 @@ private:
auto& tasks = nodeTasks[nodeId];
auto& cnt = assignedShardsCount[nodeId];
- const ui32 maxScansPerNode = GetTasksPerNode(stageInfo, isOlapScan, nodeId, 1, 1);
+ const ui32 maxScansPerNode = GetTasksPerNode(stageInfo, isOlapScan, nodeId);
if (cnt < maxScansPerNode) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.NodeId = nodeId;
@@ -360,7 +359,7 @@ private:
}
}
- if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()) {
+ if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() || (!isOlapScan && readSettings.Sorted)) {
for (auto&& pair : nodeShards) {
auto& shardsInfo = pair.second;
for (auto&& shardInfo : shardsInfo) {
@@ -372,11 +371,12 @@ private:
for (const auto& pair : nodeTasks) {
for (const auto& taskIdx : pair.second) {
auto& task = TasksGraph.GetTask(taskIdx);
+ task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted);
PrepareMetaForUsage(task.Meta, keyTypes);
}
}
- } else if (!readSettings.Sorted) {
+ } else {
for (auto&& pair : nodeShards) {
const auto nodeId = pair.first;
auto& shardsInfo = pair.second;
@@ -390,38 +390,16 @@ private:
LOG_D("Stage " << stageInfo.Id << " create scan task meta for node: " << nodeId
<< ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry));
}
- for (ui32 t = 0; t < GetTasksPerNode(stageInfo, isOlapScan, nodeId, shardsInfo.size(), shardsInfo.size()); ++t) {
+ for (ui32 t = 0; t < GetTasksPerNode(stageInfo, isOlapScan, nodeId); ++t) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta = meta;
+ task.Meta.SetEnableShardsSequentialScan(false);
task.Meta.ExecuterId = SelfId();
task.Meta.NodeId = nodeId;
task.Meta.ScanTask = true;
task.SetMetaId(metaGlueingId);
}
}
- } else {
- for (auto&& pair : nodeShards) {
- const auto nodeId = pair.first;
- auto& shardsInfo = pair.second;
- for (auto&& shardInfo : shardsInfo) {
- YQL_ENSURE(!shardInfo.KeyWriteRanges);
- TTaskMeta meta;
- const ui32 metaGlueingId = ++metaId;
- MergeToTaskMeta(meta, shardInfo, readSettings, columns, op);
- PrepareMetaForUsage(meta, keyTypes);
-
- LOG_D("Stage " << stageInfo.Id << " create datashard scan task meta for node: " << nodeId
- << ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry));
- for (ui32 t = 0; t < GetTasksPerNode(stageInfo, isOlapScan, nodeId, 1, shardsInfo.size()); ++t) {
- auto& task = TasksGraph.AddTask(stageInfo);
- task.Meta = meta;
- task.Meta.ExecuterId = SelfId();
- task.Meta.NodeId = nodeId;
- task.Meta.ScanTask = true;
- task.SetMetaId(metaGlueingId);
- }
- }
- }
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 226813877be..f3077c54e51 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -822,7 +822,9 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl
if (!task.Meta.Reads->empty()) {
protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse);
protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit);
- protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted);
+ if (task.Meta.HasEnableShardsSequentialScan()) {
+ protoTaskMeta.SetEnableShardsSequentialScan(task.Meta.GetEnableShardsSequentialScanUnsafe());
+ }
protoTaskMeta.SetReadType(ReadTypeToProto(task.Meta.ReadInfo.ReadType));
for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) {
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index bbcebbba257..2b52d682ef0 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -131,6 +131,9 @@ struct TShardKeyRanges {
struct TTaskMeta {
+private:
+ YDB_OPT(bool, EnableShardsSequentialScan);
+public:
ui64 ShardId = 0; // only in case of non-scans (data-query & legacy scans)
ui64 NodeId = 0; // only in case of scans over persistent snapshots
bool ScanTask = false;
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 323e4fde057..1aa269779c6 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -228,7 +228,7 @@ message TKqpTransaction {
reserved 8; // optional bytes ProcessProgram = 8;
optional EScanDataFormat DataFormat = 9;
optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables
- optional bool Sorted = 11;
+ optional bool EnableShardsSequentialScan = 11;
repeated TColumnMeta ResultColumns = 12;
// Type of read result: unboxed values or Arrow blocks of data
optional EReadType ReadType = 14;