From 5200e6ef03bc46fcbb47cdab7e52ea5ff43ca85e Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Mon, 19 Jan 2026 19:44:19 +0300 Subject: [Optimizers] Add flag to create a stage for aggregation (#32331) --- ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 3 ++- ydb/core/kqp/provider/yql_kikimr_settings.cpp | 1 + ydb/core/kqp/provider/yql_kikimr_settings.h | 1 + ydb/library/yql/dq/opt/dq_opt_phy.cpp | 5 +++-- ydb/library/yql/dq/opt/dq_opt_phy.h | 2 +- 5 files changed, 8 insertions(+), 4 deletions(-) diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index e7e621b15ff..c93b6184770 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -404,7 +404,8 @@ protected: TMaybeNode PushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - TExprBase output = DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal); + const bool createStageForAggregation = KqpCtx.Config->OptCreateStageForAggregation.Get().GetOrElse(false); + TExprBase output = DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal, createStageForAggregation); DumpAppliedRule("PushCombineToStage", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index 9ad10b6f3b8..0e1db2e5dbf 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -92,6 +92,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, OptShuffleEliminationWithMap); REGISTER_SETTING(*this, OptShuffleEliminationForAggregation); REGISTER_SETTING(*this, OptDisallowFuseJoins); + REGISTER_SETTING(*this, OptCreateStageForAggregation); REGISTER_SETTING(*this, OverridePlanner); REGISTER_SETTING(*this, UseGraceJoinCoreForMap); REGISTER_SETTING(*this, UseBlockHashJoin); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index d2db480937a..18416ce7a31 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -85,6 +85,7 @@ public: NCommon::TConfSetting OptShuffleEliminationForAggregation; NCommon::TConfSetting CostBasedOptimizationLevel; NCommon::TConfSetting OptDisallowFuseJoins; + NCommon::TConfSetting OptCreateStageForAggregation; // Use CostBasedOptimizationLevel for internal usage. This is a dummy flag that is mapped to the optimization level during parsing. NCommon::TConfSetting CostBasedOptimization; diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index c6550ba219d..e1d4a5607db 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1122,7 +1122,7 @@ TExprBase DqBuildLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizati TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage) + const TParentsMap& parentsMap, bool allowStageMultiUsage, bool createStageForAggregation) { if (!node.Maybe().Input().Maybe()) { return node; @@ -1174,7 +1174,8 @@ TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationC .Done(); } - if (IsDqDependsOnStage(combine.PreMapLambda(), dqUnion.Output().Stage()) || + if (createStageForAggregation || + IsDqDependsOnStage(combine.PreMapLambda(), dqUnion.Output().Stage()) || IsDqDependsOnStage(combine.KeySelectorLambda(), dqUnion.Output().Stage()) || IsDqDependsOnStage(combine.InitHandlerLambda(), dqUnion.Output().Stage()) || IsDqDependsOnStage(combine.UpdateHandlerLambda(), dqUnion.Output().Stage()) || diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index fc5dde5c350..4abdbbdfbdc 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -57,7 +57,7 @@ NNodes::TExprBase DqPushFlatmapToStage(NNodes::TExprBase node, TExprContext& ctx const TParentsMap& parentsMap, bool allowStageMultiUsage = true); NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage = true); + const TParentsMap& parentsMap, bool allowStageMultiUsage = true, bool createStageForAggregation = false); NNodes::TExprBase DqPushCombineToStageDependsOnOtherStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -- cgit v1.3