diff options
author | aidarsamer <aidarsamer@ydb.tech> | 2022-08-22 12:31:36 +0300 |
---|---|---|
committer | aidarsamer <aidarsamer@ydb.tech> | 2022-08-22 12:31:36 +0300 |
commit | 9d04ed26e0a185237da67d15786cad572d352cd7 (patch) | |
tree | 0d3fa91ac42c2be74c36b8c5f2f71fccbc2096d0 | |
parent | bd26a7289d3b52946bb01aabf50aaa98e8776b24 (diff) | |
download | ydb-9d04ed26e0a185237da67d15786cad572d352cd7.tar.gz |
Add support for cumulative filter and aggregation pushdown.
Add support for filter and aggregate pushdown.
Add tests for filter and aggregate pushdown for KQP and columnshard.
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 100 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_columnshard_read_write.cpp | 90 |
3 files changed, 196 insertions, 15 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp index 3ecedb71dd..ed0cf7a5f0 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp @@ -118,17 +118,22 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti return node; } - if (!node.Maybe<TCoCombineByKey>().Input().Maybe<TKqpReadOlapTableRanges>()) { + if (!node.Maybe<TCoCombineByKey>()) { return node; } auto combineKey = node.Cast<TCoCombineByKey>(); - auto read = combineKey.Input().Cast<TKqpReadOlapTableRanges>(); + auto maybeRead = combineKey.Input().Maybe<TKqpReadOlapTableRanges>(); + if (!maybeRead) { + maybeRead = combineKey.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>(); + } - if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) { + if (!maybeRead) { return node; } + auto read = maybeRead.Cast(); + auto keySelectorBody = combineKey.KeySelectorLambda().Cast<TCoLambda>().Body(); if (!ContainsSimpleColumnOnly(keySelectorBody, combineKey) && !ContainsConstOnly(keySelectorBody)) { return node; @@ -189,21 +194,23 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti } auto olapAgg = Build<TKqpOlapAgg>(ctx, node.Pos()) - .Input(read.Process().Body()) + .Input(read.Process().Args().Arg(0)) .Aggregates(std::move(aggs.Done())) .KeyColumns(std::move(aggKeyCols.Done())) .Done(); - auto newProcessLambda = Build<TCoLambda>(ctx, node.Pos()) + auto olapAggLambda = Build<TCoLambda>(ctx, node.Pos()) .Args({"row"}) .Body<TExprApplier>() .Apply(olapAgg) .With(read.Process().Args().Arg(0), "row") .Build() .Done(); - - YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(newProcessLambda, ctx); + auto newProcessLambda = ctx.FuseLambdas(olapAggLambda.Ref(), read.Process().Ref()); + + YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << 
KqpExprToPrettyString(*newProcessLambda, ctx); + auto newRead = Build<TKqpReadOlapTableRanges>(ctx, node.Pos()) .Table(read.Table()) .Ranges(read.Ranges()) diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index 181e8814ff..7a92960726 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -511,7 +511,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { }; } - void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient) { + void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes) { TStreamExecScanQuerySettings scanSettings; scanSettings.Explain(true); auto res = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync(); @@ -520,8 +520,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto planRes = CollectStreamResult(res); auto ast = planRes.QueryStats->Getquery_ast(); - UNIT_ASSERT_C(ast.find("TKqpOlapAgg") != std::string::npos, - TStringBuilder() << "Aggregate was not pushed down. Query: " << query); + for (auto planNode : planNodes) { + UNIT_ASSERT_C(ast.find(planNode) != std::string::npos, + TStringBuilder() << planNode << " was not pushed down. 
Query: " << query); + } } Y_UNIT_TEST_TWIN(SimpleQueryOlap, UseSessionActor) { @@ -1320,7 +1322,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(result, R"([[23000u;]])"); // Check plan - CheckPlanForAggregatePushdown(query, tableClient); + CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }); } } @@ -1364,7 +1366,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(result, R"([[23000u;]])"); // Check plan - CheckPlanForAggregatePushdown(query, tableClient); + CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }); } } @@ -1408,7 +1410,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(result, R"([[23000u;]])"); // Check plan - CheckPlanForAggregatePushdown(query, tableClient); + CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" }); } } @@ -1447,6 +1449,92 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } + Y_UNIT_TEST(AggregationAndFilterPushdownOnSameCols) { + // remove this return when Filter and Aggregation pushdown will be fixed on columnshard + return; + + auto settings = TKikimrSettings() + .SetWithSampleTables(false) + .SetEnableOlapSchemaOperations(true); + TKikimrRunner kikimr(settings); + + // EnableDebugLogging(kikimr); + CreateTestOlapTable(kikimr); + auto tableClient = kikimr.GetTableClient(); + + { + WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000); + } + + { + TString query = R"( + --!syntax_v1 + PRAGMA Kikimr.KqpPushOlapProcess = "true"; + SELECT + COUNT(level) + FROM `/Root/olapStore/olapTable` + WHERE level = 2 + )"; + auto it = 
tableClient.StreamExecuteScanQuery(query).GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + CompareYson(result, R"([[4600u;]])"); + + // Check plan + CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg", "KqpOlapFilter" }); + } + } + + Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) { + // remove this return when Filter and Aggregation pushdown will be fixed on columnshard + return; + + auto settings = TKikimrSettings() + .SetWithSampleTables(false) + .SetEnableOlapSchemaOperations(true); + TKikimrRunner kikimr(settings); + + // EnableDebugLogging(kikimr); + CreateTestOlapTable(kikimr); + auto tableClient = kikimr.GetTableClient(); + + { + WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000); + } + + { + TString query = R"( + --!syntax_v1 + PRAGMA Kikimr.KqpPushOlapProcess = "true"; + SELECT + COUNT(`timestamp`) + FROM `/Root/olapStore/olapTable` + WHERE level = 2 + )"; + auto it = tableClient.StreamExecuteScanQuery(query).GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + CompareYson(result, R"([[4600u;]])"); + + // Check plan + CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg", "KqpOlapFilter" }); + } + } + Y_UNIT_TEST_TWIN(StatsSysView, UseSessionActor) { auto settings = TKikimrSettings() .SetWithSampleTables(false) diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp 
b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp index 0ec7aefce0..a373d366f6 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp @@ -83,6 +83,26 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p return true; } +bool CheckIntValue(const TVector<TString>& blobs, const TString& srcSchema, const TString& colName, + const TVector<int>& expectedVals) { + auto schema = NArrow::DeserializeSchema(srcSchema); + for (auto& blob : blobs) { + auto batch = NArrow::DeserializeBatch(blob, schema); + UNIT_ASSERT(batch); + + std::shared_ptr<arrow::Array> array = batch->GetColumnByName(colName); + UNIT_ASSERT(array); + auto& val = dynamic_cast<const arrow::NumericArray<arrow::Int64Type>&>(*array); + + UNIT_ASSERT(val.length() == (int)expectedVals.size()); + for (int i = 0; i < val.length(); ++i) { + int value = val.Value(i); + UNIT_ASSERT(value == expectedVals[i]); + } + } + return true; +} + bool CheckOrdered(const TString& blob, const TString& srtSchema) { auto schema = NArrow::DeserializeSchema(srtSchema); auto batch = NArrow::DeserializeBatch(blob, schema); @@ -921,6 +941,52 @@ NKikimrSSA::TProgram MakeSelectAggregates(TAggAssignment::EAggregateFunction agg return ssa; } +// SELECT some(timestamp) FROM t WHERE level = 1 +// +// FIXME: +// NotImplemented: Function any has no kernel matching input types (array[timestamp[us]]) +// NotImplemented: Function any has no kernel matching input types (array[string]) +// NotImplemented: Function any has no kernel matching input types (array[int32]) +// NotImplemented: Function min_max has no kernel matching input types (array[timestamp[us]]) +// NotImplemented: Function min_max has no kernel matching input types (array[string]) +// +NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(TAggAssignment::EAggregateFunction aggId = TAggAssignment::AGG_ANY, + std::vector<ui32> columnIds = {1, 5}) +{ + NKikimrSSA::TProgram ssa; + 
ui32 tmpColumnId = 100; + + auto* line1 = ssa.AddCommand(); + auto* l1_assign = line1->MutableAssign(); + l1_assign->MutableColumn()->SetId(tmpColumnId); + l1_assign->MutableConstant()->SetInt32(1); + + auto* line2 = ssa.AddCommand(); + auto* l2_assign = line2->MutableAssign(); + l2_assign->MutableColumn()->SetId(tmpColumnId + 1); + auto* l2_func = l2_assign->MutableFunction(); + l2_func->SetId(TAssignment::FUNC_CMP_EQUAL); + l2_func->AddArguments()->SetId(columnIds[1]); + l2_func->AddArguments()->SetId(tmpColumnId); + + auto* line3 = ssa.AddCommand(); + line3->MutableFilter()->MutablePredicate()->SetId(tmpColumnId + 1); + + auto* line4 = ssa.AddCommand(); + auto* groupBy = line4->MutableGroupBy(); + // + auto* l4_agg1 = groupBy->AddAggregates(); + l4_agg1->MutableColumn()->SetId(tmpColumnId + 2); + auto* l4_agg1_f = l4_agg1->MutableFunction(); + l4_agg1_f->SetId(aggId); + l4_agg1_f->AddArguments()->SetId(columnIds[0]); + + auto* line5 = ssa.AddCommand(); + auto* proj = line5->MutableProjection(); + proj->AddColumns()->SetId(tmpColumnId + 2); + return ssa; +} + void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema()) { TTestBasicRuntime runtime; @@ -1101,6 +1167,18 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T UNIT_ASSERT(program.SerializeToString(&programs.back())); } + { + NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(TAggAssignment::AGG_COUNT); + TString serialized; + UNIT_ASSERT(ssa.SerializeToString(&serialized)); + NKikimrSSA::TOlapProgram program; + program.SetProgram(serialized); + + // TODO: Uncomment to run this test! 
+ // programs.push_back(""); + // UNIT_ASSERT(program.SerializeToString(&programs.back())); + } + ui32 i = 0; for (auto& programText : programs) { auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId); @@ -1126,12 +1204,20 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T UNIT_ASSERT(resRead.GetData().size() > 0); auto& meta = resRead.GetMeta(); - //auto& schema = meta.GetSchema(); + auto& schema = meta.GetSchema(); TVector<TString> readData; readData.push_back(resRead.GetData()); - UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1)); + switch (i) { + case 2: + UNIT_ASSERT(CheckColumns(readData[0], meta, {"102"}, 1)); + UNIT_ASSERT(CheckIntValue(readData, schema, "102", {1})); + break; + default: + UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1)); + break; + } } ++i; |