summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPavel Velikhov <[email protected]>2025-08-29 21:03:32 +0500
committerGitHub <[email protected]>2025-08-29 19:03:32 +0300
commite4456769bc03366fe1c30a820b962bb8447c6a42 (patch)
tree44377b16085030f94b443c0d7312b3595e2c11ac
parent56fd785819f5db386170e199bf39fa37bc29aeca (diff)
Moved FSM Order optimization under a flag (#23770)
Co-authored-by: Pavel Velikhov <[email protected]>
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp1
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp5
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp45
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp144
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h13
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp235
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h1
-rw-r--r--ydb/core/protos/table_service_config.proto2
8 files changed, 432 insertions, 14 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 3265e360ebf..7731894095f 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -664,6 +664,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableTempTablesForUser = serviceConfig.GetEnableTempTablesForUser();
kqpConfig.EnableSimpleProgramsSinglePartitionOptimization = serviceConfig.GetEnableSimpleProgramsSinglePartitionOptimization();
kqpConfig.EnableOlapPushdownAggregate = serviceConfig.GetEnableOlapPushdownAggregate();
+ kqpConfig.EnableOrderOptimizaionFSM = serviceConfig.GetEnableOrderOptimizaionFSM();
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 9a98b218103..1ec5c3f645f 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -353,6 +353,8 @@ private:
bool enableOlapPushdownAggregate = TableServiceConfig.GetEnableOlapPushdownAggregate();
+ bool enableOrderOptimizaionFSM = TableServiceConfig.GetEnableOrderOptimizaionFSM();
+
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
@@ -394,7 +396,8 @@ private:
TableServiceConfig.GetEnableTempTablesForUser() != enableTempTablesForUser ||
TableServiceConfig.GetEnableSimpleProgramsSinglePartitionOptimization() != enableSimpleProgramsSinglePartitionOptimization ||
TableServiceConfig.GetDefaultLangVer() != defaultLangVer ||
- TableServiceConfig.GetEnableOlapPushdownAggregate() != enableOlapPushdownAggregate)
+ TableServiceConfig.GetEnableOlapPushdownAggregate() != enableOlapPushdownAggregate ||
+ TableServiceConfig.GetEnableOrderOptimizaionFSM() != enableOrderOptimizaionFSM)
{
QueryCache->Clear();
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 2e90372e9b7..8fd29b46bdf 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -192,10 +192,17 @@ protected:
}
TMaybeNode<TExprBase> BuildStreamIdxLookupJoinStagesKeepSorted(TExprBase node, TExprContext& ctx) {
- bool ruleEnabled = true;
- TExprBase output = KqpBuildStreamIdxLookupJoinStagesKeepSorted(node, ctx, TypesCtx, ruleEnabled);
- DumpAppliedRule("BuildStreamIdxLookupJoinStagesKeepSorted", node.Ptr(), output.Ptr(), ctx);
- return output;
+    // The EnableOrderOptimizaionFSM config flag selects which implementation of the
+    // rule runs; the applied rule is dumped under the matching name in both cases.
+    if (KqpCtx.Config->EnableOrderOptimizaionFSM) {
+        TExprBase rewritten = KqpBuildStreamIdxLookupJoinStagesKeepSortedFSM(node, ctx, TypesCtx, true);
+        DumpAppliedRule("BuildStreamIdxLookupJoinStagesKeepSortedFSM", node.Ptr(), rewritten.Ptr(), ctx);
+        return rewritten;
+    }
+
+    TExprBase rewritten = KqpBuildStreamIdxLookupJoinStagesKeepSorted(node, ctx, TypesCtx, true);
+    DumpAppliedRule("BuildStreamIdxLookupJoinStagesKeepSorted", node.Ptr(), rewritten.Ptr(), ctx);
+    return rewritten;
}
TMaybeNode<TExprBase> BuildStreamIdxLookupJoinStages(TExprBase node, TExprContext& ctx) {
@@ -211,9 +218,17 @@ protected:
}
TMaybeNode<TExprBase> RemoveRedundantSortOverReadTable(TExprBase node, TExprContext& ctx) {
- TExprBase output = KqpRemoveRedundantSortOverReadTable(node, ctx, KqpCtx, TypesCtx);
- DumpAppliedRule("RemoveRedundantSortOverReadTable", node.Ptr(), output.Ptr(), ctx);
- return output;
+    // The EnableOrderOptimizaionFSM config flag selects which implementation of the
+    // rule runs. Only the FSM variant receives the type annotation context.
+    if (KqpCtx.Config->EnableOrderOptimizaionFSM) {
+        TExprBase rewritten = KqpRemoveRedundantSortOverReadTableFSM(node, ctx, KqpCtx, TypesCtx);
+        DumpAppliedRule("RemoveRedundantSortOverReadTableFSM", node.Ptr(), rewritten.Ptr(), ctx);
+        return rewritten;
+    }
+
+    TExprBase rewritten = KqpRemoveRedundantSortOverReadTable(node, ctx, KqpCtx);
+    DumpAppliedRule("RemoveRedundantSortOverReadTable", node.Ptr(), rewritten.Ptr(), ctx);
+    return rewritten;
}
TMaybeNode<TExprBase> RewriteKqpReadTable(TExprBase node, TExprContext& ctx) {
@@ -404,10 +419,18 @@ protected:
TMaybeNode<TExprBase> BuildTopStageRemoveSort(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
- bool ruleEnabled = true;
- TExprBase output = KqpBuildTopStageRemoveSort(node, ctx, optCtx, TypesCtx, *getParents(), IsGlobal, ruleEnabled);
- DumpAppliedRule("BuildTopStageRemoveSort", node.Ptr(), output.Ptr(), ctx);
- return output;
+    // The EnableOrderOptimizaionFSM config flag selects which implementation of the
+    // rule runs; both receive the same arguments and are always invoked as enabled.
+    if (KqpCtx.Config->EnableOrderOptimizaionFSM) {
+        TExprBase rewritten = KqpBuildTopStageRemoveSortFSM(node, ctx, optCtx, TypesCtx, *getParents(), IsGlobal, true);
+        DumpAppliedRule("BuildTopStageRemoveSortFSM", node.Ptr(), rewritten.Ptr(), ctx);
+        return rewritten;
+    }
+
+    TExprBase rewritten = KqpBuildTopStageRemoveSort(node, ctx, optCtx, TypesCtx, *getParents(), IsGlobal, true);
+    DumpAppliedRule("BuildTopStageRemoveSort", node.Ptr(), rewritten.Ptr(), ctx);
+    return rewritten;
}
template <bool IsGlobal>
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 6568d9929b2..5689bb2b809 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -667,7 +667,149 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
.Build().Done();
}
-NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSorted(
+// Rewrites a TKqlIndexLookupJoin whose input is a TDqCnUnionAll so that the rows
+// reach the join through a TDqCnMerge ordered by the input's known sort columns.
+//
+// The node is returned unchanged when:
+//   * ruleEnabled is false;
+//   * node is not a TKqlIndexLookupJoin over a TDqCnUnionAll;
+//   * the statistics attached to the union's output carry no SortColumns;
+//   * the first input of the producing stage is not a TKqpCnStreamLookup.
+//
+// Otherwise the producing stage is rebuilt so that each row becomes a struct with
+// one member per field of the second tuple element type (values are read from the
+// first tuple element) plus a "_payload" member holding the original row. A new
+// stage reads that through the merge connection, unwraps "_payload" and applies
+// TKqpIndexLookupJoin.
+NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSorted(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+    TTypeAnnotationContext& typeCtx, bool ruleEnabled)
+{
+    if (!ruleEnabled) {
+        return node;
+    }
+
+    if (!node.Maybe<TKqlIndexLookupJoin>()) {
+        return node;
+    }
+
+    const auto idxLookupJoin = node.Cast<TKqlIndexLookupJoin>();
+
+    if (!idxLookupJoin.Input().Maybe<TDqCnUnionAll>()) {
+        return node;
+    }
+
+    auto unionAll = idxLookupJoin.Input().Cast<TDqCnUnionAll>();
+    auto inputStats = typeCtx.GetStats(unionAll.Output().Raw());
+    if (!inputStats || !inputStats->SortColumns) {
+        return node;
+    }
+
+    auto stage = unionAll
+        .Output().Maybe<TDqOutput>()
+        .Stage().Maybe<TDqStageBase>();
+
+    // Reuse the already resolved stage instead of walking the same chain again.
+    auto streamLookup = stage
+        .Inputs().Item(0).Maybe<TKqpCnStreamLookup>();
+
+    if (!streamLookup.IsValid()) {
+        return node;
+    }
+
+    TExprNodeList fields;
+
+    auto tupleType = streamLookup.Cast().InputType().Cast<TCoListType>().ItemType().Cast<TCoTupleType>();
+
+    auto arg = Build<TCoArgument>(ctx, node.Pos()).Name("row").Done();
+    TExprNodeList args;
+    args.push_back(arg.Ptr());
+
+    auto rightStruct = tupleType.Arg(1).Cast<TCoStructType>();
+
+    // One struct member per field of the second tuple element type; the value is
+    // taken from element 0 of the incoming row tuple.
+    for (auto structContent : rightStruct) {
+        auto attrName = structContent.Ptr()->Child(0);
+        auto field = Build<TCoNameValueTuple>(ctx, node.Pos())
+            .Name(attrName)
+            .Value<TCoMember>()
+                .Struct<TCoNth>()
+                    .Tuple(arg)
+                    .Index().Value("0").Build()
+                    .Build()
+                .Name(attrName)
+                .Build()
+            .Done().Ptr();
+
+        fields.push_back(field);
+    }
+
+    // Keep the whole original row so the consuming stage can restore it.
+    auto payload = Build<TCoNameValueTuple>(ctx, node.Pos())
+        .Name().Build("_payload")
+        .Value(arg)
+        .Done().Ptr();
+
+    fields.push_back(payload);
+
+    auto stageLambda = stage.Cast().Program();
+
+    auto orderedMap = Build<TCoOrderedMap>(ctx, node.Pos())
+        .Input(stageLambda.Body())
+        .Lambda()
+            .Args(args)
+            .Body<TCoAsStruct>()
+                .Add(fields).Build()
+            .Build()
+        .Done();
+
+    const auto& sortColumns = *inputStats->SortColumns;
+    auto builder = Build<TDqSortColumnList>(ctx, node.Pos());
+    for (size_t i = 0; i < sortColumns.Columns.size(); i++) {
+        auto columnName = sortColumns.Columns[i];
+        // Aliases is indexed in parallel with Columns; never read past its end.
+        if (i < sortColumns.Aliases.size() && sortColumns.Aliases[i] != "") {
+            columnName = sortColumns.Aliases[i] + "." + columnName;
+        }
+        builder.Add<TDqSortColumn>()
+            .Column<TCoAtom>().Build(columnName)
+            .SortDirection().Build(TTopSortSettings::AscendingSort)
+            .Build();
+    }
+
+    auto newStage = Build<TDqStage>(ctx, node.Pos())
+        .Inputs(stage.Cast().Inputs())
+        .Program()
+            .Args(stageLambda.Args())
+            .Body(orderedMap)
+            .Build()
+        .Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
+        .Done().Ptr();
+
+    auto merge = Build<TDqCnMerge>(ctx, node.Pos())
+        .Output()
+            .Stage(newStage)
+            .Index().Build(0)
+            .Build()
+        .SortColumns(builder.Build().Value())
+        .Done().Ptr();
+
+    return Build<TDqCnUnionAll>(ctx, node.Pos())
+        .Output()
+            .Stage<TDqStage>()
+                .Inputs()
+                    .Add(merge)
+                    .Build()
+                .Program()
+                    .Args({"stream_lookup_join_output"})
+                    .Body<TKqpIndexLookupJoin>()
+                        .Input<TCoOrderedMap>()
+                            .Input<TCoToStream>()
+                                .Input("stream_lookup_join_output")
+                                .Build()
+                            .Lambda()
+                                .Args({"arg"})
+                                .Body<TCoMember>()
+                                    .Struct("arg")
+                                    .Name().Build("_payload")
+                                    .Build()
+                                .Build()
+                            .Build()
+                        .JoinType(idxLookupJoin.JoinType())
+                        .LeftLabel(idxLookupJoin.LeftLabel())
+                        .RightLabel(idxLookupJoin.RightLabel())
+                        .Build()
+                    .Build()
+                .Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
+                .Build()
+            .Index().Build("0")
+            .Build()
+        .Done();
+}
+
+NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSortedFSM(
NYql::NNodes::TExprBase node,
NYql::TExprContext& ctx,
TTypeAnnotationContext& typeCtx,
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
index f5df541bd34..373cc251b13 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
@@ -30,11 +30,20 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSorted(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
NYql::TTypeAnnotationContext& typeCtx, bool ruleEnabled);
+NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSortedFSM(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+ NYql::TTypeAnnotationContext& typeCtx, bool ruleEnabled);
+
NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx);
NYql::NNodes::TExprBase KqpRemoveRedundantSortOverReadTable(
NYql::NNodes::TExprBase node,
NYql::TExprContext& ctx,
+ const TKqpOptimizeContext& kqpCtx
+);
+
+NYql::NNodes::TExprBase KqpRemoveRedundantSortOverReadTableFSM(
+ NYql::NNodes::TExprBase node,
+ NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx,
const NYql::TTypeAnnotationContext& typeCtx
);
@@ -43,6 +52,10 @@ NYql::NNodes::TExprBase KqpBuildTopStageRemoveSort(NYql::NNodes::TExprBase node,
NYql::IOptimizationContext& optCtx, NYql::TTypeAnnotationContext& typeCtx, const NYql::TParentsMap& parentsMap,
bool allowStageMultiUsage, bool ruleEnabled);
+NYql::NNodes::TExprBase KqpBuildTopStageRemoveSortFSM(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+ NYql::IOptimizationContext& optCtx, NYql::TTypeAnnotationContext& typeCtx, const NYql::TParentsMap& parentsMap,
+ bool allowStageMultiUsage, bool ruleEnabled);
+
NYql::NNodes::TExprBase KqpApplyLimitToReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
index 2fe6f2fdeb3..7658a0aae2f 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
@@ -29,7 +29,92 @@ TKqpTable GetTable(TExprBase input, bool isReadRanges) {
return input.Cast<TKqpReadTable>().Table();
};
-TExprBase KqpRemoveRedundantSortOverReadTable(
+// Drops a Sort/Top placed directly (or through a passthrough FlatMap) over a table
+// read when the sort key is the table's primary key, turning the read itself into
+// an ordered read where needed. A Top is replaced with Take over the read; a Sort
+// is replaced with the read itself. Returns `node` unchanged whenever any of the
+// preconditions below does not hold.
+TExprBase KqpRemoveRedundantSortOverReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
+    auto sortNode = node.Maybe<TCoSortBase>();
+    auto topNode = node.Maybe<TCoTopBase>();
+    if (!sortNode && !topNode) {
+        return node;
+    }
+
+    auto input = sortNode ? sortNode.Cast().Input() : topNode.Cast().Input();
+    auto sortDirections = sortNode ? sortNode.Cast().SortDirections() : topNode.Cast().SortDirections();
+    auto keySelector = sortNode ? sortNode.Cast().KeySelectorLambda() : topNode.Cast().KeySelectorLambda();
+
+    // Look through a FlatMap, but only one that passes fields through.
+    auto flatMapNode = input.Maybe<TCoFlatMap>();
+    TMaybe<THashSet<TStringBuf>> passthroughFields;
+    if (flatMapNode) {
+        auto flatMap = flatMapNode.Cast();
+        if (!IsPassthroughFlatMap(flatMap, &passthroughFields)) {
+            return node;
+        }
+        input = flatMap.Input();
+    }
+
+    // Only a uniform forward or reverse ordering can be delegated to the read.
+    const auto direction = GetSortDirection(sortDirections);
+    const bool uniformDirection = direction == ESortDirection::Forward || direction == ESortDirection::Reverse;
+    if (!uniformDirection) {
+        return node;
+    }
+
+    const bool readsTable = input.Maybe<TKqpReadTable>().IsValid();
+    const bool readsRanges = input.Maybe<TKqpReadTableRanges>().IsValid()
+        || input.Maybe<TKqpReadOlapTableRanges>().IsValid();
+    if (!(readsTable || readsRanges)) {
+        return node;
+    }
+
+    auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, readsRanges));
+    if (tableDesc.Metadata->Kind == EKikimrTableKind::Olap) {
+        // OLAP tables are read in parallel, so we need to keep the outer sort.
+        return node;
+    }
+
+    auto settings = GetReadTableSettings(input, readsRanges);
+
+    if (!IsSortKeyPrimary(keySelector, tableDesc, passthroughFields)) {
+        return node;
+    }
+
+    if (direction == ESortDirection::Reverse) {
+        if (kqpCtx.IsScanQuery() && !UseSource(kqpCtx, tableDesc)) {
+            return node;
+        }
+        if (settings.IsReverse()) {
+            return node;
+        }
+
+        settings.SetSorting(ERequestSorting::DESC);
+        input = BuildReadNode(input.Pos(), ctx, input, settings);
+    } else {
+        // Forward is the only other direction that passed the check above.
+        if (UseSource(kqpCtx, tableDesc)) {
+            settings.SetSorting(ERequestSorting::ASC);
+            input = BuildReadNode(input.Pos(), ctx, input, settings);
+        }
+    }
+
+    // Put the FlatMap we looked through back on top of the (possibly rebuilt) read.
+    if (flatMapNode) {
+        input = Build<TCoFlatMap>(ctx, node.Pos())
+            .Input(input)
+            .Lambda(flatMapNode.Cast().Lambda())
+            .Done();
+    }
+
+    if (!topNode) {
+        return input;
+    }
+
+    return Build<TCoTake>(ctx, node.Pos())
+        .Input(input)
+        .Count(topNode.Cast().Count())
+        .Done();
+}
+
+TExprBase KqpRemoveRedundantSortOverReadTableFSM(
TExprBase node,
TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx,
@@ -141,7 +226,155 @@ TExprBase KqpRemoveRedundantSortOverReadTable(
using namespace NYql::NDq;
+// Checks whether the ordering requested by `keySelector` / `sortDirections` is a
+// prefix of the ordering described by `existingOrder`.
+//
+// The key selector body must be either a single TCoMember or a TExprList of
+// TCoMember items. Key #i must name column #i of `existingOrder`, either as the
+// bare column name or as "<alias>.<column>", and its direction literal must parse
+// as `true`. Any other shape is reported as incompatible rather than being cast
+// unconditionally.
+//
+// On success the referenced key names are appended to `sortKeys` in key order and
+// true is returned. On failure `sortKeys` is left untouched and false is returned.
+bool CompatibleSort(TOptimizerStatistics::TSortColumns& existingOrder, const TCoLambda& keySelector, const TExprBase& sortDirections, TVector<TString>& sortKeys) {
+    TVector<TString> matchedKeys;
+
+    // True when `key` is a member reference to existing column #idx and `direction`
+    // is a data literal that parses as `true`; records the key name on success.
+    const auto matchesColumn = [&](const TExprBase& key, const TExprBase& direction, size_t idx) {
+        auto member = key.Maybe<TCoMember>();
+        if (!member || idx >= existingOrder.Columns.size()) {
+            return false;
+        }
+
+        const auto attrRef = member.Cast().Name().StringValue();
+        const auto& attrName = existingOrder.Columns[idx];
+
+        bool sameColumn = (attrName == attrRef);
+        // Aliases is indexed in parallel with Columns; never read past its end.
+        if (!sameColumn && idx < existingOrder.Aliases.size()) {
+            sameColumn = (existingOrder.Aliases[idx] + "." + attrName == attrRef);
+        }
+        if (!sameColumn) {
+            return false;
+        }
+
+        auto literal = direction.Maybe<TCoDataCtor>();
+        if (!literal) {
+            return false;
+        }
+        if (!FromString<bool>(literal.Cast().Literal().Value())) {
+            return false;
+        }
+
+        matchedKeys.push_back(attrRef);
+        return true;
+    };
+
+    const auto commit = [&]() {
+        sortKeys.insert(sortKeys.end(), matchedKeys.begin(), matchedKeys.end());
+        return true;
+    };
+
+    const auto body = keySelector.Body();
+
+    // Single-column key: the directions node is the literal itself.
+    if (body.Maybe<TCoMember>()) {
+        return matchesColumn(body, sortDirections, 0) && commit();
+    }
+
+    // Multi-column key: keys and directions are parallel lists.
+    if (auto keyList = body.Maybe<TExprList>()) {
+        auto keys = keyList.Cast();
+        auto dirList = sortDirections.Maybe<TExprList>();
+        if (!dirList) {
+            return false;
+        }
+        auto dirs = dirList.Cast();
+
+        if (keys.Size() > existingOrder.Columns.size() || keys.Size() > dirs.Size()) {
+            return false;
+        }
+
+        for (size_t i = 0; i < keys.Size(); i++) {
+            if (!matchesColumn(keys.Item(i), dirs.Item(i), i)) {
+                return false;
+            }
+        }
+        return commit();
+    }
+
+    return false;
+}
+
TExprBase KqpBuildTopStageRemoveSort(
+    TExprBase node,
+    TExprContext& ctx,
+    IOptimizationContext& optCtx,
+    TTypeAnnotationContext& typeCtx,
+    const TParentsMap& parentsMap,
+    bool allowStageMultiUsage,
+    bool ruleEnabled
+) {
+    // Replaces a Top over a TDqCnUnionAll whose output is already ordered compatibly
+    // with the requested key by a stage that reads the union's stage through a
+    // TDqCnMerge and applies Take. Returns `node` unchanged when the rule is
+    // disabled or any precondition below fails.
+    Y_UNUSED(optCtx);
+
+    if (!ruleEnabled) {
+        return node;
+    }
+
+    if (!node.Maybe<TCoTopBase>().Input().Maybe<TDqCnUnionAll>()) {
+        return node;
+    }
+
+    const auto top = node.Cast<TCoTopBase>();
+    const auto dqUnion = top.Input().Cast<TDqCnUnionAll>();
+
+    const auto isTableRead = [](const TExprBase& expr) {
+        return expr.Maybe<TKqpReadTable>().IsValid()
+            || expr.Maybe<TKqpReadTableRanges>().IsValid()
+            || expr.Maybe<TKqpReadOlapTableRanges>().IsValid();
+    };
+
+    // skip this rule to activate KqpRemoveRedundantSortByPk later to reduce the number of reads
+    auto stageBody = dqUnion.Output().Stage().Program().Body();
+    if (stageBody.Maybe<TCoFlatMap>()) {
+        auto flatmap = stageBody.Cast<TCoFlatMap>();
+        if (IsPassthroughFlatMap(flatmap, nullptr) && isTableRead(flatmap.Input())) {
+            return node;
+        }
+    } else if (isTableRead(stageBody)) {
+        return node;
+    }
+
+    auto inputStats = typeCtx.GetStats(dqUnion.Output().Raw());
+    if (!inputStats || !inputStats->SortColumns) {
+        return node;
+    }
+
+    if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
+        return node;
+    }
+
+    const bool pushable = CanPushDqExpr(top.Count(), dqUnion) && CanPushDqExpr(top.KeySelectorLambda(), dqUnion);
+    if (!pushable) {
+        return node;
+    }
+
+    if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) {
+        return TExprBase(ctx.ChangeChild(*node.Raw(), TCoTop::idx_Input, std::move(connToPushableStage)));
+    }
+
+    TVector<TString> sortKeys;
+    if (!CompatibleSort(*inputStats->SortColumns, top.KeySelectorLambda(), top.SortDirections(), sortKeys)) {
+        return node;
+    }
+
+    // Every matched key becomes an ascending merge column.
+    auto builder = Build<TDqSortColumnList>(ctx, node.Pos());
+    for (const auto& columnName : sortKeys) {
+        builder.Add<TDqSortColumn>()
+            .Column<TCoAtom>().Build(columnName)
+            .SortDirection().Build(TTopSortSettings::AscendingSort)
+            .Build();
+    }
+    auto columnList = builder.Build().Value();
+
+    return Build<TDqCnUnionAll>(ctx, node.Pos())
+        .Output()
+            .Stage<TDqStage>()
+                .Inputs()
+                    .Add<TDqCnMerge>()
+                        .Output()
+                            .Stage(dqUnion.Output().Stage())
+                            .Index(dqUnion.Output().Index())
+                            .Build()
+                        .SortColumns(columnList)
+                        .Build()
+                    .Build()
+                .Program()
+                    .Args({"stream"})
+                    .Body<TCoTake>()
+                        .Input("stream")
+                        .Count(top.Count())
+                        .Build()
+                    .Build()
+                .Settings(NDq::TDqStageSettings::New().BuildNode(ctx, top.Pos()))
+                .Build()
+            .Index().Build(0U)
+            .Build()
+        .Done();
+}
+
+TExprBase KqpBuildTopStageRemoveSortFSM(
TExprBase node,
TExprContext& ctx,
IOptimizationContext& /* optCtx */,
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index 1f2ff62d181..8c57447d942 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -214,6 +214,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableParallelUnionAllConnectionsForExtend = false;
bool EnableTempTablesForUser = false;
bool EnableOlapPushdownAggregate = false;
+ bool EnableOrderOptimizaionFSM = false;
bool EnableSimpleProgramsSinglePartitionOptimization = true;
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index b08e74ae2cd..4ea41e9a58b 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -419,4 +419,6 @@ message TTableServiceConfig {
optional bool EnableKqpScanQueryUseLlvm = 97 [default = true];
optional bool EnableOlapPushdownAggregate = 98 [ default = false ];
+
+ optional bool EnableOrderOptimizaionFSM = 99 [ default = true ];
};