diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-24 13:36:14 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-24 13:36:14 +0300 |
commit | d8d5df1f1fe0d30fad1e8509f13ebf6e2cfde43c (patch) | |
tree | 31bc2fe0101bd123c6de369b03dde5332b905a04 | |
parent | f0945b36214dd0f2bba63539bd9ffd75cfd612fe (diff) | |
download | ydb-d8d5df1f1fe0d30fad1e8509f13ebf6e2cfde43c.tar.gz |
hash shuffle optimizer
canondata
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 8 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 44 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 5 |
4 files changed, 58 insertions, 0 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 35b448d4ed..d9a1ba27e3 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -78,6 +78,8 @@ public: AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>)); AddHandler(0, &TDqStage::Match, HNDL(ApplyLimitToReadTableSource)); + AddHandler(0, &TDqCnHashShuffle::Match, HNDL(BuildHashShuffleByKeyStage)); + AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase)); AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase)); AddHandler(0, &TCoAggregateMergeState::Match, HNDL(ExpandAggregatePhase)); @@ -173,6 +175,12 @@ protected: return output; } + TMaybeNode<TExprBase> BuildHashShuffleByKeyStage(TExprBase node, TExprContext& ctx) { + auto output = DqBuildHashShuffleByKeyStage(node, ctx, {}); + DumpAppliedRule("BuildHashShuffleByKeyStage", node.Ptr(), output.Ptr(), ctx); + return TExprBase(output); + } + TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) { auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 9649f8306a..0dca7b3b3a 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1017,6 +1017,50 @@ TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, const TParentsM .Done(); } +NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& /*parentsMap*/) { + if (!node.Maybe<TDqCnHashShuffle>()) { + return node; + } + auto cnHash = node.Cast<TDqCnHashShuffle>(); + auto stage = cnHash.Output().Stage(); + if (!stage.Program().Body().Maybe<TCoExtend>()) { + return node; + } + auto extend = stage.Program().Body().Cast<TCoExtend>(); + TNodeSet nodes; + for (auto&& i : stage.Program().Args()) { + nodes.emplace(i.Raw()); + } + for (auto&& i : extend) { + if (nodes.erase(i.Raw()) != 1) { + return node; + } + } + if (!nodes.empty()) { + return node; + } + TExprNode::TListType nodesTuple; + for (auto&& i : stage.Inputs()) { + if (!i.Maybe<TDqCnUnionAll>()) { + return node; + } + auto uAll = i.Cast<TDqCnUnionAll>(); + nodesTuple.emplace_back(ctx.ChangeChild(node.Ref(), 0, uAll.Output().Ptr())); + } + auto stageCopy = ctx.ChangeChild(stage.Ref(), 0, ctx.NewList(node.Pos(), std::move(nodesTuple))); + auto output = + Build<TDqOutput>(ctx, node.Pos()) + .Stage(stageCopy) + .Index().Build("0") + .Done(); + auto outputCnMap = + Build<TDqCnMap>(ctx, node.Pos()) + .Output(output) + .Done(); + + return TExprBase(outputCnMap); +} + TExprBase DqBuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) { auto finalizeInput = node.Maybe<TCoFinalizeByKey>().Input(); if (!finalizeInput.Maybe<TDqCnUnionAll>()) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index f47848c83a..f5462bc3ad 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -49,6 +49,7 @@ NNodes::TExprBase DqBuildPartitionStage(NNodes::TExprBase node, TExprContext& ct NNodes::TExprBase DqBuildShuffleStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); NNodes::TExprBase DqBuildFinalizeByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); +NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); NNodes::TExprBase DqBuildAggregationResultStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx); diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 9280903907..97e67c785f 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -35,6 +35,7 @@ public: AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage)); AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage)); AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage)); + AddHandler(0, &TDqCnHashShuffle::Match, HNDL(BuildHashShuffleByKeyStage)); AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage)); AddHandler(0, &TCoAsList::Match, HNDL(BuildAggregationResultStage)); AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>)); @@ -278,6 +279,10 @@ protected: return DqBuildShuffleStage(node, ctx, *getParents()); } + TMaybeNode<TExprBase> BuildHashShuffleByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { + return DqBuildHashShuffleByKeyStage(node, ctx, *getParents()); + } + TMaybeNode<TExprBase> BuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { return DqBuildFinalizeByKeyStage(node, ctx, *getParents()); } |