diff options
author | whcrc <whcrc@ydb.tech> | 2022-10-05 14:06:48 +0300 |
---|---|---|
committer | whcrc <whcrc@ydb.tech> | 2022-10-05 14:06:48 +0300 |
commit | 9fce83d8824c458077b85e46424292a9487a76c6 (patch) | |
tree | f167b56b4ab61ce1978237a0995169c05ff862a1 | |
parent | 6935fc72381ab994d0b2e6bd4fbeb2def684828e (diff) | |
download | ydb-9fce83d8824c458077b85e46424292a9487a76c6.tar.gz |
dq, reduce/produce multi in
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_core.cpp | 33 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_constraint.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.cpp | 41 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 275 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 4 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 10 |
7 files changed, 319 insertions, 53 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 678a587cecf..2957ae2fe55 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -9958,16 +9958,25 @@ template <NKikimr::NUdf::EDataSlot DataSlot> } const TTypeAnnotationNode* inputType = input->Head().GetTypeAnn(); - const TTypeAnnotationNode* resultType = nullptr; + const TTypeAnnotationNode* resultItemType = nullptr; + auto resultKind = ETypeAnnotationKind::LastType; if (inputType->GetKind() == ETypeAnnotationKind::Tuple) { const TTupleExprType* tupleType = inputType->Cast<TTupleExprType>(); TTypeAnnotationNode::TListType itemTypes; TExprNode::TListType updatedChildren; + if (!tupleType->GetItems().empty()) { + resultKind = tupleType->GetItems()[0]->GetKind(); + } for (size_t i = 0; i < tupleType->GetSize(); ++i) { - if (!EnsureListType(input->Head().Pos(), *tupleType->GetItems()[i], ctx.Expr)) { + if (tupleType->GetItems()[i]->GetKind() != resultKind) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder() + << "Expected " << resultKind << ", but got " << *tupleType->GetItems()[i])); + return IGraphTransformer::TStatus::Error; + } + const TTypeAnnotationNode* itemType = nullptr; + if (!EnsureNewSeqType<false>(input->Head().Pos(), *tupleType->GetItems()[i], ctx.Expr, &itemType)) { return IGraphTransformer::TStatus::Error; } - auto itemType = tupleType->GetItems()[i]->Cast<TListExprType>()->GetItemType(); if (itemType->GetKind() == ETypeAnnotationKind::Struct && AnyOf(itemType->Cast<TStructExprType>()->GetItems(), [](const TItemExprType* structItem) { return structItem->GetName().StartsWith("_yql_sys_"); })) { @@ -10002,17 +10011,25 @@ template <NKikimr::NUdf::EDataSlot DataSlot> output = ctx.Expr.ChangeChild(*input, 0, ctx.Expr.NewList(input->Head().Pos(), std::move(updatedChildren))); return IGraphTransformer::TStatus::Repeat; } - resultType = ctx.Expr.MakeType<TListExprType>(ctx.Expr.MakeType<TVariantExprType>(ctx.Expr.MakeType<TTupleExprType>(itemTypes))); + resultItemType = ctx.Expr.MakeType<TVariantExprType>(ctx.Expr.MakeType<TTupleExprType>(itemTypes)); } else if (inputType->GetKind() == ETypeAnnotationKind::Struct) { const TStructExprType* structType = inputType->Cast<TStructExprType>(); TVector<const TItemExprType*> itemTypes; TExprNode::TListType updatedChildren; + if (!structType->GetItems().empty()) { + resultKind = structType->GetItems()[0]->GetItemType()->GetKind(); + } for (size_t i = 0; i < structType->GetSize(); ++i) { - if (!EnsureListType(input->Head().Pos(), *structType->GetItems()[i]->GetItemType(), ctx.Expr)) { + if (structType->GetItems()[i]->GetItemType()->GetKind() != resultKind) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder() + << "Expected " << resultKind << ", but got " << *structType->GetItems()[i]->GetItemType())); + return IGraphTransformer::TStatus::Error; + } + const TTypeAnnotationNode* itemType = nullptr; + if (!EnsureNewSeqType<false>(input->Head().Pos(), *structType->GetItems()[i]->GetItemType(), ctx.Expr, &itemType)) { return IGraphTransformer::TStatus::Error; } - auto itemType = structType->GetItems()[i]->GetItemType()->Cast<TListExprType>()->GetItemType(); auto itemName = structType->GetItems()[i]->GetName(); if (itemType->GetKind() == ETypeAnnotationKind::Struct && AnyOf(itemType->Cast<TStructExprType>()->GetItems(), [](const TItemExprType* structItem) { return structItem->GetName().StartsWith("_yql_sys_"); })) { @@ -10056,14 +10073,14 @@ template <NKikimr::NUdf::EDataSlot DataSlot> output = ctx.Expr.ChangeChild(*input, 0, ctx.Expr.NewCallable(input->Head().Pos(), "AsStruct", std::move(updatedChildren))); return IGraphTransformer::TStatus::Repeat; } - resultType = ctx.Expr.MakeType<TListExprType>(ctx.Expr.MakeType<TVariantExprType>(ctx.Expr.MakeType<TStructExprType>(itemTypes))); + resultItemType = ctx.Expr.MakeType<TVariantExprType>(ctx.Expr.MakeType<TStructExprType>(itemTypes)); } else { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder() << "Expected Tuple or Struct type, but got: " << *inputType)); return IGraphTransformer::TStatus::Error; } - input->SetTypeAnn(resultType); + input->SetTypeAnn(MakeSequenceType(resultKind, *resultItemType, ctx.Expr)); return IGraphTransformer::TStatus::Ok; } diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp index e7b57a7ea92..48e8920af51 100644 --- a/ydb/library/yql/core/yql_expr_constraint.cpp +++ b/ydb/library/yql/core/yql_expr_constraint.cpp @@ -1888,7 +1888,11 @@ private: } TStatus MuxWrap(const TExprNode::TPtr& input, TExprNode::TPtr& /*output*/, TExprContext& ctx) const { - if (input->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TVariantExprType>()->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) { + const TTypeAnnotationNode* listItemType = GetItemType(*input->GetTypeAnn()); + if (!listItemType) { + return TStatus::Ok; + } + if (listItemType->Cast<TVariantExprType>()->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) { if (input->Child(0)->IsList()) { TMultiConstraintNode::TMapType items; ui32 index = 0; diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index ba49839b748..f1d7a068f6c 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -338,7 +338,6 @@ const TTypeAnnotationNode* GetSeqItemType(const TTypeAnnotationNode* type) { case ETypeAnnotationKind::Optional: return type->Cast<TOptionalExprType>()->GetItemType(); default: break; } - THROW yexception() << "Impossible to get item type from " << *type; } @@ -1494,4 +1493,44 @@ const TItemExprType* GetLightColumn(const TStructExprType& type) { return field; } +TVector<TStringBuf> GetCommonKeysFromVariantSelector(const NNodes::TCoLambda& lambda) { + if (auto maybeVisit = lambda.Body().Maybe<TCoVisit>()) { + if (maybeVisit.Input().Raw() == lambda.Args().Arg(0).Raw()) { + TVector<TStringBuf> members; + for (ui32 index = 1; index < maybeVisit.Raw()->ChildrenSize(); ++index) { + if (maybeVisit.Raw()->Child(index)->IsAtom()) { + ++index; + auto visitLambda = maybeVisit.Raw()->Child(index); + auto arg = visitLambda->Child(0)->Child(0); + + TVector<TStringBuf> visitMembers; + if (TMaybeNode<TCoMember>(visitLambda->Child(1)).Struct().Raw() == arg) { + visitMembers.push_back(TCoMember(visitLambda->Child(1)).Name().Value()); + } + else if (auto maybeList = TMaybeNode<TExprList>(visitLambda->Child(1))) { + for (auto item: maybeList.Cast()) { + if (item.Maybe<TCoMember>().Struct().Raw() == arg) { + visitMembers.push_back(item.Cast<TCoMember>().Name().Value()); + } else { + return {}; + } + } + } + if (visitMembers.empty()) { + return {}; + } else if (members.empty()) { + members = visitMembers; + } else if (members != visitMembers) { + return {}; + } + } else { + return {}; + } + } + return members; + } + return {}; + } + return {}; +} } diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index 956bac3b4a9..f40b98163e8 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -114,4 +114,7 @@ inline TStringBuf GetEmptyCollectionName(const TTypeAnnotationNode* type) { const TItemExprType* GetLightColumn(const TStructExprType& type); +// returned value exists as long as lambda object exists +TVector<TStringBuf> GetCommonKeysFromVariantSelector(const NNodes::TCoLambda& lambda); + } diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index a6d1c0885da..9b45bed52b3 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -2,6 +2,7 @@ #include <ydb/library/yql/core/yql_expr_type_annotation.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/core/yql_type_helpers.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/dq/type_ann/dq_type_ann.h> @@ -41,10 +42,43 @@ TVector<TCoArgument> PrepareArgumentsReplacement(const TExprBase& node, const TV } namespace { +TMaybeNode<TCoMux> ConvertMuxArgumentsToFlows(TCoMux node, TExprContext& ctx) { + auto mux = node.Cast<TCoMux>(); + bool hasConnAsArg = false; + for (auto child: mux.Input().template Cast<TExprList>()) { + if (child.Maybe<TDqConnection>().IsValid()) { + hasConnAsArg = true; + break; + } + } + if (!hasConnAsArg) { + return {}; + } + TVector<TExprBase> muxArgs; + for (auto child: mux.Input().template Cast<TExprList>()) { + if (child.Maybe<TDqConnection>().IsValid()) { + muxArgs.push_back(child); + } + else if (IsDqPureExpr(child)) { + muxArgs.push_back(Build<TCoToFlow>(ctx, node.Pos()) + .Input(child) + .Done()); + } else { + return {}; + } + } + return Build<TCoMux>(ctx, node.Pos()) + .Input<TExprList>() + .Add(muxArgs) + .Build() + .Done(); +} template <typename TPartition> TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) { - if (!node.Maybe<TPartition>().Input().template Maybe<TDqCnUnionAll>()) { + auto partitionsInput = node.Maybe<TPartition>().Input(); + const bool isMuxInput = partitionsInput.template Maybe<TCoMux>().IsValid(); + if (!partitionsInput.template Maybe<TDqCnUnionAll>() && !isMuxInput) { return node; } @@ -55,62 +89,128 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TP { return node; } - auto dqUnion = partition.Input().template Cast<TDqCnUnionAll>(); - if (!IsSingleConsumerConnection(dqUnion, parentsMap)) { + // dq splits this type of lambda output into separate stage outputs + // thus it's impossible to maintain 'node' typing (muxing them ain't an option, + // cause the only purpose of this optimizer is to push original Mux to the stage) + if (const auto listItemType = GetItemType(*node.Ref().GetTypeAnn()); + !listItemType || listItemType->GetKind() == ETypeAnnotationKind::Variant) { return node; } - auto keyLambda = partition.KeySelectorLambda(); - TVector<TExprBase> keyElements; - if (auto maybeTuple = keyLambda.Body().template Maybe<TExprList>()) { - auto tuple = maybeTuple.Cast(); - for (const auto& element : tuple) { - keyElements.push_back(element); + TVector<TCoArgument> inputArgs; + TVector<TExprBase> inputConns; + TExprNode::TPtr newPartitionsInput = nullptr; + if (isMuxInput) { + auto maybeMux = ConvertMuxArgumentsToFlows(node.Cast<TPartition>().Input().template Cast<TCoMux>(), ctx); + if (!maybeMux.IsValid()) { + return node; } + auto mux = maybeMux.Cast(); + const auto keys = GetCommonKeysFromVariantSelector(partition.KeySelectorLambda()); + TVector<TExprNode::TPtr> keyColumns; + keyColumns.reserve(keys.size()); + for (const auto& key: keys) { + keyColumns.push_back(ctx.NewAtom(node.Pos(), key)); + } + TVector<TExprBase> muxArgs; + for (auto child: mux.Input().template Cast<TExprList>()) { + auto conn = child.template Maybe<TDqConnection>(); + if (!conn.IsValid()) { + muxArgs.push_back(child); + continue; + } + if (!IsSingleConsumerConnection(conn.Cast(), parentsMap)) { + return node; + } + TCoArgument programArg = Build<TCoArgument>(ctx, conn.Cast().Pos()) + .Name("arg") + .Done(); + auto newConnection = conn.Cast(); + if (!keyColumns.empty()) { + newConnection = Build<TDqCnHashShuffle>(ctx, node.Pos()) + .Output() + .Stage(conn.Cast().Output().Stage()) + .Index(conn.Cast().Output().Index()) + .Build() + .KeyColumns() + .Add(keyColumns) + .Build() + .Done(); + } else if (!conn.template Maybe<TDqCnUnionAll>().IsValid()) { + return node; + } + inputConns.push_back(newConnection); + inputArgs.push_back(programArg); + muxArgs.push_back(programArg); + }; + newPartitionsInput = Build<TCoMux>(ctx, node.Pos()) + .template Input<TExprList>() + .Add(muxArgs) + .Build() + .Done().Ptr(); } else { - keyElements.push_back(keyLambda.Body()); - } + auto dqUnion = partition.Input().template Cast<TDqCnUnionAll>(); - bool allKeysAreMembers = true; - - TVector<TCoAtom> keyColumns; - keyColumns.reserve(keyElements.size()); - for (auto& element : keyElements) { - if (!element.Maybe<TCoMember>()) { - allKeysAreMembers = false; - break; + if (!IsSingleConsumerConnection(dqUnion, parentsMap)) { + return node; } - auto member = element.Cast<TCoMember>(); - if (member.Struct().Raw() != keyLambda.Args().Arg(0).Raw()) { - return node; + auto keyLambda = partition.KeySelectorLambda(); + TVector<TExprBase> keyElements; + if (auto maybeTuple = keyLambda.Body().template Maybe<TExprList>()) { + auto tuple = maybeTuple.Cast(); + for (const auto& element : tuple) { + keyElements.push_back(element); + } + } else { + keyElements.push_back(keyLambda.Body()); } - keyColumns.push_back(member.Name()); - } + bool allKeysAreMembers = true; - TExprNode::TPtr newConnection; + TVector<TCoAtom> keyColumns; + keyColumns.reserve(keyElements.size()); + for (auto& element : keyElements) { + if (!element.Maybe<TCoMember>()) { + allKeysAreMembers = false; + break; + } - if (!keyColumns.empty() && allKeysAreMembers) { - newConnection = Build<TDqCnHashShuffle>(ctx, node.Pos()) - .Output() - .Stage(dqUnion.Output().Stage()) - .Index(dqUnion.Output().Index()) - .Build() - .KeyColumns() - .Add(keyColumns) - .Build() - .Done().Ptr(); - } else if (keyColumns.empty()) { - newConnection = Build<TDqCnUnionAll>(ctx, node.Pos()) + auto member = element.Cast<TCoMember>(); + if (member.Struct().Raw() != keyLambda.Args().Arg(0).Raw()) { + return node; + } + + keyColumns.push_back(member.Name()); + } + + TDqConnection newConnection = Build<TDqCnUnionAll>(ctx, node.Pos()) .Output() .Stage(dqUnion.Output().Stage()) .Index(dqUnion.Output().Index()) .Build() - .Done().Ptr(); - } else { - return node; + .Done(); + if (!keyColumns.empty() && allKeysAreMembers) { + newConnection = Build<TDqCnHashShuffle>(ctx, node.Pos()) + .Output() + .Stage(dqUnion.Output().Stage()) + .Index(dqUnion.Output().Index()) + .Build() + .KeyColumns() + .Add(keyColumns) + .Build() + .Done(); + } else if (!keyColumns.empty()) { + return node; + } + + TCoArgument programArg = Build<TCoArgument>(ctx, node.Pos()) + .Name("arg") + .Done(); + inputConns.push_back(newConnection); + inputArgs.push_back(programArg); + newPartitionsInput = programArg.Ptr(); } auto handler = partition.ListHandlerLambda(); @@ -136,12 +236,12 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TP auto partitionStage = Build<TDqStage>(ctx, node.Pos()) .Inputs() - .Add(newConnection) + .Add(inputConns) .Build() .Program() - .Args({"rows"}) + .Args(inputArgs) .Body<TPartition>() - .Input("rows") + .Input(newPartitionsInput) .KeySelectorLambda(ctx.DeepCopyLambda(partition.KeySelectorLambda().Ref())) .SortDirections(partition.SortDirections()) .SortKeySelectorLambda(partition.SortKeySelectorLambda().template Maybe<TCoLambda>() @@ -626,6 +726,95 @@ TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationCont return DqPushBaseLMapToStage<TCoLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage); } +template <typename BaseLMap> +TExprBase DqBuildLMapOverMuxStageStub(TExprBase node, TExprContext& ctx, NYql::IOptimizationContext&, const NYql::TParentsMap& parentsMap) { + if (!node.Maybe<BaseLMap>()) { + return node; + } + auto lmap = node.Cast<BaseLMap>(); + auto maybeMux = lmap.Input().template Maybe<TCoMux>(); + if (!maybeMux.IsValid()) { + return node; + } + maybeMux = ConvertMuxArgumentsToFlows(maybeMux.Cast(), ctx); + if (!maybeMux.IsValid()) { + return node; + } + auto mux = maybeMux.Cast(); + const TTypeAnnotationNode* listItemType = GetItemType(*node.Ref().GetTypeAnn()); + if (!listItemType) { + return node; + } + // dq splits this type of lambda output into separate stage outputs + // thus it's impossible to maintain 'node' typing (muxing them ain't an option, cause the only purpose of this optimizer is to push original Mux to the stage) + if (listItemType->GetKind() == ETypeAnnotationKind::Variant) { + return node; + } + + if (!IsDqPureExpr(lmap.Lambda())) { + return node; + } + + YQL_CLOG(DEBUG, CoreDq) << "DqBuildLMapOverMuxStage"; + TVector<TCoArgument> inputArgs; + TVector<TExprBase> inputConns; + TVector<TExprBase> muxArgs; + for (auto child: mux.Input().template Cast<TExprList>()) { + auto conn = child.template Maybe<TDqConnection>(); + if (!conn.IsValid()) { + muxArgs.push_back(child); + continue; + } + if (!IsSingleConsumerConnection(conn.Cast(), parentsMap)) { + return node; + } + TCoArgument programArg = Build<TCoArgument>(ctx, conn.Cast().Pos()) + .Name("arg") + .Done(); + inputConns.push_back(conn.Cast()); + inputArgs.push_back(programArg); + muxArgs.push_back(programArg); + }; + + auto newMux = Build<TCoMux>(ctx, lmap.Input().Pos()) + .template Input<TExprList>() + .Add(muxArgs) + .Build() + .Done().Ptr(); + auto lmapLambda = ctx.DeepCopyLambda(lmap.Lambda().Ref()); + Y_VERIFY(lmapLambda->Child(0)->ChildrenSize() == 1, "unexpected number of arguments in lmap lambda"); + auto newBody = ctx.ReplaceNodes(lmapLambda->Child(1), {{lmapLambda->Child(0)->Child(0), newMux}}); + auto lmapStage = Build<TDqStage>(ctx, lmap.Pos()) + .Inputs() + .Add(inputConns) + .Build() + .Program() + .Args(inputArgs) + .Body(newBody) + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) + .Done(); + + return Build<TDqCnUnionAll>(ctx, node.Pos()) + .Output() + .Stage(lmapStage) + .Index().Build("0") + .Build() + .Done(); +} + +TExprBase DqBuildOrderedLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap) +{ + return DqBuildLMapOverMuxStageStub<TCoOrderedLMap>(node, ctx, optCtx, parentsMap); +} + +TExprBase DqBuildLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap) +{ + return DqBuildLMapOverMuxStageStub<TCoLMap>(node, ctx, optCtx, parentsMap); +} + TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index d955441fa20..242902bd22a 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -94,6 +94,10 @@ TMaybe<std::pair<NNodes::TExprBase, NNodes::TDqConnection>> ExtractPureExprStag NNodes::TExprBase DqBuildPureExprStage(NNodes::TExprBase node, TExprContext& ctx); +NNodes::TExprBase DqBuildOrderedLMapOverMuxStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap); + +NNodes::TExprBase DqBuildLMapOverMuxStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap); + NNodes::TExprBase DqBuildExtendStage(NNodes::TExprBase node, TExprContext& ctx); NNodes::TExprBase DqBuildPrecompute(NNodes::TExprBase node, TExprContext& ctx); diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 8ededee8059..6ccb1e42a75 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -44,6 +44,8 @@ public: AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>)); AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>)); AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); + AddHandler(0, &TCoOrderedLMap::Match, HNDL(BuildOrderedLMapOverMuxStage)); + AddHandler(0, &TCoLMap::Match, HNDL(BuildLMapOverMuxStage)); if (enablePrecompute) { AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); AddHandler(0, &TCoSqlIn::Match, HNDL(BuildSqlIn<false>)); @@ -244,6 +246,14 @@ protected: return DqPushLMapToStage(node, ctx, optCtx, *getParents(), IsGlobal); } + TMaybeNode<TExprBase> BuildOrderedLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + return DqBuildOrderedLMapOverMuxStage(node, ctx, optCtx, *getParents()); + } + + TMaybeNode<TExprBase> BuildLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + return DqBuildLMapOverMuxStage(node, ctx, optCtx, *getParents()); + } + template <bool IsGlobal> TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal); |