aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@ydb.tech>2022-10-05 14:06:48 +0300
committerwhcrc <whcrc@ydb.tech>2022-10-05 14:06:48 +0300
commit9fce83d8824c458077b85e46424292a9487a76c6 (patch)
treef167b56b4ab61ce1978237a0995169c05ff862a1
parent6935fc72381ab994d0b2e6bd4fbeb2def684828e (diff)
downloadydb-9fce83d8824c458077b85e46424292a9487a76c6.tar.gz
dq, reduce/produce multi in
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp33
-rw-r--r--ydb/library/yql/core/yql_expr_constraint.cpp6
-rw-r--r--ydb/library/yql/core/yql_opt_utils.cpp41
-rw-r--r--ydb/library/yql/core/yql_opt_utils.h3
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp275
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h4
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp10
7 files changed, 319 insertions, 53 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 678a587cecf..2957ae2fe55 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -9958,16 +9958,25 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
}
const TTypeAnnotationNode* inputType = input->Head().GetTypeAnn();
- const TTypeAnnotationNode* resultType = nullptr;
+ const TTypeAnnotationNode* resultItemType = nullptr;
+ auto resultKind = ETypeAnnotationKind::LastType;
if (inputType->GetKind() == ETypeAnnotationKind::Tuple) {
const TTupleExprType* tupleType = inputType->Cast<TTupleExprType>();
TTypeAnnotationNode::TListType itemTypes;
TExprNode::TListType updatedChildren;
+ if (!tupleType->GetItems().empty()) {
+ resultKind = tupleType->GetItems()[0]->GetKind();
+ }
for (size_t i = 0; i < tupleType->GetSize(); ++i) {
- if (!EnsureListType(input->Head().Pos(), *tupleType->GetItems()[i], ctx.Expr)) {
+ if (tupleType->GetItems()[i]->GetKind() != resultKind) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder()
+ << "Expected " << resultKind << ", but got " << *tupleType->GetItems()[i]));
+ return IGraphTransformer::TStatus::Error;
+ }
+ const TTypeAnnotationNode* itemType = nullptr;
+ if (!EnsureNewSeqType<false>(input->Head().Pos(), *tupleType->GetItems()[i], ctx.Expr, &itemType)) {
return IGraphTransformer::TStatus::Error;
}
- auto itemType = tupleType->GetItems()[i]->Cast<TListExprType>()->GetItemType();
if (itemType->GetKind() == ETypeAnnotationKind::Struct
&& AnyOf(itemType->Cast<TStructExprType>()->GetItems(), [](const TItemExprType* structItem) { return structItem->GetName().StartsWith("_yql_sys_"); })) {
@@ -10002,17 +10011,25 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
output = ctx.Expr.ChangeChild(*input, 0, ctx.Expr.NewList(input->Head().Pos(), std::move(updatedChildren)));
return IGraphTransformer::TStatus::Repeat;
}
- resultType = ctx.Expr.MakeType<TListExprType>(ctx.Expr.MakeType<TVariantExprType>(ctx.Expr.MakeType<TTupleExprType>(itemTypes)));
+ resultItemType = ctx.Expr.MakeType<TVariantExprType>(ctx.Expr.MakeType<TTupleExprType>(itemTypes));
}
else if (inputType->GetKind() == ETypeAnnotationKind::Struct) {
const TStructExprType* structType = inputType->Cast<TStructExprType>();
TVector<const TItemExprType*> itemTypes;
TExprNode::TListType updatedChildren;
+ if (!structType->GetItems().empty()) {
+ resultKind = structType->GetItems()[0]->GetItemType()->GetKind();
+ }
for (size_t i = 0; i < structType->GetSize(); ++i) {
- if (!EnsureListType(input->Head().Pos(), *structType->GetItems()[i]->GetItemType(), ctx.Expr)) {
+ if (structType->GetItems()[i]->GetItemType()->GetKind() != resultKind) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder()
+ << "Expected " << resultKind << ", but got " << *structType->GetItems()[i]->GetItemType()));
+ return IGraphTransformer::TStatus::Error;
+ }
+ const TTypeAnnotationNode* itemType = nullptr;
+ if (!EnsureNewSeqType<false>(input->Head().Pos(), *structType->GetItems()[i]->GetItemType(), ctx.Expr, &itemType)) {
return IGraphTransformer::TStatus::Error;
}
- auto itemType = structType->GetItems()[i]->GetItemType()->Cast<TListExprType>()->GetItemType();
auto itemName = structType->GetItems()[i]->GetName();
if (itemType->GetKind() == ETypeAnnotationKind::Struct
&& AnyOf(itemType->Cast<TStructExprType>()->GetItems(), [](const TItemExprType* structItem) { return structItem->GetName().StartsWith("_yql_sys_"); })) {
@@ -10056,14 +10073,14 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
output = ctx.Expr.ChangeChild(*input, 0, ctx.Expr.NewCallable(input->Head().Pos(), "AsStruct", std::move(updatedChildren)));
return IGraphTransformer::TStatus::Repeat;
}
- resultType = ctx.Expr.MakeType<TListExprType>(ctx.Expr.MakeType<TVariantExprType>(ctx.Expr.MakeType<TStructExprType>(itemTypes)));
+ resultItemType = ctx.Expr.MakeType<TVariantExprType>(ctx.Expr.MakeType<TStructExprType>(itemTypes));
}
else {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Head().Pos()), TStringBuilder() << "Expected Tuple or Struct type, but got: " << *inputType));
return IGraphTransformer::TStatus::Error;
}
- input->SetTypeAnn(resultType);
+ input->SetTypeAnn(MakeSequenceType(resultKind, *resultItemType, ctx.Expr));
return IGraphTransformer::TStatus::Ok;
}
diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp
index e7b57a7ea92..48e8920af51 100644
--- a/ydb/library/yql/core/yql_expr_constraint.cpp
+++ b/ydb/library/yql/core/yql_expr_constraint.cpp
@@ -1888,7 +1888,11 @@ private:
}
TStatus MuxWrap(const TExprNode::TPtr& input, TExprNode::TPtr& /*output*/, TExprContext& ctx) const {
- if (input->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TVariantExprType>()->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
+ const TTypeAnnotationNode* listItemType = GetItemType(*input->GetTypeAnn());
+ if (!listItemType) {
+ return TStatus::Ok;
+ }
+ if (listItemType->Cast<TVariantExprType>()->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Tuple) {
if (input->Child(0)->IsList()) {
TMultiConstraintNode::TMapType items;
ui32 index = 0;
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index ba49839b748..f1d7a068f6c 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -338,7 +338,6 @@ const TTypeAnnotationNode* GetSeqItemType(const TTypeAnnotationNode* type) {
case ETypeAnnotationKind::Optional: return type->Cast<TOptionalExprType>()->GetItemType();
default: break;
}
-
THROW yexception() << "Impossible to get item type from " << *type;
}
@@ -1494,4 +1493,44 @@ const TItemExprType* GetLightColumn(const TStructExprType& type) {
return field;
}
+TVector<TStringBuf> GetCommonKeysFromVariantSelector(const NNodes::TCoLambda& lambda) {
+ if (auto maybeVisit = lambda.Body().Maybe<TCoVisit>()) {
+ if (maybeVisit.Input().Raw() == lambda.Args().Arg(0).Raw()) {
+ TVector<TStringBuf> members;
+ for (ui32 index = 1; index < maybeVisit.Raw()->ChildrenSize(); ++index) {
+ if (maybeVisit.Raw()->Child(index)->IsAtom()) {
+ ++index;
+ auto visitLambda = maybeVisit.Raw()->Child(index);
+ auto arg = visitLambda->Child(0)->Child(0);
+
+ TVector<TStringBuf> visitMembers;
+ if (TMaybeNode<TCoMember>(visitLambda->Child(1)).Struct().Raw() == arg) {
+ visitMembers.push_back(TCoMember(visitLambda->Child(1)).Name().Value());
+ }
+ else if (auto maybeList = TMaybeNode<TExprList>(visitLambda->Child(1))) {
+ for (auto item: maybeList.Cast()) {
+ if (item.Maybe<TCoMember>().Struct().Raw() == arg) {
+ visitMembers.push_back(item.Cast<TCoMember>().Name().Value());
+ } else {
+ return {};
+ }
+ }
+ }
+ if (visitMembers.empty()) {
+ return {};
+ } else if (members.empty()) {
+ members = visitMembers;
+ } else if (members != visitMembers) {
+ return {};
+ }
+ } else {
+ return {};
+ }
+ }
+ return members;
+ }
+ return {};
+ }
+ return {};
+}
}
diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h
index 956bac3b4a9..f40b98163e8 100644
--- a/ydb/library/yql/core/yql_opt_utils.h
+++ b/ydb/library/yql/core/yql_opt_utils.h
@@ -114,4 +114,7 @@ inline TStringBuf GetEmptyCollectionName(const TTypeAnnotationNode* type) {
const TItemExprType* GetLightColumn(const TStructExprType& type);
+// returned value exists as long as lambda object exists
+TVector<TStringBuf> GetCommonKeysFromVariantSelector(const NNodes::TCoLambda& lambda);
+
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index a6d1c0885da..9b45bed52b3 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -2,6 +2,7 @@
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/core/yql_type_helpers.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
@@ -41,10 +42,43 @@ TVector<TCoArgument> PrepareArgumentsReplacement(const TExprBase& node, const TV
}
namespace {
+TMaybeNode<TCoMux> ConvertMuxArgumentsToFlows(TCoMux node, TExprContext& ctx) {
+ auto mux = node.Cast<TCoMux>();
+ bool hasConnAsArg = false;
+ for (auto child: mux.Input().template Cast<TExprList>()) {
+ if (child.Maybe<TDqConnection>().IsValid()) {
+ hasConnAsArg = true;
+ break;
+ }
+ }
+ if (!hasConnAsArg) {
+ return {};
+ }
+ TVector<TExprBase> muxArgs;
+ for (auto child: mux.Input().template Cast<TExprList>()) {
+ if (child.Maybe<TDqConnection>().IsValid()) {
+ muxArgs.push_back(child);
+ }
+ else if (IsDqPureExpr(child)) {
+ muxArgs.push_back(Build<TCoToFlow>(ctx, node.Pos())
+ .Input(child)
+ .Done());
+ } else {
+ return {};
+ }
+ }
+ return Build<TCoMux>(ctx, node.Pos())
+ .Input<TExprList>()
+ .Add(muxArgs)
+ .Build()
+ .Done();
+}
template <typename TPartition>
TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) {
- if (!node.Maybe<TPartition>().Input().template Maybe<TDqCnUnionAll>()) {
+ auto partitionsInput = node.Maybe<TPartition>().Input();
+ const bool isMuxInput = partitionsInput.template Maybe<TCoMux>().IsValid();
+ if (!partitionsInput.template Maybe<TDqCnUnionAll>() && !isMuxInput) {
return node;
}
@@ -55,62 +89,128 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TP
{
return node;
}
- auto dqUnion = partition.Input().template Cast<TDqCnUnionAll>();
- if (!IsSingleConsumerConnection(dqUnion, parentsMap)) {
+ // dq splits this type of lambda output into separate stage outputs
+ // thus it's impossible to maintain 'node' typing (muxing them ain't an option,
+ // cause the only purpose of this optimizer is to push original Mux to the stage)
+ if (const auto listItemType = GetItemType(*node.Ref().GetTypeAnn());
+ !listItemType || listItemType->GetKind() == ETypeAnnotationKind::Variant) {
return node;
}
- auto keyLambda = partition.KeySelectorLambda();
- TVector<TExprBase> keyElements;
- if (auto maybeTuple = keyLambda.Body().template Maybe<TExprList>()) {
- auto tuple = maybeTuple.Cast();
- for (const auto& element : tuple) {
- keyElements.push_back(element);
+ TVector<TCoArgument> inputArgs;
+ TVector<TExprBase> inputConns;
+ TExprNode::TPtr newPartitionsInput = nullptr;
+ if (isMuxInput) {
+ auto maybeMux = ConvertMuxArgumentsToFlows(node.Cast<TPartition>().Input().template Cast<TCoMux>(), ctx);
+ if (!maybeMux.IsValid()) {
+ return node;
}
+ auto mux = maybeMux.Cast();
+ const auto keys = GetCommonKeysFromVariantSelector(partition.KeySelectorLambda());
+ TVector<TExprNode::TPtr> keyColumns;
+ keyColumns.reserve(keys.size());
+ for (const auto& key: keys) {
+ keyColumns.push_back(ctx.NewAtom(node.Pos(), key));
+ }
+ TVector<TExprBase> muxArgs;
+ for (auto child: mux.Input().template Cast<TExprList>()) {
+ auto conn = child.template Maybe<TDqConnection>();
+ if (!conn.IsValid()) {
+ muxArgs.push_back(child);
+ continue;
+ }
+ if (!IsSingleConsumerConnection(conn.Cast(), parentsMap)) {
+ return node;
+ }
+ TCoArgument programArg = Build<TCoArgument>(ctx, conn.Cast().Pos())
+ .Name("arg")
+ .Done();
+ auto newConnection = conn.Cast();
+ if (!keyColumns.empty()) {
+ newConnection = Build<TDqCnHashShuffle>(ctx, node.Pos())
+ .Output()
+ .Stage(conn.Cast().Output().Stage())
+ .Index(conn.Cast().Output().Index())
+ .Build()
+ .KeyColumns()
+ .Add(keyColumns)
+ .Build()
+ .Done();
+ } else if (!conn.template Maybe<TDqCnUnionAll>().IsValid()) {
+ return node;
+ }
+ inputConns.push_back(newConnection);
+ inputArgs.push_back(programArg);
+ muxArgs.push_back(programArg);
+ };
+ newPartitionsInput = Build<TCoMux>(ctx, node.Pos())
+ .template Input<TExprList>()
+ .Add(muxArgs)
+ .Build()
+ .Done().Ptr();
} else {
- keyElements.push_back(keyLambda.Body());
- }
+ auto dqUnion = partition.Input().template Cast<TDqCnUnionAll>();
- bool allKeysAreMembers = true;
-
- TVector<TCoAtom> keyColumns;
- keyColumns.reserve(keyElements.size());
- for (auto& element : keyElements) {
- if (!element.Maybe<TCoMember>()) {
- allKeysAreMembers = false;
- break;
+ if (!IsSingleConsumerConnection(dqUnion, parentsMap)) {
+ return node;
}
- auto member = element.Cast<TCoMember>();
- if (member.Struct().Raw() != keyLambda.Args().Arg(0).Raw()) {
- return node;
+ auto keyLambda = partition.KeySelectorLambda();
+ TVector<TExprBase> keyElements;
+ if (auto maybeTuple = keyLambda.Body().template Maybe<TExprList>()) {
+ auto tuple = maybeTuple.Cast();
+ for (const auto& element : tuple) {
+ keyElements.push_back(element);
+ }
+ } else {
+ keyElements.push_back(keyLambda.Body());
}
- keyColumns.push_back(member.Name());
- }
+ bool allKeysAreMembers = true;
- TExprNode::TPtr newConnection;
+ TVector<TCoAtom> keyColumns;
+ keyColumns.reserve(keyElements.size());
+ for (auto& element : keyElements) {
+ if (!element.Maybe<TCoMember>()) {
+ allKeysAreMembers = false;
+ break;
+ }
- if (!keyColumns.empty() && allKeysAreMembers) {
- newConnection = Build<TDqCnHashShuffle>(ctx, node.Pos())
- .Output()
- .Stage(dqUnion.Output().Stage())
- .Index(dqUnion.Output().Index())
- .Build()
- .KeyColumns()
- .Add(keyColumns)
- .Build()
- .Done().Ptr();
- } else if (keyColumns.empty()) {
- newConnection = Build<TDqCnUnionAll>(ctx, node.Pos())
+ auto member = element.Cast<TCoMember>();
+ if (member.Struct().Raw() != keyLambda.Args().Arg(0).Raw()) {
+ return node;
+ }
+
+ keyColumns.push_back(member.Name());
+ }
+
+ TDqConnection newConnection = Build<TDqCnUnionAll>(ctx, node.Pos())
.Output()
.Stage(dqUnion.Output().Stage())
.Index(dqUnion.Output().Index())
.Build()
- .Done().Ptr();
- } else {
- return node;
+ .Done();
+ if (!keyColumns.empty() && allKeysAreMembers) {
+ newConnection = Build<TDqCnHashShuffle>(ctx, node.Pos())
+ .Output()
+ .Stage(dqUnion.Output().Stage())
+ .Index(dqUnion.Output().Index())
+ .Build()
+ .KeyColumns()
+ .Add(keyColumns)
+ .Build()
+ .Done();
+ } else if (!keyColumns.empty()) {
+ return node;
+ }
+
+ TCoArgument programArg = Build<TCoArgument>(ctx, node.Pos())
+ .Name("arg")
+ .Done();
+ inputConns.push_back(newConnection);
+ inputArgs.push_back(programArg);
+ newPartitionsInput = programArg.Ptr();
}
auto handler = partition.ListHandlerLambda();
@@ -136,12 +236,12 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TP
auto partitionStage = Build<TDqStage>(ctx, node.Pos())
.Inputs()
- .Add(newConnection)
+ .Add(inputConns)
.Build()
.Program()
- .Args({"rows"})
+ .Args(inputArgs)
.Body<TPartition>()
- .Input("rows")
+ .Input(newPartitionsInput)
.KeySelectorLambda(ctx.DeepCopyLambda(partition.KeySelectorLambda().Ref()))
.SortDirections(partition.SortDirections())
.SortKeySelectorLambda(partition.SortKeySelectorLambda().template Maybe<TCoLambda>()
@@ -626,6 +726,95 @@ TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationCont
return DqPushBaseLMapToStage<TCoLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
}
+template <typename BaseLMap>
+TExprBase DqBuildLMapOverMuxStageStub(TExprBase node, TExprContext& ctx, NYql::IOptimizationContext&, const NYql::TParentsMap& parentsMap) {
+ if (!node.Maybe<BaseLMap>()) {
+ return node;
+ }
+ auto lmap = node.Cast<BaseLMap>();
+ auto maybeMux = lmap.Input().template Maybe<TCoMux>();
+ if (!maybeMux.IsValid()) {
+ return node;
+ }
+ maybeMux = ConvertMuxArgumentsToFlows(maybeMux.Cast(), ctx);
+ if (!maybeMux.IsValid()) {
+ return node;
+ }
+ auto mux = maybeMux.Cast();
+ const TTypeAnnotationNode* listItemType = GetItemType(*node.Ref().GetTypeAnn());
+ if (!listItemType) {
+ return node;
+ }
+ // dq splits this type of lambda output into separate stage outputs
+ // thus it's impossible to maintain 'node' typing (muxing them ain't an option, cause the only purpose of this optimizer is to push original Mux to the stage)
+ if (listItemType->GetKind() == ETypeAnnotationKind::Variant) {
+ return node;
+ }
+
+ if (!IsDqPureExpr(lmap.Lambda())) {
+ return node;
+ }
+
+ YQL_CLOG(DEBUG, CoreDq) << "DqBuildLMapOverMuxStage";
+ TVector<TCoArgument> inputArgs;
+ TVector<TExprBase> inputConns;
+ TVector<TExprBase> muxArgs;
+ for (auto child: mux.Input().template Cast<TExprList>()) {
+ auto conn = child.template Maybe<TDqConnection>();
+ if (!conn.IsValid()) {
+ muxArgs.push_back(child);
+ continue;
+ }
+ if (!IsSingleConsumerConnection(conn.Cast(), parentsMap)) {
+ return node;
+ }
+ TCoArgument programArg = Build<TCoArgument>(ctx, conn.Cast().Pos())
+ .Name("arg")
+ .Done();
+ inputConns.push_back(conn.Cast());
+ inputArgs.push_back(programArg);
+ muxArgs.push_back(programArg);
+ };
+
+ auto newMux = Build<TCoMux>(ctx, lmap.Input().Pos())
+ .template Input<TExprList>()
+ .Add(muxArgs)
+ .Build()
+ .Done().Ptr();
+ auto lmapLambda = ctx.DeepCopyLambda(lmap.Lambda().Ref());
+ Y_VERIFY(lmapLambda->Child(0)->ChildrenSize() == 1, "unexpected number of arguments in lmap lambda");
+ auto newBody = ctx.ReplaceNodes(lmapLambda->Child(1), {{lmapLambda->Child(0)->Child(0), newMux}});
+ auto lmapStage = Build<TDqStage>(ctx, lmap.Pos())
+ .Inputs()
+ .Add(inputConns)
+ .Build()
+ .Program()
+ .Args(inputArgs)
+ .Body(newBody)
+ .Build()
+ .Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
+ .Done();
+
+ return Build<TDqCnUnionAll>(ctx, node.Pos())
+ .Output()
+ .Stage(lmapStage)
+ .Index().Build("0")
+ .Build()
+ .Done();
+}
+
+TExprBase DqBuildOrderedLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap)
+{
+ return DqBuildLMapOverMuxStageStub<TCoOrderedLMap>(node, ctx, optCtx, parentsMap);
+}
+
+TExprBase DqBuildLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap)
+{
+ return DqBuildLMapOverMuxStageStub<TCoLMap>(node, ctx, optCtx, parentsMap);
+}
+
TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index d955441fa20..242902bd22a 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -94,6 +94,10 @@ TMaybe<std::pair<NNodes::TExprBase, NNodes::TDqConnection>> ExtractPureExprStag
NNodes::TExprBase DqBuildPureExprStage(NNodes::TExprBase node, TExprContext& ctx);
+NNodes::TExprBase DqBuildOrderedLMapOverMuxStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap);
+
+NNodes::TExprBase DqBuildLMapOverMuxStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap);
+
NNodes::TExprBase DqBuildExtendStage(NNodes::TExprBase node, TExprContext& ctx);
NNodes::TExprBase DqBuildPrecompute(NNodes::TExprBase node, TExprContext& ctx);
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 8ededee8059..6ccb1e42a75 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -44,6 +44,8 @@ public:
AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>));
AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>));
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
+ AddHandler(0, &TCoOrderedLMap::Match, HNDL(BuildOrderedLMapOverMuxStage));
+ AddHandler(0, &TCoLMap::Match, HNDL(BuildLMapOverMuxStage));
if (enablePrecompute) {
AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
AddHandler(0, &TCoSqlIn::Match, HNDL(BuildSqlIn<false>));
@@ -244,6 +246,14 @@ protected:
return DqPushLMapToStage(node, ctx, optCtx, *getParents(), IsGlobal);
}
+ TMaybeNode<TExprBase> BuildOrderedLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ return DqBuildOrderedLMapOverMuxStage(node, ctx, optCtx, *getParents());
+ }
+
+ TMaybeNode<TExprBase> BuildLMapOverMuxStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ return DqBuildLMapOverMuxStage(node, ctx, optCtx, *getParents());
+ }
+
template <bool IsGlobal>
TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
return DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal);