aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-01-24 13:36:14 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-01-24 13:36:14 +0300
commitd8d5df1f1fe0d30fad1e8509f13ebf6e2cfde43c (patch)
tree31bc2fe0101bd123c6de369b03dde5332b905a04
parentf0945b36214dd0f2bba63539bd9ffd75cfd612fe (diff)
downloadydb-d8d5df1f1fe0d30fad1e8509f13ebf6e2cfde43c.tar.gz
hash shuffle optimizer
canondata
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp8
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp44
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h1
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp5
4 files changed, 58 insertions, 0 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 35b448d4ed..d9a1ba27e3 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -78,6 +78,8 @@ public:
AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
AddHandler(0, &TDqStage::Match, HNDL(ApplyLimitToReadTableSource));
+ AddHandler(0, &TDqCnHashShuffle::Match, HNDL(BuildHashShuffleByKeyStage));
+
AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateMergeState::Match, HNDL(ExpandAggregatePhase));
@@ -173,6 +175,12 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> BuildHashShuffleByKeyStage(TExprBase node, TExprContext& ctx) {
+ auto output = DqBuildHashShuffleByKeyStage(node, ctx, {});
+ DumpAppliedRule("BuildHashShuffleByKeyStage", node.Ptr(), output.Ptr(), ctx);
+ return TExprBase(output);
+ }
+
TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false);
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 9649f8306a..0dca7b3b3a 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1017,6 +1017,50 @@ TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, const TParentsM
.Done();
}
+NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& /*parentsMap*/) {
+ if (!node.Maybe<TDqCnHashShuffle>()) {
+ return node;
+ }
+ auto cnHash = node.Cast<TDqCnHashShuffle>();
+ auto stage = cnHash.Output().Stage();
+ if (!stage.Program().Body().Maybe<TCoExtend>()) {
+ return node;
+ }
+ auto extend = stage.Program().Body().Cast<TCoExtend>();
+ TNodeSet nodes;
+ for (auto&& i : stage.Program().Args()) {
+ nodes.emplace(i.Raw());
+ }
+ for (auto&& i : extend) {
+ if (nodes.erase(i.Raw()) != 1) {
+ return node;
+ }
+ }
+ if (!nodes.empty()) {
+ return node;
+ }
+ TExprNode::TListType nodesTuple;
+ for (auto&& i : stage.Inputs()) {
+ if (!i.Maybe<TDqCnUnionAll>()) {
+ return node;
+ }
+ auto uAll = i.Cast<TDqCnUnionAll>();
+ nodesTuple.emplace_back(ctx.ChangeChild(node.Ref(), 0, uAll.Output().Ptr()));
+ }
+ auto stageCopy = ctx.ChangeChild(stage.Ref(), 0, ctx.NewList(node.Pos(), std::move(nodesTuple)));
+ auto output =
+ Build<TDqOutput>(ctx, node.Pos())
+ .Stage(stageCopy)
+ .Index().Build("0")
+ .Done();
+ auto outputCnMap =
+ Build<TDqCnMap>(ctx, node.Pos())
+ .Output(output)
+ .Done();
+
+ return TExprBase(outputCnMap);
+}
+
TExprBase DqBuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) {
auto finalizeInput = node.Maybe<TCoFinalizeByKey>().Input();
if (!finalizeInput.Maybe<TDqCnUnionAll>()) {
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index f47848c83a..f5462bc3ad 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -49,6 +49,7 @@ NNodes::TExprBase DqBuildPartitionStage(NNodes::TExprBase node, TExprContext& ct
NNodes::TExprBase DqBuildShuffleStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
NNodes::TExprBase DqBuildFinalizeByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
+NNodes::TExprBase DqBuildHashShuffleByKeyStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
NNodes::TExprBase DqBuildAggregationResultStage(NNodes::TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx);
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 9280903907..97e67c785f 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -35,6 +35,7 @@ public:
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage));
AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage));
AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage));
+ AddHandler(0, &TDqCnHashShuffle::Match, HNDL(BuildHashShuffleByKeyStage));
AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage));
AddHandler(0, &TCoAsList::Match, HNDL(BuildAggregationResultStage));
AddHandler(0, &TCoTopSort::Match, HNDL(BuildTopSortStage<false>));
@@ -278,6 +279,10 @@ protected:
return DqBuildShuffleStage(node, ctx, *getParents());
}
+ TMaybeNode<TExprBase> BuildHashShuffleByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
+ return DqBuildHashShuffleByKeyStage(node, ctx, *getParents());
+ }
+
TMaybeNode<TExprBase> BuildFinalizeByKeyStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
return DqBuildFinalizeByKeyStage(node, ctx, *getParents());
}