aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <aidarsamer@ydb.tech>2022-10-18 11:36:35 +0300
committeraidarsamer <aidarsamer@ydb.tech>2022-10-18 11:36:35 +0300
commit3b27a244b3f6b9b45ad1835aa6f91278edda1ecd (patch)
treeac9eec6cb5e016e13b8e53bf68db07a07d648f77
parent92be03d3315373b944d7cf676eb857646863a0a6 (diff)
downloadydb-3b27a244b3f6b9b45ad1835aa6f91278edda1ecd.tar.gz
Add AggregateCombine push to stage optimization.
Add AggregateCombine push to stage optimization
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log.cpp2
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp24
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp13
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp31
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h5
5 files changed, 64 insertions, 11 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
index e8dbf24a5b..f8e5f2b1a1 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
@@ -75,7 +75,7 @@ protected:
}
TMaybeNode<TExprBase> RewriteAggregate(TExprBase node, TExprContext& ctx) {
- TExprBase output = DqRewriteAggregate(node, ctx, TypesCtx, false, false);
+ TExprBase output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->PushOlapProcess());
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index f992b8d8dd..9995f7c669 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
#include <ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h>
+#include <ydb/library/yql/core/yql_aggregate_expander.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/dq/opt/dq_opt.h>
@@ -34,6 +35,7 @@ public:
HNDL(RemoveRedundantSortByPk));
AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable));
AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter));
+ AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage));
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushOlapAggregate));
AddHandler(0, &TDqPhyLength::Match, HNDL(PushOlapLength));
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
@@ -72,6 +74,13 @@ public:
AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>));
AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
+ AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase));
+ AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase));
+ AddHandler(0, &TCoAggregateMergeState::Match, HNDL(ExpandAggregatePhase));
+ AddHandler(0, &TCoAggregateMergeFinalize::Match, HNDL(ExpandAggregatePhase));
+ AddHandler(0, &TCoAggregateMergeManyFinalize::Match, HNDL(ExpandAggregatePhase));
+ AddHandler(0, &TCoAggregateFinalize::Match, HNDL(ExpandAggregatePhase));
+
AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>));
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>));
@@ -140,12 +149,27 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> PushAggregateCombineToStage(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ TExprBase output = DqPushAggregateCombineToStage(node, ctx, optCtx, *getParents(), false);
+ DumpAppliedRule("PushAggregateCombineToStage", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
TMaybeNode<TExprBase> PushOlapAggregate(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpPushOlapAggregate(node, ctx, KqpCtx);
DumpAppliedRule("PushOlapAggregate", node.Ptr(), output.Ptr(), ctx);
return output;
}
+
+ TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
+ auto output = ExpandAggregatePeephole(node.Ptr(), ctx, TypesCtx);
+ DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx);
+ return TExprBase(output);
+ }
+
TMaybeNode<TExprBase> PushOlapLength(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpPushOlapLength(node, ctx, KqpCtx);
DumpAppliedRule("PushOlapLength", node.Ptr(), output.Ptr(), ctx);
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 0c267c4f13..fb418a7b04 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -4582,14 +4582,11 @@ namespace {
return IGraphTransformer::TStatus::Repeat;
}
- bool isStream;
- if (!EnsureSeqType(input->Head(), ctx.Expr, &isStream)) {
+ const TTypeAnnotationNode* inputItemType = nullptr;
+ if (!EnsureNewSeqType<false, true>(input->Head(), ctx.Expr, &inputItemType)) {
return IGraphTransformer::TStatus::Error;
}
-
- auto inputItemType = isStream
- ? input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()
- : input->Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
+ auto inputTypeKind = input->Head().GetTypeAnn()->GetKind();
if (!EnsureStructType(input->Head().Pos(), *inputItemType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
@@ -4911,9 +4908,7 @@ namespace {
return IGraphTransformer::TStatus::Error;
}
- input->SetTypeAnn(isStream
- ? (const TTypeAnnotationNode*)ctx.Expr.MakeType<TStreamExprType>(rowType)
- : (const TTypeAnnotationNode*)ctx.Expr.MakeType<TListExprType>(rowType));
+ input->SetTypeAnn(MakeSequenceType(inputTypeKind, *rowType, ctx.Expr));
return IGraphTransformer::TStatus::Ok;
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 9b45bed52b..e8eefd77e6 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -876,6 +876,37 @@ TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationC
return result.Cast();
}
+NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
+ if (!node.Maybe<TCoAggregateCombine>().Input().Maybe<TDqCnUnionAll>()) {
+ return node;
+ }
+
+ auto aggCombine = node.Cast<TCoAggregateCombine>();
+ auto dqUnion = aggCombine.Input().Cast<TDqCnUnionAll>();
+ if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
+ return node;
+ }
+
+ auto lambda = Build<TCoLambda>(ctx, aggCombine.Pos())
+ .Args({"stream"})
+ .Body<TCoAggregateCombine>()
+ .Input("stream")
+ .Keys(aggCombine.Keys())
+ .Handlers(aggCombine.Handlers())
+ .Settings(aggCombine.Settings())
+ .Build()
+ .Done();
+
+ auto result = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
+ if (!result) {
+ return node;
+ }
+
+ return result.Cast();
+}
+
TExprBase DqBuildPartitionsStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) {
return DqBuildPartitionsStageStub<TCoPartitionsByKeys>(std::move(node), ctx, parentsMap);
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 242902bd22..4b3a49f704 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -39,6 +39,9 @@ NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx,
NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
+NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
+
NNodes::TExprBase DqBuildPartitionsStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
NNodes::TExprBase DqBuildPartitionStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap);
@@ -75,7 +78,7 @@ NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprConte
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx,
IOptimizationContext& optCtx);
-NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx,
+NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx,
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, bool useGraceJoin = false);
NNodes::TExprBase DqBuildGraceJoin(const NNodes::TDqJoin& join, TExprContext& ctx);