diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-19 16:35:05 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-19 16:35:05 +0300 |
commit | 555965601de2e981b0afe718d56ea5704d5bbd44 (patch) | |
tree | 5d7ffe844657b034e607fc83c61aec51c2c5e873 | |
parent | bd27a4544747a6b44dfaa9e41b84239975db9b7e (diff) | |
download | ydb-555965601de2e981b0afe718d56ea5704d5bbd44.tar.gz |
activate independent usage compute tasks and scan tasks for column shards in case primary key sorting
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 40 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 3 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 2 |
5 files changed, 18 insertions, 35 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 272d01f8cf5..0ecc4df768a 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -65,7 +65,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCou void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, ui32& maxInFlight, bool& isAggregationRequest) const { - const bool isSorted = (meta.HasSorted() ? meta.GetSorted() : true); + const bool enableShardsSequentialScan = (meta.HasEnableShardsSequentialScan() ? meta.GetEnableShardsSequentialScan() : true); isAggregationRequest = false; maxInFlight = 1; @@ -87,7 +87,7 @@ void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TK } } isAggregationRequest = hasGroupByWithFields || hasGroupByWithNoFields; - if (isSorted) { + if (enableShardsSequentialScan) { maxInFlight = 1; } else if (hasGroupByWithFields) { maxInFlight = ProtoConfig.GetAggregationGroupByLimit(); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 84321327a16..438354e7b9f 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -191,7 +191,6 @@ private: if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) { return; } - taskMeta.ReadInfo.ReadType = ReadTypeFromProto(readOlapRange->GetReadType()); taskMeta.ReadInfo.OlapProgram.Program = readOlapRange->GetOlapProgram(); for (auto& name: readOlapRange->GetOlapProgramParameterNames()) { @@ -199,14 +198,14 @@ private: } }; - ui32 GetTasksPerNode(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/, const ui32 shardsCountCompute, const ui32 shardsCountNode) const { + ui32 GetTasksPerNode(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const { ui32 result = 0; if (isOlapScan) { if (AggregationSettings.HasCSScanThreadsPerNode()) { - result = AggregationSettings.GetCSScanThreadsPerNode() * 1.0 * shardsCountCompute / shardsCountNode; + result = AggregationSettings.GetCSScanThreadsPerNode(); } else { const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId); - result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}) * 1.0 * shardsCountCompute / shardsCountNode; + result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}); } } else { const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -249,7 +248,7 @@ private: auto& tasks = nodeTasks[nodeId]; auto& cnt = assignedShardsCount[nodeId]; - const ui32 maxScansPerNode = GetTasksPerNode(stageInfo, isOlapScan, nodeId, 1, 1); + const ui32 maxScansPerNode = GetTasksPerNode(stageInfo, isOlapScan, nodeId); if (cnt < maxScansPerNode) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.NodeId = nodeId; @@ -360,7 +359,7 @@ private: } } - if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()) { + if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() || (!isOlapScan && readSettings.Sorted)) { for (auto&& pair : nodeShards) { auto& shardsInfo = pair.second; for (auto&& shardInfo : shardsInfo) { @@ -372,11 +371,12 @@ private: for (const auto& pair : nodeTasks) { for (const auto& taskIdx : pair.second) { auto& task = TasksGraph.GetTask(taskIdx); + task.Meta.SetEnableShardsSequentialScan(readSettings.Sorted); PrepareMetaForUsage(task.Meta, keyTypes); } } - } else if (!readSettings.Sorted) { + } else { for (auto&& pair : nodeShards) { const auto nodeId = pair.first; auto& shardsInfo = pair.second; @@ -390,38 +390,16 @@ private: LOG_D("Stage " << stageInfo.Id << " create scan task meta for node: " << nodeId << ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry)); } - for (ui32 t = 0; t < GetTasksPerNode(stageInfo, isOlapScan, nodeId, shardsInfo.size(), shardsInfo.size()); ++t) { + for (ui32 t = 0; t < GetTasksPerNode(stageInfo, isOlapScan, nodeId); ++t) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta = meta; + task.Meta.SetEnableShardsSequentialScan(false); task.Meta.ExecuterId = SelfId(); task.Meta.NodeId = nodeId; task.Meta.ScanTask = true; task.SetMetaId(metaGlueingId); } } - } else { - for (auto&& pair : nodeShards) { - const auto nodeId = pair.first; - auto& shardsInfo = pair.second; - for (auto&& shardInfo : shardsInfo) { - YQL_ENSURE(!shardInfo.KeyWriteRanges); - TTaskMeta meta; - const ui32 metaGlueingId = ++metaId; - MergeToTaskMeta(meta, shardInfo, readSettings, columns, op); - PrepareMetaForUsage(meta, keyTypes); - - LOG_D("Stage " << stageInfo.Id << " create datashard scan task meta for node: " << nodeId - << ", meta: " << meta.ToString(keyTypes, *AppData()->TypeRegistry)); - for (ui32 t = 0; t < GetTasksPerNode(stageInfo, isOlapScan, nodeId, 1, shardsInfo.size()); ++t) { - auto& task = TasksGraph.AddTask(stageInfo); - task.Meta = meta; - task.Meta.ExecuterId = SelfId(); - task.Meta.NodeId = nodeId; - task.Meta.ScanTask = true; - task.SetMetaId(metaGlueingId); - } - } - } } } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 226813877be..f3077c54e51 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -822,7 +822,9 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl if (!task.Meta.Reads->empty()) { protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse); protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit); - protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted); + if (task.Meta.HasEnableShardsSequentialScan()) { + protoTaskMeta.SetEnableShardsSequentialScan(task.Meta.GetEnableShardsSequentialScanUnsafe()); + } protoTaskMeta.SetReadType(ReadTypeToProto(task.Meta.ReadInfo.ReadType)); for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) { diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index bbcebbba257..2b52d682ef0 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -131,6 +131,9 @@ struct TShardKeyRanges { struct TTaskMeta { +private: + YDB_OPT(bool, EnableShardsSequentialScan); +public: ui64 ShardId = 0; // only in case of non-scans (data-query & legacy scans) ui64 NodeId = 0; // only in case of scans over persistent snapshots bool ScanTask = false; diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 323e4fde057..1aa269779c6 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -228,7 +228,7 @@ message TKqpTransaction { reserved 8; // optional bytes ProcessProgram = 8; optional EScanDataFormat DataFormat = 9; optional NKikimrSSA.TOlapProgram OlapProgram = 10; // Currently only for OLAP tables - optional bool Sorted = 11; + optional bool EnableShardsSequentialScan = 11; repeated TColumnMeta ResultColumns = 12; // Type of read result: unboxed values or Arrow blocks of data optional EReadType ReadType = 14; |