aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-11-19 19:59:28 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-11-19 20:22:53 +0300
commitaa7dd150f8c667dbb1be1a703f8f92ad2efca76a (patch)
treefa80c5091a5195eb09f4fab96a8fb298f4d05799
parenta5c96cefcb982ed8198a1257c38a7873fa0499d8 (diff)
downloadydb-aa7dd150f8c667dbb1be1a703f8f92ad2efca76a.tar.gz
KIKIMR-20084: provide computation sharding into read callable
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log.cpp7
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp9
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp2
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp108
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h5
5 files changed, 92 insertions, 39 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
index 13d19f9711..cad5615196 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
@@ -2,6 +2,7 @@
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
+#include <ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
@@ -30,6 +31,7 @@ public:
AddHandler(0, &TCoFlatMap::Match, HNDL(PushPredicateToReadTable));
AddHandler(0, &TCoFlatMap::Match, HNDL(PushExtractedPredicateToReadTable));
AddHandler(0, &TCoAggregate::Match, HNDL(RewriteAggregate));
+ AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushdownOlapGroupByKeys));
AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeSortToTopSort));
AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInToEquiJoin));
AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInCompactToJoin));
@@ -100,6 +102,11 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> PushdownOlapGroupByKeys(TExprBase node, TExprContext& ctx) {
+ TExprBase output = KqpPushDownOlapGroupByKeys(node, ctx, KqpCtx);
+ DumpAppliedRule("PushdownOlapGroupByKeys", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
TMaybeNode<TExprBase> RewriteAggregate(TExprBase node, TExprContext& ctx) {
TExprBase output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown(), KqpCtx.Config->HasOptUseFinalizeByKey());
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 11e5887624..276e1c5413 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -42,6 +42,7 @@ public:
AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter));
AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage));
AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushOlapAggregate));
+ AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushdownOlapGroupByKeys));
AddHandler(0, &TDqPhyLength::Match, HNDL(PushOlapLength));
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
@@ -177,7 +178,7 @@ protected:
}
TMaybeNode<TExprBase> RewriteKqpLookupTable(TExprBase node, TExprContext& ctx) {
- TExprBase output = KqpRewriteLookupTable(node, ctx, KqpCtx);
+ TExprBase output = KqpRewriteLookupTablePhy(node, ctx, KqpCtx);
DumpAppliedRule("RewriteKqpLookupTable", node.Ptr(), output.Ptr(), ctx);
return output;
}
@@ -208,6 +209,12 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> PushdownOlapGroupByKeys(TExprBase node, TExprContext& ctx) {
+ TExprBase output = KqpPushDownOlapGroupByKeys(node, ctx, KqpCtx);
+ DumpAppliedRule("PushdownOlapGroupByKeys", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
TMaybeNode<TExprBase> PushOlapAggregate(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpPushOlapAggregate(node, ctx, KqpCtx);
DumpAppliedRule("PushOlapAggregate", node.Ptr(), output.Ptr(), ctx);
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index f8ceab62c5..03e47725e6 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -550,7 +550,7 @@ NYql::NNodes::TExprBase KqpBuildSequencerStages(NYql::NNodes::TExprBase node, NY
.Build().Done();
}
-NYql::NNodes::TExprBase KqpRewriteLookupTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx) {
if (!node.Maybe<TDqStage>() || !kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
index 81896143bf..d6fc1149ec 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/formats/arrow/ssa_runtime_version.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <library/cpp/actors/core/log.h>
#include <vector>
#include <unordered_set>
@@ -198,6 +199,76 @@ TExprBase BuildAvgResultProcessing(const std::vector<TAggInfo>& aggInfos, const
} // anonymous namespace end
+template <class TReadClass>
+TExprBase KqpPushDownOlapGroupByKeysImpl(TExprBase node, TExprContext& ctx, bool& applied) {
+ applied = false;
+ auto aggCombine = node.Cast<TCoAggregateCombine>();
+ if (aggCombine.Keys().Empty()) {
+ return node;
+ }
+
+ auto maybeRead = aggCombine.Input().Maybe<TReadClass>();
+ if (!maybeRead) {
+ maybeRead = aggCombine.Input().Maybe<TCoFlatMap>().Input().Maybe<TReadClass>();
+ }
+
+ if (!maybeRead) {
+ return node;
+ }
+
+ if (NYql::HasSetting(maybeRead.Settings().Ref(), TKqpReadTableSettings::GroupByFieldNames)) {
+ return node;
+ }
+ auto newSettings = NYql::AddSetting(maybeRead.Settings().Cast().Ref(), maybeRead.Settings().Cast().Pos(),
+ TString(TKqpReadTableSettings::GroupByFieldNames.data(), TKqpReadTableSettings::GroupByFieldNames.size()), aggCombine.Keys().Ptr(), ctx);
+ if (auto read = aggCombine.Input().Maybe<TReadClass>()) {
+ applied = true;
+ return
+ Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>())
+ .Input<TReadClass>().InitFrom(read.Cast())
+ .Settings(newSettings)
+ .Build()
+ .Done();
+ } else if (auto read = aggCombine.Input().Maybe<TCoFlatMap>().Input().Maybe<TReadClass>()) {
+ applied = true;
+ return
+ Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>())
+ .Input<TCoFlatMap>().InitFrom(aggCombine.Input().Maybe<TCoFlatMap>().Cast())
+ .Input<TReadClass>().InitFrom(read.Cast())
+ .Settings(newSettings)
+ .Build()
+ .Build()
+ .Done();
+ } else {
+ return node;
+ }
+}
+
+TExprBase KqpPushDownOlapGroupByKeys(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
+ if (NKikimr::NSsa::RuntimeVersion < 2U) {
+ // We introduced aggregate pushdown in v2 of SSA program
+ return node;
+ }
+
+ if (!kqpCtx.Config->HasOptEnableOlapPushdown()) {
+ return node;
+ }
+
+ if (!node.Maybe<TCoAggregateCombine>()) {
+ return node;
+ }
+ bool applied = false;
+ auto result = KqpPushDownOlapGroupByKeysImpl<TKqpReadOlapTableRanges>(node, ctx, applied);
+ if (applied) {
+ return result;
+ }
+ result = KqpPushDownOlapGroupByKeysImpl<TKqlReadTableRanges>(node, ctx, applied);
+ if (applied) {
+ return result;
+ }
+ return node;
+}
+
TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx)
{
if (NKikimr::NSsa::RuntimeVersion < 2U) {
@@ -222,9 +293,6 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
auto maybeRead = aggCombine.Input().Maybe<TKqpReadOlapTableRanges>();
if (!maybeRead) {
maybeRead = aggCombine.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>();
- if (!maybeRead) {
- maybeRead = aggCombine.Input().Maybe<TCoFlatMap>().Input().Maybe<TKqpReadOlapTableRanges>();
- }
}
if (!maybeRead) {
@@ -233,39 +301,7 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
// temporary for keys grouping push down not useful
if (!aggCombine.Keys().Empty()) {
- if (NYql::HasSetting(maybeRead.Settings().Ref(), TKqpReadTableSettings::GroupByFieldNames)) {
- return node;
- }
- auto newSettings = NYql::AddSetting(maybeRead.Settings().Cast().Ref(), maybeRead.Settings().Cast().Pos(),
- TString(TKqpReadTableSettings::GroupByFieldNames.data(), TKqpReadTableSettings::GroupByFieldNames.size()), aggCombine.Keys().Ptr(), ctx);
- if (auto read = aggCombine.Input().Maybe<TKqpReadOlapTableRanges>()) {
- return
- Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>())
- .Input<TKqpReadOlapTableRanges>().InitFrom(read.Cast())
- .Settings(newSettings)
- .Build()
- .Done();
- } else if (auto read = aggCombine.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>()) {
- return
- Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>())
- .Input<TCoExtractMembers>().InitFrom(aggCombine.Input().Maybe<TCoExtractMembers>().Cast())
- .Input<TKqpReadOlapTableRanges>().InitFrom(read.Cast())
- .Settings(newSettings)
- .Build()
- .Build()
- .Done();
- } else if (auto read = aggCombine.Input().Maybe<TCoFlatMap>().Input().Maybe<TKqpReadOlapTableRanges>()) {
- return
- Build<TCoAggregateCombine>(ctx, node.Pos()).InitFrom(node.Cast<TCoAggregateCombine>())
- .Input<TCoFlatMap>().InitFrom(aggCombine.Input().Maybe<TCoFlatMap>().Cast())
- .Input<TKqpReadOlapTableRanges>().InitFrom(read.Cast())
- .Settings(newSettings)
- .Build()
- .Build()
- .Done();
- } else {
- Y_ABORT_UNLESS(false);
- }
+ return node;
}
auto read = maybeRead.Cast();
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
index 64c58a7377..782e50390a 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
@@ -14,7 +14,7 @@ namespace NKikimr::NKqp::NOpt {
NYql::NNodes::TExprBase KqpRewriteReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
-NYql::NNodes::TExprBase KqpRewriteLookupTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
NYql::NNodes::TExprBase KqpBuildReadTableStage(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
@@ -46,6 +46,9 @@ NYql::NNodes::TExprBase KqpPushOlapFilter(NYql::NNodes::TExprBase node, NYql::TE
NYql::NNodes::TExprBase KqpPushOlapAggregate(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
+NYql::NNodes::TExprBase KqpPushDownOlapGroupByKeys(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
+ const TKqpOptimizeContext& kqpCtx);
+
NYql::NNodes::TExprBase KqpPushOlapLength(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);