diff options
author | aidarsamer <aidarsamer@ydb.tech> | 2022-10-18 22:22:21 +0300 |
---|---|---|
committer | aidarsamer <aidarsamer@ydb.tech> | 2022-10-18 22:22:21 +0300 |
commit | 1192ba7fb99ce3f8f5e52ff920687aa80fc42aba (patch) | |
tree | 221ae35a6efc2ab5dbcb9579d2d33ebe842f1f39 | |
parent | cf532004dcdf731c0035e191c519fcb8704225e9 (diff) | |
download | ydb-1192ba7fb99ce3f8f5e52ff920687aa80fc42aba.tar.gz |
Add optimization to pushdown COUNT to column shard
Pushdown COUNT to column shard
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp | 197 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 63 |
3 files changed, 77 insertions, 185 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 9995f7c669a..c5726a2d7ab 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -36,7 +36,7 @@ public: AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable)); AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter)); AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage)); - AddHandler(0, &TCoCombineByKey::Match, HNDL(PushOlapAggregate)); + AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushOlapAggregate)); AddHandler(0, &TDqPhyLength::Match, HNDL(PushOlapLength)); AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>)); AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>)); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp index ed0cf7a5f0e..87589149d6c 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp @@ -13,101 +13,39 @@ using namespace NYql::NNodes; namespace { -enum class EAggType { - Count, - Some -}; - -struct TAggInfo { - std::string AggName; - std::string ColName; - EAggType Type; -}; - -bool ContainsConstOnly(const TExprBase& node) { - return node.Maybe<TCoDataCtor>().IsValid(); -} - -bool ContainsSimpleColumnOnly(const TExprBase& node, const TExprBase& parent) { - if (!parent.Maybe<TCoInputBase>()) { - return false; - } - auto input = parent.Cast<TCoInputBase>().Input(); - if (auto maybeExprList = node.Maybe<TExprList>()) { - for (auto expr : maybeExprList.Cast()) { - if (!expr.Maybe<TCoMember>() || expr.Cast<TCoMember>().Struct().Raw() != input.Raw()) { - return false; - } - } - return true; - } - return node.Maybe<TCoMember>().IsValid() && node.Cast<TCoMember>().Struct().Raw() == input.Raw(); -} - -std::vector<std::string> GetGroupByCols(const TExprBase& keySelectorBody, const TExprBase& parent) { - std::vector<std::string> res; - if (!ContainsSimpleColumnOnly(keySelectorBody, parent)) { - YQL_CLOG(DEBUG, ProviderKqp) << "For aggregate push down optimization in GROUP BY column list should be Member callables only."; - return res; +std::string GetColumnNameUnderAggregation(const TCoAggApply& aggApply, TExprContext& ctx) { + auto extractorBody = aggApply.Extractor().Body(); + if (extractorBody.Maybe<TCoVoid>() && aggApply.Name() == "count_all") { + return "*"; } - if (auto maybeMember = keySelectorBody.Maybe<TCoMember>()) { - res.push_back(keySelectorBody.Cast<TCoMember>().Name().StringValue()); - } else if (auto maybeExprList = keySelectorBody.Maybe<TExprList>()) { - for (auto expr : maybeExprList.Cast()) { - res.push_back(expr.Cast<TCoMember>().Name().StringValue()); - } + if (!extractorBody.Maybe<TCoMember>()) { + YQL_CLOG(WARN, ProviderKqp) << "Expected TCoMember callable to get column under aggregation. Got: " << KqpExprToPrettyString(extractorBody, ctx); + return ""; } - return res; + return extractorBody.Cast<TCoMember>().Name().StringValue(); } -std::vector<TAggInfo> GetAggregationsFromInit(const TExprBase& node) { - std::vector<TAggInfo> res; - if (!node.Maybe<TCoAsStruct>()) { - return res; - } - for (auto item : node.Cast<TCoAsStruct>()) { - auto tuple = item.Cast<TCoNameValueTuple>(); - auto tupleValue = tuple.Value(); - if (tupleValue.Maybe<TCoAggrCountInit>()) { - auto aggrCntInit = tupleValue.Cast<TCoAggrCountInit>(); - if (aggrCntInit.Value().Maybe<TCoMember>()) { - TAggInfo aggInfo; - aggInfo.AggName = tuple.Name(); - aggInfo.Type = EAggType::Count; - aggInfo.ColName = aggrCntInit.Value().Cast<TCoMember>().Name(); - res.push_back(aggInfo); - } - } else { - YQL_CLOG(DEBUG, ProviderKqp) << "Unsupported aggregation type in init handler."; - res.clear(); - return res; - } +std::string GetAggregationName(const TExprBase& node, TExprContext& ctx) { + if (!node.Maybe<TCoAtom>()) { + YQL_CLOG(WARN, ProviderKqp) << "Expected TCoAtom as aggregation name. Got: " << KqpExprToPrettyString(node, ctx); + return ""; } - return res; + return node.Cast<TCoAtom>().StringValue(); } -std::vector<TAggInfo> GetAggregationsFromUpdate(const TExprBase& node) { - std::vector<TAggInfo> res; - if (!node.Maybe<TCoAsStruct>()) { - return res; +bool CanBePushedDown(const TExprBase& trait, TExprContext& ctx) +{ + if (!trait.Maybe<TCoAggApply>()) { + YQL_CLOG(WARN, ProviderKqp) << "Expected TCoAggApply in aggregate pushdown to column shard. Got: " << KqpExprToPrettyString(trait, ctx); + return false; } - for (auto item : node.Cast<TCoAsStruct>()) { - auto tuple = item.Cast<TCoNameValueTuple>(); - auto tupleValue = tuple.Value(); - if (auto maybeAggrCntUpd = tupleValue.Maybe<TCoAggrCountUpdate>()) { - if (maybeAggrCntUpd.Cast().Value().Maybe<TCoMember>()) { - TAggInfo aggInfo; - aggInfo.Type = EAggType::Count; - aggInfo.AggName = tuple.Name(); - res.push_back(aggInfo); - } - } else { - YQL_CLOG(DEBUG, ProviderKqp) << "Unsupported aggregation type in update handler."; - res.clear(); - return res; - } + auto aggApply = trait.Cast<TCoAggApply>(); + auto aggName = aggApply.Name(); + if (aggName == "count" || aggName == "count_all") { + return true; } - return res; + YQL_CLOG(WARN, ProviderKqp) << "Unsupported type of aggregation: " << aggName.StringValue(); + return false; } } // anonymous namespace end @@ -118,14 +56,14 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti return node; } - if (!node.Maybe<TCoCombineByKey>()) { + if (!node.Maybe<TCoAggregateCombine>()) { return node; } - auto combineKey = node.Cast<TCoCombineByKey>(); - auto maybeRead = combineKey.Input().Maybe<TKqpReadOlapTableRanges>(); + auto aggCombine = node.Cast<TCoAggregateCombine>(); + auto maybeRead = aggCombine.Input().Maybe<TKqpReadOlapTableRanges>(); if (!maybeRead) { - maybeRead = combineKey.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>(); + maybeRead = aggCombine.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>(); } if (!maybeRead) { @@ -133,62 +71,29 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti } auto read = maybeRead.Cast(); - - auto keySelectorBody = combineKey.KeySelectorLambda().Cast<TCoLambda>().Body(); - if (!ContainsSimpleColumnOnly(keySelectorBody, combineKey) && !ContainsConstOnly(keySelectorBody)) { - return node; - } - auto aggKeyCols = Build<TCoAtomList>(ctx, node.Pos()); - auto groupByCols = GetGroupByCols(keySelectorBody, combineKey); - for (auto groupByCol : groupByCols) { - aggKeyCols.Add<TCoAtom>() - .Build(groupByCol) - .Done(); - } - - auto initHandlerBody = combineKey.InitHandlerLambda().Cast<TCoLambda>().Body(); - auto aggInits = GetAggregationsFromInit(initHandlerBody); - - auto updateHandlerBody = combineKey.UpdateHandlerLambda().Cast<TCoLambda>().Body(); - auto aggUpdates = GetAggregationsFromUpdate(updateHandlerBody); - - auto finishHandlerBody = combineKey.FinishHandlerLambda().Cast<TCoLambda>().Body(); - if (aggInits.empty() || aggInits.size() != aggUpdates.size()) { - return node; - } - - for (size_t i = 0; i != aggInits.size(); ++i) { - if (aggInits[i].Type != aggUpdates[i].Type) { - YQL_CLOG(DEBUG, ProviderKqp) << "Different aggregation type in init and update handlers in aggregate push-down optimization!"; - return node; - } - } auto aggs = Build<TKqpOlapAggOperationList>(ctx, node.Pos()); // TODO: TMaybeNode<TKqpOlapAggOperation>; - for (size_t i = 0; i != aggInits.size(); ++i) { - std::string aggType; - switch (aggInits[i].Type) { - case EAggType::Count: - { - aggType = "count"; - break; - } - case EAggType::Some: - { - aggType = "some"; - break; - } - default: - { - YQL_ENSURE(false, "Unsupported type of aggregation!"); // add aggInits[i].Type - return node; - } + for (auto handler: aggCombine.Handlers()) { + auto trait = handler.Trait(); + if (!CanBePushedDown(trait, ctx)) { + return node; + } + auto aggApply = trait.Cast<TCoAggApply>(); + auto aggName = GetAggregationName(handler.ColumnName(), ctx); + auto colName = GetColumnNameUnderAggregation(aggApply, ctx); + if (aggName.empty() || colName.empty()) { + return node; } + auto aggOp = aggApply.Name(); + if (aggOp == "count_all") { + aggOp = TCoAtom(ctx.NewAtom(node.Pos(), "count")); + } + aggs.Add<TKqpOlapAggOperation>() - .Name().Build(aggInits[i].AggName) - .Type().Build(aggType) - .Column().Build(aggInits[i].ColName) + .Name().Build(aggName) + .Type().Build(aggOp) + .Column().Build(colName) .Build() .Done(); } @@ -196,9 +101,9 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti auto olapAgg = Build<TKqpOlapAgg>(ctx, node.Pos()) .Input(read.Process().Args().Arg(0)) .Aggregates(std::move(aggs.Done())) - .KeyColumns(std::move(aggKeyCols.Done())) + .KeyColumns(aggCombine.Keys()) .Done(); - + auto olapAggLambda = Build<TCoLambda>(ctx, node.Pos()) .Args({"row"}) .Body<TExprApplier>() @@ -210,7 +115,7 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti auto newProcessLambda = ctx.FuseLambdas(olapAggLambda.Ref(), read.Process().Ref()); YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(*newProcessLambda, ctx); - + auto newRead = Build<TKqpReadOlapTableRanges>(ctx, node.Pos()) .Table(read.Table()) .Ranges(read.Ranges()) @@ -257,7 +162,7 @@ TExprBase KqpPushOlapLength(TExprBase node, TExprContext& ctx, const TKqpOptimiz ) ) .Done(); - + auto newProcessLambda = Build<TCoLambda>(ctx, node.Pos()) .Args({"row"}) .Body<TExprApplier>() @@ -265,7 +170,7 @@ TExprBase KqpPushOlapLength(TExprBase node, TExprContext& ctx, const TKqpOptimiz .With(read.Process().Args().Arg(0), "row") .Build() .Done(); - + YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(newProcessLambda, ctx); auto newRead = Build<TKqpReadOlapTableRanges>(ctx, node.Pos()) diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index 5d6b0abba26..2e63ab1b94e 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -1392,9 +1392,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(AggregationCountGroupByPushdown) { - // remove this return when GROUP BY will be implemented on columnshard - return; - auto settings = TKikimrSettings() .SetWithSampleTables(false) .SetEnableOlapSchemaOperations(true); @@ -1418,6 +1415,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TString query = R"( --!syntax_v1 PRAGMA Kikimr.KqpPushOlapProcess = "true"; + PRAGMA EmitAggApply; + SELECT level, COUNT(level) FROM `/Root/olapStore/olapTable` @@ -1437,9 +1436,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST_TWIN(CountAllPushdown, UseLlvm) { - // remove this return when COUNT(*) will be implemented on columnshard - return; - auto settings = TKikimrSettings() .SetWithSampleTables(false) .SetEnableOlapSchemaOperations(true); @@ -1463,7 +1459,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TString query = fmt::format(R"( --!syntax_v1 PRAGMA Kikimr.KqpPushOlapProcess = "true"; + PRAGMA EmitAggApply; PRAGMA ydb.EnableLlvm = "{}"; + SELECT COUNT(*) FROM `/Root/olapStore/olapTable` @@ -1594,6 +1592,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { queryFixed << "--!syntax_v1" << Endl; if (Pushdown) { queryFixed << "PRAGMA Kikimr.KqpPushOlapProcess = \"true\";" << Endl; + queryFixed << "PRAGMA EmitAggApply;" << Endl; } queryFixed << Query << Endl; Cerr << "REQUEST:\n" << queryFixed << Endl; @@ -1737,7 +1736,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(Aggregation_Composite_GroupByUR_Limit) { - //https://st.yandex-team.ru/KIKIMR-15900 + // https://st.yandex-team.ru/KIKIMR-16141 return; TAggregationTestCase testCase; testCase.SetQuery(R"( @@ -1881,44 +1880,32 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TestAggregations({ testCase }); } - Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) { - auto settings = TKikimrSettings() - .SetWithSampleTables(false) - .SetEnableOlapSchemaOperations(true); - TKikimrRunner kikimr(settings); - - // EnableDebugLogging(kikimr); - CreateTestOlapTable(kikimr); - auto tableClient = kikimr.GetTableClient(); + Y_UNIT_TEST(Aggregation_ResultCountExpr) { + TAggregationTestCase testCase; + testCase.SetQuery(R"( + SELECT + COUNT(level + 2) + FROM `/Root/olapStore/olapTable` + )") + .SetExpectedReply("[[23000u;]]") + .AddExpectedPlanOptions("CombineCore"); - { - WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000); - WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000); - WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000); - WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000); - WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000); - WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000); - WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000); - } + TestAggregations({ testCase }); + } - { - TString query = R"( - --!syntax_v1 - PRAGMA Kikimr.KqpPushOlapProcess = "true"; + Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) { + TAggregationTestCase testCase; + testCase.SetQuery(R"( SELECT COUNT(`timestamp`) FROM `/Root/olapStore/olapTable` WHERE level = 2 - )"; - auto it = tableClient.StreamExecuteScanQuery(query).GetValueSync(); - - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - TString result = StreamResultToYson(it); - CompareYson(result, R"([[4600u;]])"); + )") + .SetExpectedReply("[[4600u;]]") + .AddExpectedPlanOptions("TKqpOlapAgg") + .AddExpectedPlanOptions("KqpOlapFilter"); - // Check plan - CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg", "KqpOlapFilter" }); - } + TestAggregations({ testCase }); } Y_UNIT_TEST_TWIN(StatsSysView, UseSessionActor) { |