diff options
| author | gvit <[email protected]> | 2023-08-16 15:31:19 +0300 |
|---|---|---|
| committer | gvit <[email protected]> | 2023-08-16 16:55:01 +0300 |
| commit | 4c9df3d01ce4efa9e576f9b72f046966b39d8f67 (patch) | |
| tree | 695eddedf40d6f082cce41955ec9fc0ead77a1d3 | |
| parent | 5b31dd67e2acdaca5aec0404edbe949ca9c1d544 (diff) | |
don't apply optimization in case of external sources
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 15 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 13 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 7 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 3 |
4 files changed, 21 insertions, 17 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index ea06f6e3cf0..b7b6373ccc5 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1714,6 +1714,7 @@ private: break; case NKqpProto::TKqpSource::kExternalSource: BuildReadTasksFromSource(stageInfo, secureParams); + HasExternalSources = true; break; default: YQL_ENSURE(false, "unknown source type"); @@ -2138,9 +2139,15 @@ private: } } - Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, {}, GetSnapshot(), + const bool enableOptForTasks = !UnknownAffectedShardCount && !HasExternalSources; + bool dataQueryPool = true; + if (HasExternalSources && DatashardTxs.size() == 0) { + dataQueryPool = false; + } + + Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(), - ExecuterSpan, {}, ExecuterRetriesConfig, true /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, !UnknownAffectedShardCount); + ExecuterSpan, {}, ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks); auto err = Planner->PlanExecution(); if (err) { @@ -2222,7 +2229,8 @@ private: THashMap<TActorId, THashSet<ui64>> updates; for (ui64 taskId : ComputeTasks) { auto& task = TasksGraph.GetTask(taskId); - CollectTaskChannelsUpdates(task, updates); + if (task.ComputeActorId) + CollectTaskChannelsUpdates(task, updates); } PropagateChannelsUpdates(updates); @@ -2344,6 +2352,7 @@ private: bool StreamResult = false; bool UnknownAffectedShardCount = false; + bool HasExternalSources = false; ui64 TxCoordinator = 0; THashMap<ui64, TShardState> ShardStates; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index f20e873ef11..fc5d0f91b89 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -54,8 +54,7 @@ void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskRe constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4; -TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, TVector<ui64>&& computeTasks, - THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, +TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, @@ -64,8 +63,6 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization) : TxId(txId) , ExecuterId(executer) - , ComputeTasks(std::move(computeTasks)) - , TasksPerNode(std::move(tasksPerNode)) , Snapshot(snapshot) , Database(database) , UserToken(userToken) @@ -367,7 +364,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() { ComputeTasks.clear(); } - if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && DoOptimization) { + if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && DoOptimization && IsDataQuery) { // query affects a single key or shard, so it might be more effective // to execute this task locally so we can avoid useless overhead for remote task launching. for(auto& [shardId, tasks]: TasksPerNode) { @@ -472,15 +469,15 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, - THashMap<ui64, TVector<ui64>>&& tasksPerNode, const IKqpGateway::TKqpSnapshot& snapshot, +std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, + const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, bool isDataQuery, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization) { - return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, std::move(tasks), std::move(tasksPerNode), snapshot, + return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, snapshot, database, userToken, deadline, statsMode, withSpilling, rlPath, executerSpan, std::move(resourcesSnapshot), executerRetriesConfig, isDataQuery, mkqlMemoryLimit, asyncIoFactory, doOptimization); } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 997e2d44994..22dbb8ad9db 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -38,8 +38,7 @@ class TKqpPlanner { }; public: - TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, - THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, + TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan, @@ -104,8 +103,8 @@ private: }; -std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, TVector<ui64>&& tasks, - THashMap<ui64, TVector<ui64>>&& scanTasks, const IKqpGateway::TKqpSnapshot& snapshot, +std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, + const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan, diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 0528fd9919e..4c79c54dd12 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -658,8 +658,7 @@ public: private: void ExecuteScanTx(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) { - Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), {}, - {}, GetSnapshot(), + Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling, Request.RlPath, ExecuterSpan, std::move(snapshot), ExecuterRetriesConfig, false /* isDataQuery */, Request.MkqlMemoryLimit, nullptr, false); |
