aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-02-01 12:58:30 +0300
committeraneporada <aneporada@ydb.tech>2023-02-01 12:58:30 +0300
commit0a953ae6cfcc600db9eefde4b859a29b3e8dac1a (patch)
tree51b53947b9f8206f4ce6846224f48f9e945ad88e
parent08e260ca8b66f19c5efb9788298735c84c2f7d6c (diff)
downloadydb-0a953ae6cfcc600db9eefde4b859a29b3e8dac1a.tar.gz
Fix PartitionsByKey/ShuffleByKeys expansion with non-trivial key selectors
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp36
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp39
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp325
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h12
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp30
5 files changed, 304 insertions, 138 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 90ec709c7a..c559b0eba2 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -45,10 +45,10 @@ public:
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
- AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage));
- AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage));
- AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage));
- AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage));
+ AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>));
+ AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>));
+ AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<false>));
+ AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<false>));
AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>));
AddHandler(0, &TCoTakeBase::Match, HNDL(BuildTakeSkipStage<false>));
AddHandler(0, &TCoSortBase::Match, HNDL(BuildSortStage<false>));
@@ -92,6 +92,10 @@ public:
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>));
AddHandler(1, &TCoCombineByKey::Match, HNDL(PushCombineToStage<true>));
+ AddHandler(1, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<true>));
+ AddHandler(1, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<true>));
+ AddHandler(1, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<true>));
+ AddHandler(1, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<true>));
AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>));
AddHandler(1, &TCoTakeBase::Match, HNDL(BuildTakeSkipStage<true>));
AddHandler(1, &TCoSortBase::Match, HNDL(BuildSortStage<true>));
@@ -251,26 +255,36 @@ protected:
return output;
}
- TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
- TExprBase output = DqBuildShuffleStage(node, ctx, *getParents());
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ TExprBase output = DqBuildShuffleStage(node, ctx, optCtx, *getParents(), IsGlobal);
DumpAppliedRule("BuildShuffleStage", node.Ptr(), output.Ptr(), ctx);
return output;
}
+ template <bool IsGlobal>
TMaybeNode<TExprBase> BuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
- TExprBase output = DqBuildFinalizeByKeyStage(node, ctx, *getParents());
+ TExprBase output = DqBuildFinalizeByKeyStage(node, ctx, *getParents(), IsGlobal);
DumpAppliedRule("BuildFinalizeByKeyStage", node.Ptr(), output.Ptr(), ctx);
return output;
}
- TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
- TExprBase output = DqBuildPartitionsStage(node, ctx, *getParents());
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ TExprBase output = DqBuildPartitionsStage(node, ctx, optCtx, *getParents(), IsGlobal);
DumpAppliedRule("BuildPartitionsStage", node.Ptr(), output.Ptr(), ctx);
return output;
}
- TMaybeNode<TExprBase> BuildPartitionStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
- TExprBase output = DqBuildPartitionStage(node, ctx, *getParents());
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildPartitionStage(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ TExprBase output = DqBuildPartitionStage(node, ctx, optCtx, *getParents(), IsGlobal);
DumpAppliedRule("BuildPartitionStage", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 105fd8ff0c..0f81a69490 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -1649,32 +1649,21 @@ TExprNode::TPtr TAggregateExpander::GeneratePostAggregate(const TExprNode::TPtr&
auto preprocessLambda = GeneratePreprocessLambda(keyExtractor);
TExprNode::TPtr postAgg;
if (!UsePartitionsByKeys && UseFinalizeByKeys) {
- if (KeyColumns->ChildrenSize() == 0) {
- postAgg = Ctx.Builder(Node->Pos())
- .Apply(GetContextLambda())
- .With(0)
- .Apply(BuildFinalizeByKeyLambda(preprocessLambda, keyExtractor))
- .With(0, preAgg)
- .Seal()
- .Done()
- .Seal().Build();
- } else {
- postAgg = Ctx.Builder(Node->Pos())
- .Callable("ShuffleByKeys")
- .Add(0, std::move(preAgg))
- .Add(1, keyExtractor)
- .Lambda(2)
- .Param("stream")
- .Apply(GetContextLambda())
- .With(0)
- .Apply(BuildFinalizeByKeyLambda(preprocessLambda, keyExtractor))
- .With(0, "stream")
- .Seal()
- .Done()
- .Seal()
+ postAgg = Ctx.Builder(Node->Pos())
+ .Callable("ShuffleByKeys")
+ .Add(0, std::move(preAgg))
+ .Add(1, keyExtractor)
+ .Lambda(2)
+ .Param("stream")
+ .Apply(GetContextLambda())
+ .With(0)
+ .Apply(BuildFinalizeByKeyLambda(preprocessLambda, keyExtractor))
+ .With(0, "stream")
+ .Seal()
+ .Done()
.Seal()
- .Seal().Build();
- }
+ .Seal()
+ .Seal().Build();
} else {
auto condenseSwitch = GenerateCondenseSwitch(keyExtractor);
postAgg = Ctx.Builder(Node->Pos())
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 0dca7b3b3a..2ec20b104e 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -75,8 +75,164 @@ TMaybeNode<TCoMux> ConvertMuxArgumentsToFlows(TCoMux node, TExprContext& ctx) {
.Done();
}
+bool PrepareKeySelectorToStage(TCoLambda& keySelector, TCoLambda& stageLambda, TCoLambda& handlerLambda, TExprContext& ctx) {
+ if (keySelector.Body().Ref().IsComplete()) {
+ // constant key
+ return false;
+ }
+ const TPositionHandle pos = keySelector.Pos();
+ TVector<TExprBase> keyElements;
+ if (auto maybeTuple = keySelector.Body().Maybe<TExprList>()) {
+ auto tuple = maybeTuple.Cast();
+ for (const auto& element : tuple) {
+ keyElements.push_back(element);
+ }
+ } else {
+ keyElements.push_back(keySelector.Body());
+ }
+
+ size_t genCount = 0;
+ TCoLambda removeColumns = BuildIdentityLambda(pos, ctx);
+ TCoLambda addColumns = BuildIdentityLambda(pos, ctx);
+ for (auto& element : keyElements) {
+ if (element.Maybe<TCoMember>()) {
+ auto member = element.Cast<TCoMember>();
+ if (member.Struct().Raw() == keySelector.Args().Arg(0).Raw()) {
+ continue;
+ }
+ }
+
+ TString newMemberName = TString("_yql_key_selector_") + ToString(genCount++);
+
+ TCoLambda computeElement = Build<TCoLambda>(ctx, pos)
+ .InitFrom(keySelector)
+ .Body(element)
+ .Done();
+
+ addColumns = Build<TCoLambda>(ctx, pos)
+ .Args({"row"})
+ .Body<TCoAddMember>()
+ .Struct<TExprApplier>()
+ .Apply(addColumns)
+ .With(0, "row")
+ .Build()
+ .Name()
+ .Value(newMemberName)
+ .Build()
+ .Item<TExprApplier>()
+ .Apply(computeElement)
+ .With(0, "row")
+ .Build()
+ .Build()
+ .Done();
+
+ removeColumns = Build<TCoLambda>(ctx, pos)
+ .Args({"row"})
+ .Body<TCoRemoveMember>()
+ .Struct<TExprApplier>()
+ .Apply(removeColumns)
+ .With(0, "row")
+ .Build()
+ .Name()
+ .Value(newMemberName)
+ .Build()
+ .Build()
+ .Done();
+
+ element = Build<TCoMember>(ctx, pos)
+ .Struct(keySelector.Args().Arg(0))
+ .Name()
+ .Value(newMemberName)
+ .Build()
+ .Done();
+
+ }
+
+ if (!genCount) {
+ return false;
+ }
+
+ handlerLambda = Build<TCoLambda>(ctx, pos)
+ .Args({"flow"})
+ .Body<TExprApplier>()
+ .Apply(handlerLambda)
+ .With<TCoOrderedMap>(0)
+ .Input("flow")
+ .Lambda(removeColumns)
+ .Build()
+ .Build()
+ .Done();
+
+ stageLambda = Build<TCoLambda>(ctx, pos)
+ .Args({"flow"})
+ .Body<TCoOrderedMap>()
+ .Input("flow")
+ .Lambda(addColumns)
+ .Build()
+ .Done();
+
+ if (keyElements.size() == 1) {
+ keySelector = Build<TCoLambda>(ctx, pos)
+ .InitFrom(keySelector)
+ .Body(keyElements.front())
+ .Done();
+ } else {
+ keySelector = Build<TCoLambda>(ctx, pos)
+ .InitFrom(keySelector)
+ .Body<TExprList>()
+ .Add(keyElements)
+ .Build()
+ .Done();
+ }
+
+ keySelector = TCoLambda(ctx.DeepCopyLambda(keySelector.Ref()));
+ return true;
+}
+
+TDqConnection BuildShuffleConnection(TPositionHandle pos, TCoLambda keySelector, TDqCnUnionAll dqUnion, TExprContext& ctx) {
+ const bool isConstKey = keySelector.Body().Ref().IsComplete();
+ if (isConstKey) {
+ return Build<TDqCnUnionAll>(ctx, pos)
+ .Output()
+ .Stage(dqUnion.Output().Stage())
+ .Index(dqUnion.Output().Index())
+ .Build()
+ .Done();
+ }
+
+ TVector<TExprBase> keyElements;
+ if (auto maybeTuple = keySelector.Body().template Maybe<TExprList>()) {
+ auto tuple = maybeTuple.Cast();
+ for (const auto& element : tuple) {
+ keyElements.push_back(element);
+ }
+ } else {
+ keyElements.push_back(keySelector.Body());
+ }
+
+ TVector<TCoAtom> keyColumns;
+ keyColumns.reserve(keyElements.size());
+ for (auto& element : keyElements) {
+ auto member = element.Cast<TCoMember>();
+ YQL_ENSURE(member.Struct().Raw() == keySelector.Args().Arg(0).Raw(), "Should be handled earlier");
+ keyColumns.push_back(element.Cast<TCoMember>().Name());
+ }
+
+ return Build<TDqCnHashShuffle>(ctx, pos)
+ .Output()
+ .Stage(dqUnion.Output().Stage())
+ .Index(dqUnion.Output().Index())
+ .Build()
+ .KeyColumns()
+ .Add(keyColumns)
+ .Build()
+ .Done();
+}
+
template <typename TPartition>
-TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) {
+TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
auto partitionsInput = node.Maybe<TPartition>().Input();
const bool isMuxInput = partitionsInput.template Maybe<TCoMux>().IsValid();
if (!partitionsInput.template Maybe<TDqCnUnionAll>() && !isMuxInput) {
@@ -121,7 +277,7 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TP
muxArgs.push_back(child);
continue;
}
- if (!IsSingleConsumerConnection(conn.Cast(), parentsMap)) {
+ if (!IsSingleConsumerConnection(conn.Cast(), parentsMap, allowStageMultiUsage)) {
return node;
}
TCoArgument programArg = Build<TCoArgument>(ctx, conn.Cast().Pos())
@@ -153,59 +309,28 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TP
} else {
auto dqUnion = partition.Input().template Cast<TDqCnUnionAll>();
- if (!IsSingleConsumerConnection(dqUnion, parentsMap)) {
+ if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
return node;
}
auto keyLambda = partition.KeySelectorLambda();
- TVector<TExprBase> keyElements;
- if (auto maybeTuple = keyLambda.Body().template Maybe<TExprList>()) {
- auto tuple = maybeTuple.Cast();
- for (const auto& element : tuple) {
- keyElements.push_back(element);
- }
- } else {
- keyElements.push_back(keyLambda.Body());
- }
-
- bool allKeysAreMembers = true;
-
- TVector<TCoAtom> keyColumns;
- keyColumns.reserve(keyElements.size());
- for (auto& element : keyElements) {
- if (!element.Maybe<TCoMember>()) {
- allKeysAreMembers = false;
- break;
- }
-
- auto member = element.Cast<TCoMember>();
- if (member.Struct().Raw() != keyLambda.Args().Arg(0).Raw()) {
+ TCoLambda stageLambda = BuildIdentityLambda(node.Pos(), ctx);
+ TCoLambda handlerLambda = partition.ListHandlerLambda();
+ if (PrepareKeySelectorToStage(keyLambda, stageLambda, handlerLambda, ctx)) {
+ auto newConn = DqPushLambdaToStageUnionAll(dqUnion, stageLambda, {}, ctx, optCtx);
+ if (!newConn) {
return node;
}
- keyColumns.push_back(member.Name());
- }
-
- TDqConnection newConnection = Build<TDqCnUnionAll>(ctx, node.Pos())
- .Output()
- .Stage(dqUnion.Output().Stage())
- .Index(dqUnion.Output().Index())
- .Build()
- .Done();
- if (!keyColumns.empty() && allKeysAreMembers) {
- newConnection = Build<TDqCnHashShuffle>(ctx, node.Pos())
- .Output()
- .Stage(dqUnion.Output().Stage())
- .Index(dqUnion.Output().Index())
- .Build()
- .KeyColumns()
- .Add(keyColumns)
- .Build()
+ return Build<TPartition>(ctx, node.Pos())
+ .InitFrom(partition)
+ .Input(newConn.Cast())
+ .KeySelectorLambda(keyLambda)
+ .ListHandlerLambda(handlerLambda)
.Done();
- } else if (!keyColumns.empty()) {
- return node;
}
+ TDqConnection newConnection = BuildShuffleConnection(node.Pos(), keyLambda, dqUnion, ctx);
TCoArgument programArg = Build<TCoArgument>(ctx, node.Pos())
.Name("arg")
.Done();
@@ -914,15 +1039,21 @@ NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprCon
return result.Cast();
}
-TExprBase DqBuildPartitionsStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) {
- return DqBuildPartitionsStageStub<TCoPartitionsByKeys>(std::move(node), ctx, parentsMap);
+TExprBase DqBuildPartitionsStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
+ return DqBuildPartitionsStageStub<TCoPartitionsByKeys>(std::move(node), ctx, optCtx, parentsMap, allowStageMultiUsage);
}
-TExprBase DqBuildPartitionStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) {
- return DqBuildPartitionsStageStub<TCoPartitionByKey>(std::move(node), ctx, parentsMap);
+TExprBase DqBuildPartitionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
+ return DqBuildPartitionsStageStub<TCoPartitionByKey>(std::move(node), ctx, optCtx, parentsMap, allowStageMultiUsage);
}
-TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) {
+TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
auto shuffleInput = node.Maybe<TCoShuffleByKeys>().Input();
if (!shuffleInput.Maybe<TDqCnUnionAll>()) {
return node;
@@ -937,50 +1068,27 @@ TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, const TParentsM
auto dqUnion = shuffle.Input().Cast<TDqCnUnionAll>();
- if (!IsSingleConsumerConnection(dqUnion, parentsMap)) {
+ if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
return node;
}
auto keyLambda = shuffle.KeySelectorLambda();
- TVector<TExprBase> keyElements;
- if (auto maybeTuple = keyLambda.Body().Maybe<TExprList>()) {
- auto tuple = maybeTuple.Cast();
- for (const auto& element : tuple) {
- keyElements.push_back(element);
- }
- } else {
- keyElements.push_back(keyLambda.Body());
- }
-
- TVector<TCoAtom> keyColumns;
- keyColumns.reserve(keyElements.size());
- for (auto& element : keyElements) {
- if (!element.Maybe<TCoMember>()) {
+ TCoLambda stageLambda = BuildIdentityLambda(node.Pos(), ctx);
+ TCoLambda handlerLambda = shuffle.ListHandlerLambda();
+ if (PrepareKeySelectorToStage(keyLambda, stageLambda, handlerLambda, ctx)) {
+ auto newConn = DqPushLambdaToStageUnionAll(dqUnion, stageLambda, {}, ctx, optCtx);
+ if (!newConn) {
return node;
}
- auto member = element.Cast<TCoMember>();
- if (member.Struct().Raw() != keyLambda.Args().Arg(0).Raw()) {
- return node;
- }
-
- keyColumns.push_back(member.Name());
- }
-
- if (keyColumns.empty()) {
- return node;
+ return Build<TCoShuffleByKeys>(ctx, node.Pos())
+ .Input(newConn.Cast())
+ .KeySelectorLambda(keyLambda)
+ .ListHandlerLambda(handlerLambda)
+ .Done();
}
- auto connection = Build<TDqCnHashShuffle>(ctx, node.Pos())
- .Output()
- .Stage(dqUnion.Output().Stage())
- .Index(dqUnion.Output().Index())
- .Build()
- .KeyColumns()
- .Add(keyColumns)
- .Build()
- .Done();
-
+ auto connection = BuildShuffleConnection(node.Pos(), keyLambda, dqUnion, ctx);
TCoArgument programArg = Build<TCoArgument>(ctx, node.Pos())
.Name("arg")
.Done();
@@ -1061,7 +1169,9 @@ NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprCont
return TExprBase(outputCnMap);
}
-TExprBase DqBuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) {
+TExprBase DqBuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
auto finalizeInput = node.Maybe<TCoFinalizeByKey>().Input();
if (!finalizeInput.Maybe<TDqCnUnionAll>()) {
return node;
@@ -1070,7 +1180,7 @@ TExprBase DqBuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TPa
auto finalize = node.Cast<TCoFinalizeByKey>();
auto dqUnion = finalize.Input().Cast<TDqCnUnionAll>();
- if (!IsSingleConsumerConnection(dqUnion, parentsMap)) {
+ if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
return node;
}
@@ -2377,6 +2487,47 @@ TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizati
.Body(valueExtractor)
.Done();
+ const bool hasSingleTask = AllOf(stage.Inputs(), [](const auto& input) {
+ return input.template Maybe<TDqCnMerge>() || input.template Maybe<TDqCnUnionAll>();
+ });
+ if (!hasSingleTask) {
+ // Add LIMIT 1 via Take
+ auto takeProgram = Build<TCoLambda>(ctx, node.Pos())
+ .Args({"take_arg"})
+ // DqOutput expects stream as input, thus form stream with one element
+ .Body<TCoToStream>()
+ .Input<TCoTake>()
+ .Input({"take_arg"})
+ .Count<TCoUint64>()
+ .Literal().Build("1")
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+
+ auto newUnion = DqPushLambdaToStageUnionAll(unionAll, takeProgram, {}, ctx, optCtx);
+ if (!newUnion) {
+ return node;
+ }
+
+ auto newStage = Build<TDqStage>(ctx, node.Pos())
+ .Inputs()
+ .Add(newUnion.Cast())
+ .Build()
+ .Program(newProgram)
+ .Settings().Build()
+ .Done();
+
+ return Build<TDqPrecompute>(ctx, node.Pos())
+ .Input<TDqCnValue>()
+ .Output<TDqOutput>()
+ .Stage(newStage)
+ .Index().Build("0")
+ .Build()
+ .Build()
+ .Done();
+ }
+
auto newUnion = DqPushLambdaToStageUnionAll(unionAll, newProgram, {}, ctx, optCtx);
if (!newUnion.IsValid()) {
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index f5462bc3ad..88be69c3c8 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -42,13 +42,17 @@ NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx
NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-NNodes::TExprBase DqBuildPartitionsStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
+NNodes::TExprBase DqBuildPartitionsStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-NNodes::TExprBase DqBuildPartitionStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
+NNodes::TExprBase DqBuildPartitionStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-NNodes::TExprBase DqBuildShuffleStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
+NNodes::TExprBase DqBuildShuffleStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-NNodes::TExprBase DqBuildFinalizeByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
+NNodes::TExprBase DqBuildFinalizeByKeyStage(NNodes::TExprBase node, TExprContext& ctx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
NNodes::TExprBase DqBuildAggregationResultStage(NNodes::TExprBase node, TExprContext& ctx,
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 97e67c785f..69debeb3f1 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -32,11 +32,11 @@ public:
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
- AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage));
- AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage));
- AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage));
+ AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>));
+ AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<false>));
+ AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>));
AddHandler(0, &TDqCnHashShuffle::Match, HNDL(BuildHashShuffleByKeyStage));
- AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage));
+ AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<false>));
AddHandler(0, &TCoAsList::Match, HNDL(BuildAggregationResultStage));
AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>));
AddHandler(0, &TCoSort::Match, HNDL(BuildSortStage<false>));
@@ -66,6 +66,10 @@ public:
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>));
AddHandler(1, &TCoCombineByKey::Match, HNDL(PushCombineToStage<true>));
+ AddHandler(1, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<true>));
+ AddHandler(1, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<true>));
+ AddHandler(1, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<true>));
+ AddHandler(1, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<true>));
AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>));
AddHandler(1, &TCoSort::Match, HNDL(BuildSortStage<true>));
AddHandler(1, &TCoTakeBase::Match, HNDL(BuildTakeOrTakeSkipStage<true>));
@@ -267,24 +271,28 @@ protected:
return DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal);
}
- TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
- return DqBuildPartitionsStage(node, ctx, *getParents());
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ return DqBuildPartitionsStage(node, ctx, optCtx, *getParents(), IsGlobal);
}
- TMaybeNode<TExprBase> BuildPartitionStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
- return DqBuildPartitionStage(node, ctx, *getParents());
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildPartitionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ return DqBuildPartitionStage(node, ctx, optCtx, *getParents(), IsGlobal);
}
- TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
- return DqBuildShuffleStage(node, ctx, *getParents());
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ return DqBuildShuffleStage(node, ctx, optCtx, *getParents(), IsGlobal);
}
TMaybeNode<TExprBase> BuildHashShuffleByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
return DqBuildHashShuffleByKeyStage(node, ctx, *getParents());
}
+ template<bool IsGlobal>
TMaybeNode<TExprBase> BuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
- return DqBuildFinalizeByKeyStage(node, ctx, *getParents());
+ return DqBuildFinalizeByKeyStage(node, ctx, *getParents(), IsGlobal);
}
TMaybeNode<TExprBase> BuildAggregationResultStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) {