diff options
author | aneporada <aneporada@ydb.tech> | 2023-02-01 12:58:30 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-02-01 12:58:30 +0300 |
commit | 0a953ae6cfcc600db9eefde4b859a29b3e8dac1a (patch) | |
tree | 51b53947b9f8206f4ce6846224f48f9e945ad88e | |
parent | 08e260ca8b66f19c5efb9788298735c84c2f7d6c (diff) | |
download | ydb-0a953ae6cfcc600db9eefde4b859a29b3e8dac1a.tar.gz |
Fix PartitionsByKey/ShuffleByKeys expansion with non-trivial key selectors
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 36 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.cpp | 39 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 325 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 12 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 30 |
5 files changed, 304 insertions, 138 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 90ec709c7a..c559b0eba2 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -45,10 +45,10 @@ public: AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage)); AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>)); AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>)); - AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage)); - AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage)); - AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage)); - AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage)); + AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>)); + AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>)); + AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<false>)); + AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<false>)); AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>)); AddHandler(0, &TCoTakeBase::Match, HNDL(BuildTakeSkipStage<false>)); AddHandler(0, &TCoSortBase::Match, HNDL(BuildSortStage<false>)); @@ -92,6 +92,10 @@ public: AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>)); AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>)); AddHandler(1, &TCoCombineByKey::Match, HNDL(PushCombineToStage<true>)); + AddHandler(1, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<true>)); + AddHandler(1, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<true>)); + AddHandler(1, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<true>)); + AddHandler(1, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<true>)); AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>)); AddHandler(1, &TCoTakeBase::Match, HNDL(BuildTakeSkipStage<true>)); AddHandler(1, &TCoSortBase::Match, HNDL(BuildSortStage<true>)); @@ -251,26 +255,36 @@ protected: return output; } - TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - TExprBase output = DqBuildShuffleStage(node, ctx, *getParents()); + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + TExprBase output = DqBuildShuffleStage(node, ctx, optCtx, *getParents(), IsGlobal); DumpAppliedRule("BuildShuffleStage", node.Ptr(), output.Ptr(), ctx); return output; } + template <bool IsGlobal> TMaybeNode<TExprBase> BuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - TExprBase output = DqBuildFinalizeByKeyStage(node, ctx, *getParents()); + TExprBase output = DqBuildFinalizeByKeyStage(node, ctx, *getParents(), IsGlobal); DumpAppliedRule("BuildFinalizeByKeyStage", node.Ptr(), output.Ptr(), ctx); return output; } - TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - TExprBase output = DqBuildPartitionsStage(node, ctx, *getParents()); + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + TExprBase output = DqBuildPartitionsStage(node, ctx, optCtx, *getParents(), IsGlobal); DumpAppliedRule("BuildPartitionsStage", node.Ptr(), output.Ptr(), ctx); return output; } - TMaybeNode<TExprBase> BuildPartitionStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - TExprBase output = DqBuildPartitionStage(node, ctx, *getParents()); + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildPartitionStage(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + TExprBase output = DqBuildPartitionStage(node, ctx, optCtx, *getParents(), IsGlobal); DumpAppliedRule("BuildPartitionStage", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 105fd8ff0c..0f81a69490 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -1649,32 +1649,21 @@ TExprNode::TPtr TAggregateExpander::GeneratePostAggregate(const TExprNode::TPtr& auto preprocessLambda = GeneratePreprocessLambda(keyExtractor); TExprNode::TPtr postAgg; if (!UsePartitionsByKeys && UseFinalizeByKeys) { - if (KeyColumns->ChildrenSize() == 0) { - postAgg = Ctx.Builder(Node->Pos()) - .Apply(GetContextLambda()) - .With(0) - .Apply(BuildFinalizeByKeyLambda(preprocessLambda, keyExtractor)) - .With(0, preAgg) - .Seal() - .Done() - .Seal().Build(); - } else { - postAgg = Ctx.Builder(Node->Pos()) - .Callable("ShuffleByKeys") - .Add(0, std::move(preAgg)) - .Add(1, keyExtractor) - .Lambda(2) - .Param("stream") - .Apply(GetContextLambda()) - .With(0) - .Apply(BuildFinalizeByKeyLambda(preprocessLambda, keyExtractor)) - .With(0, "stream") - .Seal() - .Done() - .Seal() + postAgg = Ctx.Builder(Node->Pos()) + .Callable("ShuffleByKeys") + .Add(0, std::move(preAgg)) + .Add(1, keyExtractor) + .Lambda(2) + .Param("stream") + .Apply(GetContextLambda()) + .With(0) + .Apply(BuildFinalizeByKeyLambda(preprocessLambda, keyExtractor)) + .With(0, "stream") + .Seal() + .Done() .Seal() - .Seal().Build(); - } + .Seal() + .Seal().Build(); } else { auto condenseSwitch = GenerateCondenseSwitch(keyExtractor); postAgg = Ctx.Builder(Node->Pos()) diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 0dca7b3b3a..2ec20b104e 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -75,8 +75,164 @@ TMaybeNode<TCoMux> ConvertMuxArgumentsToFlows(TCoMux node, TExprContext& ctx) { .Done(); } +bool PrepareKeySelectorToStage(TCoLambda& keySelector, TCoLambda& stageLambda, TCoLambda& handlerLambda, TExprContext& ctx) { + if (keySelector.Body().Ref().IsComplete()) { + // constant key + return false; + } + const TPositionHandle pos = keySelector.Pos(); + TVector<TExprBase> keyElements; + if (auto maybeTuple = keySelector.Body().Maybe<TExprList>()) { + auto tuple = maybeTuple.Cast(); + for (const auto& element : tuple) { + keyElements.push_back(element); + } + } else { + keyElements.push_back(keySelector.Body()); + } + + size_t genCount = 0; + TCoLambda removeColumns = BuildIdentityLambda(pos, ctx); + TCoLambda addColumns = BuildIdentityLambda(pos, ctx); + for (auto& element : keyElements) { + if (element.Maybe<TCoMember>()) { + auto member = element.Cast<TCoMember>(); + if (member.Struct().Raw() == keySelector.Args().Arg(0).Raw()) { + continue; + } + } + + TString newMemberName = TString("_yql_key_selector_") + ToString(genCount++); + + TCoLambda computeElement = Build<TCoLambda>(ctx, pos) + .InitFrom(keySelector) + .Body(element) + .Done(); + + addColumns = Build<TCoLambda>(ctx, pos) + .Args({"row"}) + .Body<TCoAddMember>() + .Struct<TExprApplier>() + .Apply(addColumns) + .With(0, "row") + .Build() + .Name() + .Value(newMemberName) + .Build() + .Item<TExprApplier>() + .Apply(computeElement) + .With(0, "row") + .Build() + .Build() + .Done(); + + removeColumns = Build<TCoLambda>(ctx, pos) + .Args({"row"}) + .Body<TCoRemoveMember>() + .Struct<TExprApplier>() + .Apply(removeColumns) + .With(0, "row") + .Build() + .Name() + .Value(newMemberName) + .Build() + .Build() + .Done(); + + element = Build<TCoMember>(ctx, pos) + .Struct(keySelector.Args().Arg(0)) + .Name() + .Value(newMemberName) + .Build() + .Done(); + + } + + if (!genCount) { + return false; + } + + handlerLambda = Build<TCoLambda>(ctx, pos) + .Args({"flow"}) + .Body<TExprApplier>() + .Apply(handlerLambda) + .With<TCoOrderedMap>(0) + .Input("flow") + .Lambda(removeColumns) + .Build() + .Build() + .Done(); + + stageLambda = Build<TCoLambda>(ctx, pos) + .Args({"flow"}) + .Body<TCoOrderedMap>() + .Input("flow") + .Lambda(addColumns) + .Build() + .Done(); + + if (keyElements.size() == 1) { + keySelector = Build<TCoLambda>(ctx, pos) + .InitFrom(keySelector) + .Body(keyElements.front()) + .Done(); + } else { + keySelector = Build<TCoLambda>(ctx, pos) + .InitFrom(keySelector) + .Body<TExprList>() + .Add(keyElements) + .Build() + .Done(); + } + + keySelector = TCoLambda(ctx.DeepCopyLambda(keySelector.Ref())); + return true; +} + +TDqConnection BuildShuffleConnection(TPositionHandle pos, TCoLambda keySelector, TDqCnUnionAll dqUnion, TExprContext& ctx) { + const bool isConstKey = keySelector.Body().Ref().IsComplete(); + if (isConstKey) { + return Build<TDqCnUnionAll>(ctx, pos) + .Output() + .Stage(dqUnion.Output().Stage()) + .Index(dqUnion.Output().Index()) + .Build() + .Done(); + } + + TVector<TExprBase> keyElements; + if (auto maybeTuple = keySelector.Body().template Maybe<TExprList>()) { + auto tuple = maybeTuple.Cast(); + for (const auto& element : tuple) { + keyElements.push_back(element); + } + } else { + keyElements.push_back(keySelector.Body()); + } + + TVector<TCoAtom> keyColumns; + keyColumns.reserve(keyElements.size()); + for (auto& element : keyElements) { + auto member = element.Cast<TCoMember>(); + YQL_ENSURE(member.Struct().Raw() == keySelector.Args().Arg(0).Raw(), "Should be handled earlier"); + keyColumns.push_back(element.Cast<TCoMember>().Name()); + } + + return Build<TDqCnHashShuffle>(ctx, pos) + .Output() + .Stage(dqUnion.Output().Stage()) + .Index(dqUnion.Output().Index()) + .Build() + .KeyColumns() + .Add(keyColumns) + .Build() + .Done(); +} + template <typename TPartition> -TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) { +TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ auto partitionsInput = node.Maybe<TPartition>().Input(); const bool isMuxInput = partitionsInput.template Maybe<TCoMux>().IsValid(); if (!partitionsInput.template Maybe<TDqCnUnionAll>() && !isMuxInput) { @@ -121,7 +277,7 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TP muxArgs.push_back(child); continue; } - if (!IsSingleConsumerConnection(conn.Cast(), parentsMap)) { + if (!IsSingleConsumerConnection(conn.Cast(), parentsMap, allowStageMultiUsage)) { return node; } TCoArgument programArg = Build<TCoArgument>(ctx, conn.Cast().Pos()) @@ -153,59 +309,28 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, const TP } else { auto dqUnion = partition.Input().template Cast<TDqCnUnionAll>(); - if (!IsSingleConsumerConnection(dqUnion, parentsMap)) { + if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) { return node; } auto keyLambda = partition.KeySelectorLambda(); - TVector<TExprBase> keyElements; - if (auto maybeTuple = keyLambda.Body().template Maybe<TExprList>()) { - auto tuple = maybeTuple.Cast(); - for (const auto& element : tuple) { - keyElements.push_back(element); - } - } else { - keyElements.push_back(keyLambda.Body()); - } - - bool allKeysAreMembers = true; - - TVector<TCoAtom> keyColumns; - keyColumns.reserve(keyElements.size()); - for (auto& element : keyElements) { - if (!element.Maybe<TCoMember>()) { - allKeysAreMembers = false; - break; - } - - auto member = element.Cast<TCoMember>(); - if (member.Struct().Raw() != keyLambda.Args().Arg(0).Raw()) { + TCoLambda stageLambda = BuildIdentityLambda(node.Pos(), ctx); + TCoLambda handlerLambda = partition.ListHandlerLambda(); + if (PrepareKeySelectorToStage(keyLambda, stageLambda, handlerLambda, ctx)) { + auto newConn = DqPushLambdaToStageUnionAll(dqUnion, stageLambda, {}, ctx, optCtx); + if (!newConn) { return node; } - keyColumns.push_back(member.Name()); - } - - TDqConnection newConnection = Build<TDqCnUnionAll>(ctx, node.Pos()) - .Output() - .Stage(dqUnion.Output().Stage()) - .Index(dqUnion.Output().Index()) - .Build() - .Done(); - if (!keyColumns.empty() && allKeysAreMembers) { - newConnection = Build<TDqCnHashShuffle>(ctx, node.Pos()) - .Output() - .Stage(dqUnion.Output().Stage()) - .Index(dqUnion.Output().Index()) - .Build() - .KeyColumns() - .Add(keyColumns) - .Build() + return Build<TPartition>(ctx, node.Pos()) + .InitFrom(partition) + .Input(newConn.Cast()) + .KeySelectorLambda(keyLambda) + .ListHandlerLambda(handlerLambda) .Done(); - } else if (!keyColumns.empty()) { - return node; } + TDqConnection newConnection = BuildShuffleConnection(node.Pos(), keyLambda, dqUnion, ctx); TCoArgument programArg = Build<TCoArgument>(ctx, node.Pos()) .Name("arg") .Done(); @@ -914,15 +1039,21 @@ NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprCon return result.Cast(); } -TExprBase DqBuildPartitionsStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) { - return DqBuildPartitionsStageStub<TCoPartitionsByKeys>(std::move(node), ctx, parentsMap); +TExprBase DqBuildPartitionsStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ + return DqBuildPartitionsStageStub<TCoPartitionsByKeys>(std::move(node), ctx, optCtx, parentsMap, allowStageMultiUsage); } -TExprBase DqBuildPartitionStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) { - return DqBuildPartitionsStageStub<TCoPartitionByKey>(std::move(node), ctx, parentsMap); +TExprBase DqBuildPartitionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ + return DqBuildPartitionsStageStub<TCoPartitionByKey>(std::move(node), ctx, optCtx, parentsMap, allowStageMultiUsage); } -TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) { +TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ auto shuffleInput = node.Maybe<TCoShuffleByKeys>().Input(); if (!shuffleInput.Maybe<TDqCnUnionAll>()) { return node; @@ -937,50 +1068,27 @@ TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, const TParentsM auto dqUnion = shuffle.Input().Cast<TDqCnUnionAll>(); - if (!IsSingleConsumerConnection(dqUnion, parentsMap)) { + if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) { return node; } auto keyLambda = shuffle.KeySelectorLambda(); - TVector<TExprBase> keyElements; - if (auto maybeTuple = keyLambda.Body().Maybe<TExprList>()) { - auto tuple = maybeTuple.Cast(); - for (const auto& element : tuple) { - keyElements.push_back(element); - } - } else { - keyElements.push_back(keyLambda.Body()); - } - - TVector<TCoAtom> keyColumns; - keyColumns.reserve(keyElements.size()); - for (auto& element : keyElements) { - if (!element.Maybe<TCoMember>()) { + TCoLambda stageLambda = BuildIdentityLambda(node.Pos(), ctx); + TCoLambda handlerLambda = shuffle.ListHandlerLambda(); + if (PrepareKeySelectorToStage(keyLambda, stageLambda, handlerLambda, ctx)) { + auto newConn = DqPushLambdaToStageUnionAll(dqUnion, stageLambda, {}, ctx, optCtx); + if (!newConn) { return node; } - auto member = element.Cast<TCoMember>(); - if (member.Struct().Raw() != keyLambda.Args().Arg(0).Raw()) { - return node; - } - - keyColumns.push_back(member.Name()); - } - - if (keyColumns.empty()) { - return node; + return Build<TCoShuffleByKeys>(ctx, node.Pos()) + .Input(newConn.Cast()) + .KeySelectorLambda(keyLambda) + .ListHandlerLambda(handlerLambda) + .Done(); } - auto connection = Build<TDqCnHashShuffle>(ctx, node.Pos()) - .Output() - .Stage(dqUnion.Output().Stage()) - .Index(dqUnion.Output().Index()) - .Build() - .KeyColumns() - .Add(keyColumns) - .Build() - .Done(); - + auto connection = BuildShuffleConnection(node.Pos(), keyLambda, dqUnion, ctx); TCoArgument programArg = Build<TCoArgument>(ctx, node.Pos()) .Name("arg") .Done(); @@ -1061,7 +1169,9 @@ NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprCont return TExprBase(outputCnMap); } -TExprBase DqBuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) { +TExprBase DqBuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ auto finalizeInput = node.Maybe<TCoFinalizeByKey>().Input(); if (!finalizeInput.Maybe<TDqCnUnionAll>()) { return node; @@ -1070,7 +1180,7 @@ TExprBase DqBuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TPa auto finalize = node.Cast<TCoFinalizeByKey>(); auto dqUnion = finalize.Input().Cast<TDqCnUnionAll>(); - if (!IsSingleConsumerConnection(dqUnion, parentsMap)) { + if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) { return node; } @@ -2377,6 +2487,47 @@ TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizati .Body(valueExtractor) .Done(); + const bool hasSingleTask = AllOf(stage.Inputs(), [](const auto& input) { + return input.template Maybe<TDqCnMerge>() || input.template Maybe<TDqCnUnionAll>(); + }); + if (!hasSingleTask) { + // Add LIMIT 1 via Take + auto takeProgram = Build<TCoLambda>(ctx, node.Pos()) + .Args({"take_arg"}) + // DqOutput expects stream as input, thus form stream with one element + .Body<TCoToStream>() + .Input<TCoTake>() + .Input({"take_arg"}) + .Count<TCoUint64>() + .Literal().Build("1") + .Build() + .Build() + .Build() + .Done(); + + auto newUnion = DqPushLambdaToStageUnionAll(unionAll, takeProgram, {}, ctx, optCtx); + if (!newUnion) { + return node; + } + + auto newStage = Build<TDqStage>(ctx, node.Pos()) + .Inputs() + .Add(newUnion.Cast()) + .Build() + .Program(newProgram) + .Settings().Build() + .Done(); + + return Build<TDqPrecompute>(ctx, node.Pos()) + .Input<TDqCnValue>() + .Output<TDqOutput>() + .Stage(newStage) + .Index().Build("0") + .Build() + .Build() + .Done(); + } + auto newUnion = DqPushLambdaToStageUnionAll(unionAll, newProgram, {}, ctx, optCtx); if (!newUnion.IsValid()) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index f5462bc3ad..88be69c3c8 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -42,13 +42,17 @@ NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -NNodes::TExprBase DqBuildPartitionsStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); +NNodes::TExprBase DqBuildPartitionsStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -NNodes::TExprBase DqBuildPartitionStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); +NNodes::TExprBase DqBuildPartitionStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -NNodes::TExprBase DqBuildShuffleStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); +NNodes::TExprBase DqBuildShuffleStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -NNodes::TExprBase DqBuildFinalizeByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); +NNodes::TExprBase DqBuildFinalizeByKeyStage(NNodes::TExprBase node, TExprContext& ctx, + const TParentsMap& parentsMap, bool allowStageMultiUsage = true); NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); NNodes::TExprBase DqBuildAggregationResultStage(NNodes::TExprBase node, TExprContext& ctx, diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 97e67c785f..69debeb3f1 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -32,11 +32,11 @@ public: AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>)); AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>)); AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>)); - AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage)); - AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage)); - AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage)); + AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>)); + AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<false>)); + AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>)); AddHandler(0, &TDqCnHashShuffle::Match, HNDL(BuildHashShuffleByKeyStage)); - AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage)); + AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<false>)); AddHandler(0, &TCoAsList::Match, HNDL(BuildAggregationResultStage)); AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>)); AddHandler(0, &TCoSort::Match, HNDL(BuildSortStage<false>)); @@ -66,6 +66,10 @@ public: AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>)); AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>)); AddHandler(1, &TCoCombineByKey::Match, HNDL(PushCombineToStage<true>)); + AddHandler(1, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<true>)); + AddHandler(1, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<true>)); + AddHandler(1, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<true>)); + AddHandler(1, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<true>)); AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>)); AddHandler(1, &TCoSort::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoTakeBase::Match, HNDL(BuildTakeOrTakeSkipStage<true>)); @@ -267,24 +271,28 @@ protected: return DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal); } - TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - return DqBuildPartitionsStage(node, ctx, *getParents()); + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + return DqBuildPartitionsStage(node, ctx, optCtx, *getParents(), IsGlobal); } - TMaybeNode<TExprBase> BuildPartitionStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - return DqBuildPartitionStage(node, ctx, *getParents()); + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildPartitionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + return DqBuildPartitionStage(node, ctx, optCtx, *getParents(), IsGlobal); } - TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - return DqBuildShuffleStage(node, ctx, *getParents()); + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + return DqBuildShuffleStage(node, ctx, optCtx, *getParents(), IsGlobal); } TMaybeNode<TExprBase> BuildHashShuffleByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { return DqBuildHashShuffleByKeyStage(node, ctx, *getParents()); } + template<bool IsGlobal> TMaybeNode<TExprBase> BuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - return DqBuildFinalizeByKeyStage(node, ctx, *getParents()); + return DqBuildFinalizeByKeyStage(node, ctx, *getParents(), IsGlobal); } TMaybeNode<TExprBase> BuildAggregationResultStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) { |