diff options
author | gvit <gvit@ydb.tech> | 2023-04-24 13:17:27 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-04-24 13:17:27 +0300 |
commit | 66e4cab5936a4f6c6fdab9a96c9d49b560256a00 (patch) | |
tree | d2157b8f17b78e7d407688a56646c3c22f5c2813 | |
parent | e7596bf9ee03bb4ebb8b9b124753414c59998add (diff) | |
download | ydb-66e4cab5936a4f6c6fdab9a96c9d49b560256a00.tar.gz |
launch some tasks locally (if there is only one read) and optimize partition helpers
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 53 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 3 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 1 |
4 files changed, 36 insertions, 24 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 658142e9f9..ffa91e888a 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1608,9 +1608,9 @@ private: YQL_ENSURE(result.second); } - void ExecuteDataComputeTask(NDqProto::TDqTask&& taskDesc, bool shareMailbox) { - auto taskId = taskDesc.GetId(); + void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) { auto& task = TasksGraph.GetTask(taskId); + auto taskDesc = SerializeTaskToProto(TasksGraph, task); TComputeRuntimeSettings settings; if (Deadline) { @@ -2124,31 +2124,40 @@ private: // first, start compute tasks bool shareMailbox = (ComputeTasks.size() <= 1); for (ui64 taskId : ComputeTasks) { - const auto& task = TasksGraph.GetTask(taskId); - auto taskDesc = SerializeTaskToProto(TasksGraph, task); - ExecuteDataComputeTask(std::move(taskDesc), shareMailbox); + ExecuteDataComputeTask(taskId, shareMailbox); } - THashMap<ui64, TVector<ui64>> tasksPerNode; - for (auto& [shardId, tasks] : RemoteComputeTasks) { - auto it = ShardIdToNodeId.find(shardId); - YQL_ENSURE(it != ShardIdToNodeId.end()); - for (ui64 taskId : tasks) { - PendingComputeTasks.insert(taskId); - tasksPerNode[it->second].emplace_back(taskId); + if (ComputeTasks.size() == 0 && RemoteComputeTasks.size() == 1) { + // query affects a single key or shard, so it might be more effective + // to execute this task locally so we can avoid useless overhead for remote task launching. + for(auto& [shardId, tasks]: RemoteComputeTasks) { + for(ui64 taskId: tasks) { + ExecuteDataComputeTask(taskId, true); + } } - } - Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(), - Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, - Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), - ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */); - auto err = Planner->PlanExecution(); - if (err) { - TlsActivationContext->Send(err.release()); - return; + } else { + THashMap<ui64, TVector<ui64>> tasksPerNode; + for (auto& [shardId, tasks] : RemoteComputeTasks) { + auto it = ShardIdToNodeId.find(shardId); + YQL_ENSURE(it != ShardIdToNodeId.end()); + for (ui64 taskId : tasks) { + PendingComputeTasks.insert(taskId); + tasksPerNode[it->second].emplace_back(taskId); + } + } + + Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(), + Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, + Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(), + ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */); + auto err = Planner->PlanExecution(); + if (err) { + TlsActivationContext->Send(err.release()); + return; + } + Planner->Submit(); } - Planner->Submit(); // then start data tasks with known actor ids of compute tasks for (auto& [shardId, shardTx] : DatashardTxs) { diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index 8406e04ad8..e5ac979740 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -597,8 +597,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, ); } else if (source.HasKeyRange()) { const auto& range = source.GetKeyRange(); - if (range.GetFrom().SerializeAsString() == range.GetTo().SerializeAsString() && - range.GetFrom().ValuesSize() == keyColumnTypes.size()) { + if (range.GetRangeIsPoint() && range.GetFrom().ValuesSize() == keyColumnTypes.size()) { auto cells = FillKeyValues(keyColumnTypes, range.GetFrom(), stageInfo, holderFactory, typeEnv); ranges.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(cells))); } else { diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index da719d8f2b..969fa631a4 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -286,6 +286,9 @@ void FillKeyRange(const TKqlKeyRange& range, NKqpProto::TKqpPhyKeyRange& rangePr FillKeyBound(range.From(), *rangeProto.MutableFrom()); FillKeyBound(range.To(), *rangeProto.MutableTo()); + if (rangeProto.GetFrom().SerializeAsString() == rangeProto.GetTo().SerializeAsString()) { + rangeProto.SetRangeIsPoint(true); + } } void FillReadRange(const TKqpWideReadTable& read, const TKikimrTableMetadata& tableMeta, diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index f1359d54ab..84f4e2cbdc 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -127,6 +127,7 @@ message TKqpPhyKeyBound { message TKqpPhyKeyRange { TKqpPhyKeyBound From = 1; TKqpPhyKeyBound To = 2; + bool RangeIsPoint = 3; } message TKqpPhyOpReadRange { |