summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <[email protected]>2023-08-16 15:31:19 +0300
committergvit <[email protected]>2023-08-16 16:55:01 +0300
commit4c9df3d01ce4efa9e576f9b72f046966b39d8f67 (patch)
tree695eddedf40d6f082cce41955ec9fc0ead77a1d3
parent5b31dd67e2acdaca5aec0404edbe949ca9c1d544 (diff)
don't apply optimization in case of external sources
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp15
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp13
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.h7
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp3
4 files changed, 21 insertions, 17 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index ea06f6e3cf0..b7b6373ccc5 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1714,6 +1714,7 @@ private:
break;
case NKqpProto::TKqpSource::kExternalSource:
BuildReadTasksFromSource(stageInfo, secureParams);
+ HasExternalSources = true;
break;
default:
YQL_ENSURE(false, "unknown source type");
@@ -2138,9 +2139,15 @@ private:
}
}
- Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, {}, GetSnapshot(),
+ const bool enableOptForTasks = !UnknownAffectedShardCount && !HasExternalSources;
+ bool dataQueryPool = true;
+ if (HasExternalSources && DatashardTxs.size() == 0) {
+ dataQueryPool = false;
+ }
+
+ Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
- ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, !UnknownAffectedShardCount);
+ ExecuterSpan, {}, ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks);
auto err = Planner->PlanExecution();
if (err) {
@@ -2222,7 +2229,8 @@ private:
THashMap<TActorId, THashSet<ui64>> updates;
for (ui64 taskId : ComputeTasks) {
auto& task = TasksGraph.GetTask(taskId);
- CollectTaskChannelsUpdates(task, updates);
+ if (task.ComputeActorId)
+ CollectTaskChannelsUpdates(task, updates);
}
PropagateChannelsUpdates(updates);
@@ -2344,6 +2352,7 @@ private:
bool StreamResult = false;
bool UnknownAffectedShardCount = false;
+ bool HasExternalSources = false;
ui64 TxCoordinator = 0;
THashMap<ui64, TShardState> ShardStates;
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index f20e873ef11..fc5d0f91b89 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -54,8 +54,7 @@ void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskRe
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;
-TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, TVector<ui64>&& computeTasks,
- THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
+TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
@@ -64,8 +63,6 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu
bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization)
: TxId(txId)
, ExecuterId(executer)
- , ComputeTasks(std::move(computeTasks))
- , TasksPerNode(std::move(tasksPerNode))
, Snapshot(snapshot)
, Database(database)
, UserToken(userToken)
@@ -367,7 +364,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
ComputeTasks.clear();
}
- if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && DoOptimization) {
+ if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && DoOptimization && IsDataQuery) {
// query affects a single key or shard, so it might be more effective
// to execute this task locally so we can avoid useless overhead for remote task launching.
for(auto& [shardId, tasks]: TasksPerNode) {
@@ -472,15 +469,15 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks,
- THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot,
+std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
+ const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization)
{
- return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot,
+ return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, snapshot,
database, userToken, deadline, statsMode, withSpilling, rlPath, executerSpan,
std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery, mkqlMemoryLimit, asyncIoFactory, doOptimization);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 997e2d44994..22dbb8ad9db 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -38,8 +38,7 @@ class TKqpPlanner {
};
public:
- TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks,
- THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
+ TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
@@ -104,8 +103,8 @@ private:
};
-std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks,
- THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot,
+std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
+ const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 0528fd9919e..4c79c54dd12 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -658,8 +658,7 @@ public:
private:
void ExecuteScanTx(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) {
- Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {},
- {}, GetSnapshot(),
+ Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling,
Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */, Request.MkqlMemoryLimit, nullptr, false);