author     gvit <gvit@ydb.tech>  2023-04-24 13:17:27 +0300
committer  gvit <gvit@ydb.tech>  2023-04-24 13:17:27 +0300
commit     66e4cab5936a4f6c6fdab9a96c9d49b560256a00
tree       d2157b8f17b78e7d407688a56646c3c22f5c2813
parent     e7596bf9ee03bb4ebb8b9b124753414c59998add
launch some tasks locally (if there is only one read) and optimize partition helpers
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_data_executer.cpp     53
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_partition_helper.cpp   3
-rw-r--r--  ydb/core/kqp/query_compiler/kqp_query_compiler.cpp     3
-rw-r--r--  ydb/core/protos/kqp_physical.proto                      1
4 files changed, 36 insertions, 24 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 658142e9f9..ffa91e888a 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1608,9 +1608,9 @@ private:
YQL_ENSURE(result.second);
}
- void ExecuteDataComputeTask(NDqProto::TDqTask&& taskDesc, bool shareMailbox) {
- auto taskId = taskDesc.GetId();
+ void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox) {
auto& task = TasksGraph.GetTask(taskId);
+ auto taskDesc = SerializeTaskToProto(TasksGraph, task);
TComputeRuntimeSettings settings;
if (Deadline) {
@@ -2124,31 +2124,40 @@ private:
// first, start compute tasks
bool shareMailbox = (ComputeTasks.size() <= 1);
for (ui64 taskId : ComputeTasks) {
- const auto& task = TasksGraph.GetTask(taskId);
- auto taskDesc = SerializeTaskToProto(TasksGraph, task);
- ExecuteDataComputeTask(std::move(taskDesc), shareMailbox);
+ ExecuteDataComputeTask(taskId, shareMailbox);
}
- THashMap<ui64, TVector<ui64>> tasksPerNode;
- for (auto& [shardId, tasks] : RemoteComputeTasks) {
- auto it = ShardIdToNodeId.find(shardId);
- YQL_ENSURE(it != ShardIdToNodeId.end());
- for (ui64 taskId : tasks) {
- PendingComputeTasks.insert(taskId);
- tasksPerNode[it->second].emplace_back(taskId);
+ if (ComputeTasks.size() == 0 && RemoteComputeTasks.size() == 1) {
+ // The query affects a single key or shard, so it might be more efficient
+ // to execute this task locally and avoid the overhead of launching it remotely.
+ for (auto& [shardId, tasks] : RemoteComputeTasks) {
+ for (ui64 taskId : tasks) {
+ ExecuteDataComputeTask(taskId, true);
+ }
}
- }
- Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(),
- Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
- Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(),
- ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */);
- auto err = Planner->PlanExecution();
- if (err) {
- TlsActivationContext->Send(err.release());
- return;
+ } else {
+ THashMap<ui64, TVector<ui64>> tasksPerNode;
+ for (auto& [shardId, tasks] : RemoteComputeTasks) {
+ auto it = ShardIdToNodeId.find(shardId);
+ YQL_ENSURE(it != ShardIdToNodeId.end());
+ for (ui64 taskId : tasks) {
+ PendingComputeTasks.insert(taskId);
+ tasksPerNode[it->second].emplace_back(taskId);
+ }
+ }
+
+ Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, std::move(tasksPerNode), GetSnapshot(),
+ Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
+ Request.DisableLlvmForUdfStages, Request.LlvmEnabled, false, Nothing(),
+ ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */);
+ auto err = Planner->PlanExecution();
+ if (err) {
+ TlsActivationContext->Send(err.release());
+ return;
+ }
+ Planner->Submit();
}
- Planner->Submit();
// then start data tasks with known actor ids of compute tasks
for (auto& [shardId, shardTx] : DatashardTxs) {
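
The new branch above skips the planner round-trip when the transaction has no generic compute tasks and all remaining work belongs to a single shard. A minimal standalone sketch of that dispatch decision (simplified, hypothetical types and names, not the actual executer code):

#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified stand-ins for the executer's bookkeeping.
using TTaskId = uint64_t;
using TShardId = uint64_t;

struct TExecuterState {
    std::vector<TTaskId> ComputeTasks;                                       // generic compute tasks
    std::unordered_map<TShardId, std::vector<TTaskId>> RemoteComputeTasks;   // per-shard read tasks
};

// Returns true when every remaining task was started in the executer's own
// process, so the planner round-trip can be skipped entirely.
bool TryExecuteLocally(const TExecuterState& state,
                       void (*executeDataComputeTask)(TTaskId taskId, bool shareMailbox)) {
    // Only worth doing when the query touches a single shard and there is
    // nothing else to schedule: a remote launch would add latency for no gain.
    if (!state.ComputeTasks.empty() || state.RemoteComputeTasks.size() != 1) {
        return false;
    }
    for (const auto& [shardId, tasks] : state.RemoteComputeTasks) {
        for (TTaskId taskId : tasks) {
            executeDataComputeTask(taskId, /*shareMailbox=*/true);
        }
    }
    return true;
}

Every other case still goes through the CreateKqpPlanner path, so the fast path only changes behaviour for single-read transactions.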
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index 8406e04ad8..e5ac979740 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -597,8 +597,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
);
} else if (source.HasKeyRange()) {
const auto& range = source.GetKeyRange();
- if (range.GetFrom().SerializeAsString() == range.GetTo().SerializeAsString() &&
- range.GetFrom().ValuesSize() == keyColumnTypes.size()) {
+ if (range.GetRangeIsPoint() && range.GetFrom().ValuesSize() == keyColumnTypes.size()) {
auto cells = FillKeyValues(keyColumnTypes, range.GetFrom(), stageInfo, holderFactory, typeEnv);
ranges.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(cells)));
} else {
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index da719d8f2b..969fa631a4 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -286,6 +286,9 @@ void FillKeyRange(const TKqlKeyRange& range, NKqpProto::TKqpPhyKeyRange& rangePr
FillKeyBound(range.From(), *rangeProto.MutableFrom());
FillKeyBound(range.To(), *rangeProto.MutableTo());
+ if (rangeProto.GetFrom().SerializeAsString() == rangeProto.GetTo().SerializeAsString()) {
+ rangeProto.SetRangeIsPoint(true);
+ }
}
void FillReadRange(const TKqpWideReadTable& read, const TKikimrTableMetadata& tableMeta,
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index f1359d54ab..84f4e2cbdc 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -127,6 +127,7 @@ message TKqpPhyKeyBound {
message TKqpPhyKeyRange {
TKqpPhyKeyBound From = 1;
TKqpPhyKeyBound To = 2;
+ bool RangeIsPoint = 3;
}
message TKqpPhyOpReadRange {
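
Taken together, the last three hunks move the point-range detection from execution time to compile time: the query compiler compares the serialized bounds once and stores the result in the new RangeIsPoint field, and the partition helper afterwards only reads the flag. A rough standalone illustration of that trade-off (hypothetical, simplified types; not the real protobuf classes):

#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the protobuf key bound: serialization allocates.
struct TKeyBound {
    std::vector<std::string> Values;
    std::string SerializeAsString() const {          // models the proto serialization cost
        std::string out;
        for (const auto& v : Values) { out += v; out += '\0'; }
        return out;
    }
};

struct TKeyRange {
    TKeyBound From, To;
    bool RangeIsPoint = false;   // filled once when the physical query is compiled
};

// Compile-time step (runs once per query): detect From == To and cache it.
void FillPointFlag(TKeyRange& range) {
    if (range.From.SerializeAsString() == range.To.SerializeAsString()) {
        range.RangeIsPoint = true;
    }
}

// Execution-time step (runs per request): the old check re-serialized both
// bounds; the new one is a flag read plus a column-count comparison.
bool IsFullKeyPoint(const TKeyRange& range, size_t keyColumnCount) {
    return range.RangeIsPoint && range.From.Values.size() == keyColumnCount;
}

Since the new proto field defaults to false, plans compiled before this change simply fall back to the general range path.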