diff options
author | aidarsamer <aidarsamer@ydb.tech> | 2022-10-18 11:36:35 +0300 |
---|---|---|
committer | aidarsamer <aidarsamer@ydb.tech> | 2022-10-18 11:36:35 +0300 |
commit | 3b27a244b3f6b9b45ad1835aa6f91278edda1ecd (patch) | |
tree | ac9eec6cb5e016e13b8e53bf68db07a07d648f77 | |
parent | 92be03d3315373b944d7cf676eb857646863a0a6 (diff) | |
download | ydb-3b27a244b3f6b9b45ad1835aa6f91278edda1ecd.tar.gz |
Add AggregateCombine push to stage optimization.
Add AggregateCombine push to stage optimization
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 24 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_list.cpp | 13 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 31 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 5 |
5 files changed, 64 insertions, 11 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index e8dbf24a5b..f8e5f2b1a1 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -75,7 +75,7 @@ protected: } TMaybeNode<TExprBase> RewriteAggregate(TExprBase node, TExprContext& ctx) { - TExprBase output = DqRewriteAggregate(node, ctx, TypesCtx, false, false); + TExprBase output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->PushOlapProcess()); DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index f992b8d8dd..9995f7c669 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -4,6 +4,7 @@ #include <ydb/core/kqp/opt/kqp_opt_impl.h> #include <ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h> +#include <ydb/library/yql/core/yql_aggregate_expander.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/dq/opt/dq_opt.h> @@ -34,6 +35,7 @@ public: HNDL(RemoveRedundantSortByPk)); AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable)); AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter)); + AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage)); AddHandler(0, &TCoCombineByKey::Match, HNDL(PushOlapAggregate)); AddHandler(0, &TDqPhyLength::Match, HNDL(PushOlapLength)); AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>)); @@ -72,6 +74,13 @@ public: AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>)); AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>)); + AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase)); + AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase)); + AddHandler(0, &TCoAggregateMergeState::Match, HNDL(ExpandAggregatePhase)); + AddHandler(0, &TCoAggregateMergeFinalize::Match, HNDL(ExpandAggregatePhase)); + AddHandler(0, &TCoAggregateMergeManyFinalize::Match, HNDL(ExpandAggregatePhase)); + AddHandler(0, &TCoAggregateFinalize::Match, HNDL(ExpandAggregatePhase)); + AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>)); AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>)); AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>)); @@ -140,12 +149,27 @@ protected: return output; } + TMaybeNode<TExprBase> PushAggregateCombineToStage(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + TExprBase output = DqPushAggregateCombineToStage(node, ctx, optCtx, *getParents(), false); + DumpAppliedRule("PushAggregateCombineToStage", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> PushOlapAggregate(TExprBase node, TExprContext& ctx) { TExprBase output = KqpPushOlapAggregate(node, ctx, KqpCtx); DumpAppliedRule("PushOlapAggregate", node.Ptr(), output.Ptr(), ctx); return output; } + + TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) { + auto output = ExpandAggregatePeephole(node.Ptr(), ctx, TypesCtx); + DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx); + return TExprBase(output); + } + TMaybeNode<TExprBase> PushOlapLength(TExprBase node, TExprContext& ctx) { TExprBase output = KqpPushOlapLength(node, ctx, KqpCtx); DumpAppliedRule("PushOlapLength", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index 0c267c4f13..fb418a7b04 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -4582,14 +4582,11 @@ namespace { return IGraphTransformer::TStatus::Repeat; } - bool isStream; - if (!EnsureSeqType(input->Head(), ctx.Expr, &isStream)) { + const TTypeAnnotationNode* inputItemType = nullptr; + if (!EnsureNewSeqType<false, true>(input->Head(), ctx.Expr, &inputItemType)) { return IGraphTransformer::TStatus::Error; } - - auto inputItemType = isStream - ? input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType() - : input->Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType(); + auto inputTypeKind = input->Head().GetTypeAnn()->GetKind(); if (!EnsureStructType(input->Head().Pos(), *inputItemType, ctx.Expr)) { return IGraphTransformer::TStatus::Error; @@ -4911,9 +4908,7 @@ namespace { return IGraphTransformer::TStatus::Error; } - input->SetTypeAnn(isStream - ? (const TTypeAnnotationNode*)ctx.Expr.MakeType<TStreamExprType>(rowType) - : (const TTypeAnnotationNode*)ctx.Expr.MakeType<TListExprType>(rowType)); + input->SetTypeAnn(MakeSequenceType(inputTypeKind, *rowType, ctx.Expr)); return IGraphTransformer::TStatus::Ok; } diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 9b45bed52b..e8eefd77e6 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -876,6 +876,37 @@ TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationC return result.Cast(); } +NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ + if (!node.Maybe<TCoAggregateCombine>().Input().Maybe<TDqCnUnionAll>()) { + return node; + } + + auto aggCombine = node.Cast<TCoAggregateCombine>(); + auto dqUnion = aggCombine.Input().Cast<TDqCnUnionAll>(); + if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) { + return node; + } + + auto lambda = Build<TCoLambda>(ctx, aggCombine.Pos()) + .Args({"stream"}) + .Body<TCoAggregateCombine>() + .Input("stream") + .Keys(aggCombine.Keys()) + .Handlers(aggCombine.Handlers()) + .Settings(aggCombine.Settings()) + .Build() + .Done(); + + auto result = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx); + if (!result) { + return node; + } + + return result.Cast(); +} + TExprBase DqBuildPartitionsStage(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap) { return DqBuildPartitionsStageStub<TCoPartitionsByKeys>(std::move(node), ctx, parentsMap); } diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 242902bd22..4b3a49f704 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -39,6 +39,9 @@ NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); +NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage = true); + NNodes::TExprBase DqBuildPartitionsStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); NNodes::TExprBase DqBuildPartitionStage(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap); @@ -75,7 +78,7 @@ NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprConte NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx); -NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx, +NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, bool useGraceJoin = false); NNodes::TExprBase DqBuildGraceJoin(const NNodes::TDqJoin& join, TExprContext& ctx); |