aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <aidarsamer@ydb.tech>2022-10-18 22:22:21 +0300
committeraidarsamer <aidarsamer@ydb.tech>2022-10-18 22:22:21 +0300
commit1192ba7fb99ce3f8f5e52ff920687aa80fc42aba (patch)
tree221ae35a6efc2ab5dbcb9579d2d33ebe842f1f39
parentcf532004dcdf731c0035e191c519fcb8704225e9 (diff)
downloadydb-1192ba7fb99ce3f8f5e52ff920687aa80fc42aba.tar.gz
Add optimization to pushdown COUNT to column shard
Pushdown COUNT to column shard
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp2
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp197
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp63
3 files changed, 77 insertions, 185 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 9995f7c669a..c5726a2d7ab 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -36,7 +36,7 @@ public:
AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable));
AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter));
AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage));
- AddHandler(0, &TCoCombineByKey::Match, HNDL(PushOlapAggregate));
+ AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushOlapAggregate));
AddHandler(0, &TDqPhyLength::Match, HNDL(PushOlapLength));
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
index ed0cf7a5f0e..87589149d6c 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
@@ -13,101 +13,39 @@ using namespace NYql::NNodes;
namespace {
-enum class EAggType {
- Count,
- Some
-};
-
-struct TAggInfo {
- std::string AggName;
- std::string ColName;
- EAggType Type;
-};
-
-bool ContainsConstOnly(const TExprBase& node) {
- return node.Maybe<TCoDataCtor>().IsValid();
-}
-
-bool ContainsSimpleColumnOnly(const TExprBase& node, const TExprBase& parent) {
- if (!parent.Maybe<TCoInputBase>()) {
- return false;
- }
- auto input = parent.Cast<TCoInputBase>().Input();
- if (auto maybeExprList = node.Maybe<TExprList>()) {
- for (auto expr : maybeExprList.Cast()) {
- if (!expr.Maybe<TCoMember>() || expr.Cast<TCoMember>().Struct().Raw() != input.Raw()) {
- return false;
- }
- }
- return true;
- }
- return node.Maybe<TCoMember>().IsValid() && node.Cast<TCoMember>().Struct().Raw() == input.Raw();
-}
-
-std::vector<std::string> GetGroupByCols(const TExprBase& keySelectorBody, const TExprBase& parent) {
- std::vector<std::string> res;
- if (!ContainsSimpleColumnOnly(keySelectorBody, parent)) {
- YQL_CLOG(DEBUG, ProviderKqp) << "For aggregate push down optimization in GROUP BY column list should be Member callables only.";
- return res;
+std::string GetColumnNameUnderAggregation(const TCoAggApply& aggApply, TExprContext& ctx) {
+ auto extractorBody = aggApply.Extractor().Body();
+ if (extractorBody.Maybe<TCoVoid>() && aggApply.Name() == "count_all") {
+ return "*";
}
- if (auto maybeMember = keySelectorBody.Maybe<TCoMember>()) {
- res.push_back(keySelectorBody.Cast<TCoMember>().Name().StringValue());
- } else if (auto maybeExprList = keySelectorBody.Maybe<TExprList>()) {
- for (auto expr : maybeExprList.Cast()) {
- res.push_back(expr.Cast<TCoMember>().Name().StringValue());
- }
+ if (!extractorBody.Maybe<TCoMember>()) {
+ YQL_CLOG(WARN, ProviderKqp) << "Expected TCoMember callable to get column under aggregation. Got: " << KqpExprToPrettyString(extractorBody, ctx);
+ return "";
}
- return res;
+ return extractorBody.Cast<TCoMember>().Name().StringValue();
}
-std::vector<TAggInfo> GetAggregationsFromInit(const TExprBase& node) {
- std::vector<TAggInfo> res;
- if (!node.Maybe<TCoAsStruct>()) {
- return res;
- }
- for (auto item : node.Cast<TCoAsStruct>()) {
- auto tuple = item.Cast<TCoNameValueTuple>();
- auto tupleValue = tuple.Value();
- if (tupleValue.Maybe<TCoAggrCountInit>()) {
- auto aggrCntInit = tupleValue.Cast<TCoAggrCountInit>();
- if (aggrCntInit.Value().Maybe<TCoMember>()) {
- TAggInfo aggInfo;
- aggInfo.AggName = tuple.Name();
- aggInfo.Type = EAggType::Count;
- aggInfo.ColName = aggrCntInit.Value().Cast<TCoMember>().Name();
- res.push_back(aggInfo);
- }
- } else {
- YQL_CLOG(DEBUG, ProviderKqp) << "Unsupported aggregation type in init handler.";
- res.clear();
- return res;
- }
+std::string GetAggregationName(const TExprBase& node, TExprContext& ctx) {
+ if (!node.Maybe<TCoAtom>()) {
+ YQL_CLOG(WARN, ProviderKqp) << "Expected TCoAtom as aggregation name. Got: " << KqpExprToPrettyString(node, ctx);
+ return "";
}
- return res;
+ return node.Cast<TCoAtom>().StringValue();
}
-std::vector<TAggInfo> GetAggregationsFromUpdate(const TExprBase& node) {
- std::vector<TAggInfo> res;
- if (!node.Maybe<TCoAsStruct>()) {
- return res;
+bool CanBePushedDown(const TExprBase& trait, TExprContext& ctx)
+{
+ if (!trait.Maybe<TCoAggApply>()) {
+ YQL_CLOG(WARN, ProviderKqp) << "Expected TCoAggApply in aggregate pushdown to column shard. Got: " << KqpExprToPrettyString(trait, ctx);
+ return false;
}
- for (auto item : node.Cast<TCoAsStruct>()) {
- auto tuple = item.Cast<TCoNameValueTuple>();
- auto tupleValue = tuple.Value();
- if (auto maybeAggrCntUpd = tupleValue.Maybe<TCoAggrCountUpdate>()) {
- if (maybeAggrCntUpd.Cast().Value().Maybe<TCoMember>()) {
- TAggInfo aggInfo;
- aggInfo.Type = EAggType::Count;
- aggInfo.AggName = tuple.Name();
- res.push_back(aggInfo);
- }
- } else {
- YQL_CLOG(DEBUG, ProviderKqp) << "Unsupported aggregation type in update handler.";
- res.clear();
- return res;
- }
+ auto aggApply = trait.Cast<TCoAggApply>();
+ auto aggName = aggApply.Name();
+ if (aggName == "count" || aggName == "count_all") {
+ return true;
}
- return res;
+ YQL_CLOG(WARN, ProviderKqp) << "Unsupported type of aggregation: " << aggName.StringValue();
+ return false;
}
} // anonymous namespace end
@@ -118,14 +56,14 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
return node;
}
- if (!node.Maybe<TCoCombineByKey>()) {
+ if (!node.Maybe<TCoAggregateCombine>()) {
return node;
}
- auto combineKey = node.Cast<TCoCombineByKey>();
- auto maybeRead = combineKey.Input().Maybe<TKqpReadOlapTableRanges>();
+ auto aggCombine = node.Cast<TCoAggregateCombine>();
+ auto maybeRead = aggCombine.Input().Maybe<TKqpReadOlapTableRanges>();
if (!maybeRead) {
- maybeRead = combineKey.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>();
+ maybeRead = aggCombine.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>();
}
if (!maybeRead) {
@@ -133,62 +71,29 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
}
auto read = maybeRead.Cast();
-
- auto keySelectorBody = combineKey.KeySelectorLambda().Cast<TCoLambda>().Body();
- if (!ContainsSimpleColumnOnly(keySelectorBody, combineKey) && !ContainsConstOnly(keySelectorBody)) {
- return node;
- }
- auto aggKeyCols = Build<TCoAtomList>(ctx, node.Pos());
- auto groupByCols = GetGroupByCols(keySelectorBody, combineKey);
- for (auto groupByCol : groupByCols) {
- aggKeyCols.Add<TCoAtom>()
- .Build(groupByCol)
- .Done();
- }
-
- auto initHandlerBody = combineKey.InitHandlerLambda().Cast<TCoLambda>().Body();
- auto aggInits = GetAggregationsFromInit(initHandlerBody);
-
- auto updateHandlerBody = combineKey.UpdateHandlerLambda().Cast<TCoLambda>().Body();
- auto aggUpdates = GetAggregationsFromUpdate(updateHandlerBody);
-
- auto finishHandlerBody = combineKey.FinishHandlerLambda().Cast<TCoLambda>().Body();
- if (aggInits.empty() || aggInits.size() != aggUpdates.size()) {
- return node;
- }
-
- for (size_t i = 0; i != aggInits.size(); ++i) {
- if (aggInits[i].Type != aggUpdates[i].Type) {
- YQL_CLOG(DEBUG, ProviderKqp) << "Different aggregation type in init and update handlers in aggregate push-down optimization!";
- return node;
- }
- }
auto aggs = Build<TKqpOlapAggOperationList>(ctx, node.Pos());
// TODO: TMaybeNode<TKqpOlapAggOperation>;
- for (size_t i = 0; i != aggInits.size(); ++i) {
- std::string aggType;
- switch (aggInits[i].Type) {
- case EAggType::Count:
- {
- aggType = "count";
- break;
- }
- case EAggType::Some:
- {
- aggType = "some";
- break;
- }
- default:
- {
- YQL_ENSURE(false, "Unsupported type of aggregation!"); // add aggInits[i].Type
- return node;
- }
+ for (auto handler: aggCombine.Handlers()) {
+ auto trait = handler.Trait();
+ if (!CanBePushedDown(trait, ctx)) {
+ return node;
+ }
+ auto aggApply = trait.Cast<TCoAggApply>();
+ auto aggName = GetAggregationName(handler.ColumnName(), ctx);
+ auto colName = GetColumnNameUnderAggregation(aggApply, ctx);
+ if (aggName.empty() || colName.empty()) {
+ return node;
}
+ auto aggOp = aggApply.Name();
+ if (aggOp == "count_all") {
+ aggOp = TCoAtom(ctx.NewAtom(node.Pos(), "count"));
+ }
+
aggs.Add<TKqpOlapAggOperation>()
- .Name().Build(aggInits[i].AggName)
- .Type().Build(aggType)
- .Column().Build(aggInits[i].ColName)
+ .Name().Build(aggName)
+ .Type().Build(aggOp)
+ .Column().Build(colName)
.Build()
.Done();
}
@@ -196,9 +101,9 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
auto olapAgg = Build<TKqpOlapAgg>(ctx, node.Pos())
.Input(read.Process().Args().Arg(0))
.Aggregates(std::move(aggs.Done()))
- .KeyColumns(std::move(aggKeyCols.Done()))
+ .KeyColumns(aggCombine.Keys())
.Done();
-
+
auto olapAggLambda = Build<TCoLambda>(ctx, node.Pos())
.Args({"row"})
.Body<TExprApplier>()
@@ -210,7 +115,7 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
auto newProcessLambda = ctx.FuseLambdas(olapAggLambda.Ref(), read.Process().Ref());
YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(*newProcessLambda, ctx);
-
+
auto newRead = Build<TKqpReadOlapTableRanges>(ctx, node.Pos())
.Table(read.Table())
.Ranges(read.Ranges())
@@ -257,7 +162,7 @@ TExprBase KqpPushOlapLength(TExprBase node, TExprContext& ctx, const TKqpOptimiz
)
)
.Done();
-
+
auto newProcessLambda = Build<TCoLambda>(ctx, node.Pos())
.Args({"row"})
.Body<TExprApplier>()
@@ -265,7 +170,7 @@ TExprBase KqpPushOlapLength(TExprBase node, TExprContext& ctx, const TKqpOptimiz
.With(read.Process().Args().Arg(0), "row")
.Build()
.Done();
-
+
YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(newProcessLambda, ctx);
auto newRead = Build<TKqpReadOlapTableRanges>(ctx, node.Pos())
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 5d6b0abba26..2e63ab1b94e 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -1392,9 +1392,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(AggregationCountGroupByPushdown) {
- // remove this return when GROUP BY will be implemented on columnshard
- return;
-
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
@@ -1418,6 +1415,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString query = R"(
--!syntax_v1
PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ PRAGMA EmitAggApply;
+
SELECT
level, COUNT(level)
FROM `/Root/olapStore/olapTable`
@@ -1437,9 +1436,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST_TWIN(CountAllPushdown, UseLlvm) {
- // remove this return when COUNT(*) will be implemented on columnshard
- return;
-
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
@@ -1463,7 +1459,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString query = fmt::format(R"(
--!syntax_v1
PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ PRAGMA EmitAggApply;
PRAGMA ydb.EnableLlvm = "{}";
+
SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
@@ -1594,6 +1592,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
queryFixed << "--!syntax_v1" << Endl;
if (Pushdown) {
queryFixed << "PRAGMA Kikimr.KqpPushOlapProcess = \"true\";" << Endl;
+ queryFixed << "PRAGMA EmitAggApply;" << Endl;
}
queryFixed << Query << Endl;
Cerr << "REQUEST:\n" << queryFixed << Endl;
@@ -1737,7 +1736,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(Aggregation_Composite_GroupByUR_Limit) {
- //https://st.yandex-team.ru/KIKIMR-15900
+ // https://st.yandex-team.ru/KIKIMR-16141
return;
TAggregationTestCase testCase;
testCase.SetQuery(R"(
@@ -1881,44 +1880,32 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TestAggregations({ testCase });
}
- Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) {
- auto settings = TKikimrSettings()
- .SetWithSampleTables(false)
- .SetEnableOlapSchemaOperations(true);
- TKikimrRunner kikimr(settings);
-
- // EnableDebugLogging(kikimr);
- CreateTestOlapTable(kikimr);
- auto tableClient = kikimr.GetTableClient();
+ Y_UNIT_TEST(Aggregation_ResultCountExpr) {
+ TAggregationTestCase testCase;
+ testCase.SetQuery(R"(
+ SELECT
+ COUNT(level + 2)
+ FROM `/Root/olapStore/olapTable`
+ )")
+ .SetExpectedReply("[[23000u;]]")
+ .AddExpectedPlanOptions("CombineCore");
- {
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
- }
+ TestAggregations({ testCase });
+ }
- {
- TString query = R"(
- --!syntax_v1
- PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) {
+ TAggregationTestCase testCase;
+ testCase.SetQuery(R"(
SELECT
COUNT(`timestamp`)
FROM `/Root/olapStore/olapTable`
WHERE level = 2
- )";
- auto it = tableClient.StreamExecuteScanQuery(query).GetValueSync();
-
- UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
- TString result = StreamResultToYson(it);
- CompareYson(result, R"([[4600u;]])");
+ )")
+ .SetExpectedReply("[[4600u;]]")
+ .AddExpectedPlanOptions("TKqpOlapAgg")
+ .AddExpectedPlanOptions("KqpOlapFilter");
- // Check plan
- CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg", "KqpOlapFilter" });
- }
+ TestAggregations({ testCase });
}
Y_UNIT_TEST_TWIN(StatsSysView, UseSessionActor) {