diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-19 19:59:28 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-19 20:22:53 +0300 |
commit | aa7dd150f8c667dbb1be1a703f8f92ad2efca76a (patch) | |
tree | fa80c5091a5195eb09f4fab96a8fb298f4d05799 | |
parent | a5c96cefcb982ed8198a1257c38a7873fa0499d8 (diff) | |
download | ydb-aa7dd150f8c667dbb1be1a703f8f92ad2efca76a.tar.gz |
KIKIMR-20084: provide computation sharding into read callable
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp | 108 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h | 5 |
5 files changed, 92 insertions, 39 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 13d19f9711..cad5615196 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -2,6 +2,7 @@ #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/opt/kqp_opt_impl.h> +#include <ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> #include <ydb/library/yql/core/yql_opt_utils.h> @@ -30,6 +31,7 @@ public: AddHandler(0, &TCoFlatMap::Match, HNDL(PushPredicateToReadTable)); AddHandler(0, &TCoFlatMap::Match, HNDL(PushExtractedPredicateToReadTable)); AddHandler(0, &TCoAggregate::Match, HNDL(RewriteAggregate)); + AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushdownOlapGroupByKeys)); AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeSortToTopSort)); AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInToEquiJoin)); AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInCompactToJoin)); @@ -100,6 +102,11 @@ protected: return output; } + TMaybeNode<TExprBase> PushdownOlapGroupByKeys(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpPushDownOlapGroupByKeys(node, ctx, KqpCtx); + DumpAppliedRule("PushdownOlapGroupByKeys", node.Ptr(), output.Ptr(), ctx); + return output; + } TMaybeNode<TExprBase> RewriteAggregate(TExprBase node, TExprContext& ctx) { TExprBase output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown(), KqpCtx.Config->HasOptUseFinalizeByKey()); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 11e5887624..276e1c5413 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -42,6 +42,7 @@ public: AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter)); AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage)); AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushOlapAggregate)); + AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushdownOlapGroupByKeys)); AddHandler(0, &TDqPhyLength::Match, HNDL(PushOlapLength)); AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>)); AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>)); @@ -177,7 +178,7 @@ protected: } TMaybeNode<TExprBase> RewriteKqpLookupTable(TExprBase node, TExprContext& ctx) { - TExprBase output = KqpRewriteLookupTable(node, ctx, KqpCtx); + TExprBase output = KqpRewriteLookupTablePhy(node, ctx, KqpCtx); DumpAppliedRule("RewriteKqpLookupTable", node.Ptr(), output.Ptr(), ctx); return output; } @@ -208,6 +209,12 @@ protected: return output; } + TMaybeNode<TExprBase> PushdownOlapGroupByKeys(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpPushDownOlapGroupByKeys(node, ctx, KqpCtx); + DumpAppliedRule("PushdownOlapGroupByKeys", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> PushOlapAggregate(TExprBase node, TExprContext& ctx) { TExprBase output = KqpPushOlapAggregate(node, ctx, KqpCtx); DumpAppliedRule("PushOlapAggregate", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index f8ceab62c5..03e47725e6 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -550,7 +550,7 @@ NYql::NNodes::TExprBase KqpBuildSequencerStages(NYql::NNodes::TExprBase node, NY .Build().Done(); } -NYql::NNodes::TExprBase KqpRewriteLookupTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, +NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if (!node.Maybe<TDqStage>() || !kqpCtx.Config->EnableKqpDataQueryStreamLookup) { diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp index 81896143bf..d6fc1149ec 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp @@ -5,6 +5,7 @@ #include <ydb/core/formats/arrow/ssa_runtime_version.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <library/cpp/actors/core/log.h> #include <vector> #include <unordered_set> @@ -198,6 +199,76 @@ TExprBase BuildAvgResultProcessing(const std::vector<TAggInfo>& aggInfos, const } // anonymous namespace end +template <class TReadClass> +TExprBase KqpPushDownOlapGroupByKeysImpl(TExprBase node, TExprContext& ctx, bool& applied) { + applied = false; + auto aggCombine = node.Cast<TCoAggregateCombine>(); + if (aggCombine.Keys().Empty()) { + return node; + } + + auto maybeRead = aggCombine.Input().Maybe<TReadClass>(); + if (!maybeRead) { + maybeRead = aggCombine.Input().Maybe<TCoFlatMap>().Input().Maybe<TReadClass>(); + } + + if (!maybeRead) { + return node; + } + + if (NYql::HasSetting(maybeRead.Settings().Ref(), TKqpReadTableSettings::GroupByFieldNames)) { + return node; + } + auto newSettings = NYql::AddSetting(maybeRead.Settings().Cast().Ref(), maybeRead.Settings().Cast().Pos(), + TString(TKqpReadTableSettings::GroupByFieldNames.data(), TKqpReadTableSettings::GroupByFieldNames.size()), aggCombine.Keys().Ptr(), ctx); + if (auto read = aggCombine.Input().Maybe<TReadClass>()) { + applied = true; + return + Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>()) + .Input<TReadClass>().InitFrom(read.Cast()) + .Settings(newSettings) + .Build() + .Done(); + } else if (auto read = aggCombine.Input().Maybe<TCoFlatMap>().Input().Maybe<TReadClass>()) { + applied = true; + return + Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>()) + .Input<TCoFlatMap>().InitFrom(aggCombine.Input().Maybe<TCoFlatMap>().Cast()) + .Input<TReadClass>().InitFrom(read.Cast()) + .Settings(newSettings) + .Build() + .Build() + .Done(); + } else { + return node; + } +} + +TExprBase KqpPushDownOlapGroupByKeys(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + if (NKikimr::NSsa::RuntimeVersion < 2U) { + // We introduced aggregate pushdown in v2 of SSA program + return node; + } + + if (!kqpCtx.Config->HasOptEnableOlapPushdown()) { + return node; + } + + if (!node.Maybe<TCoAggregateCombine>()) { + return node; + } + bool applied = false; + auto result = KqpPushDownOlapGroupByKeysImpl<TKqpReadOlapTableRanges>(node, ctx, applied); + if (applied) { + return result; + } + result = KqpPushDownOlapGroupByKeysImpl<TKqlReadTableRanges>(node, ctx, applied); + if (applied) { + return result; + } + return node; +} + TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if (NKikimr::NSsa::RuntimeVersion < 2U) { @@ -222,9 +293,6 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti auto maybeRead = aggCombine.Input().Maybe<TKqpReadOlapTableRanges>(); if (!maybeRead) { maybeRead = aggCombine.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>(); - if (!maybeRead) { - maybeRead = aggCombine.Input().Maybe<TCoFlatMap>().Input().Maybe<TKqpReadOlapTableRanges>(); - } } if (!maybeRead) { @@ -233,39 +301,7 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti // temporary for keys grouping push down not useful if (!aggCombine.Keys().Empty()) { - if (NYql::HasSetting(maybeRead.Settings().Ref(), TKqpReadTableSettings::GroupByFieldNames)) { - return node; - } - auto newSettings = NYql::AddSetting(maybeRead.Settings().Cast().Ref(), maybeRead.Settings().Cast().Pos(), - TString(TKqpReadTableSettings::GroupByFieldNames.data(), TKqpReadTableSettings::GroupByFieldNames.size()), aggCombine.Keys().Ptr(), ctx); - if (auto read = aggCombine.Input().Maybe<TKqpReadOlapTableRanges>()) { - return - Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>()) - .Input<TKqpReadOlapTableRanges>().InitFrom(read.Cast()) - .Settings(newSettings) - .Build() - .Done(); - } else if (auto read = aggCombine.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>()) { - return - Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>()) - .Input<TCoExtractMembers>().InitFrom(aggCombine.Input().Maybe<TCoExtractMembers>().Cast()) - .Input<TKqpReadOlapTableRanges>().InitFrom(read.Cast()) - .Settings(newSettings) - .Build() - .Build() - .Done(); - } else if (auto read = aggCombine.Input().Maybe<TCoFlatMap>().Input().Maybe<TKqpReadOlapTableRanges>()) { - return - Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>()) - .Input<TCoFlatMap>().InitFrom(aggCombine.Input().Maybe<TCoFlatMap>().Cast()) - .Input<TKqpReadOlapTableRanges>().InitFrom(read.Cast()) - .Settings(newSettings) - .Build() - .Build() - .Done(); - } else { - Y_ABORT_UNLESS(false); - } + return node; } auto read = maybeRead.Cast(); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h index 64c58a7377..782e50390a 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h @@ -14,7 +14,7 @@ namespace NKikimr::NKqp::NOpt { NYql::NNodes::TExprBase KqpRewriteReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); -NYql::NNodes::TExprBase KqpRewriteLookupTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, +NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); NYql::NNodes::TExprBase KqpBuildReadTableStage(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, @@ -46,6 +46,9 @@ NYql::NNodes::TExprBase KqpPushOlapFilter(NYql::NNodes::TExprBase node, NYql::TE NYql::NNodes::TExprBase KqpPushOlapAggregate(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase KqpPushDownOlapGroupByKeys(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, + const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpPushOlapLength(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); |