summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <[email protected]>2022-08-22 12:31:36 +0300
committeraidarsamer <[email protected]>2022-08-22 12:31:36 +0300
commit9d04ed26e0a185237da67d15786cad572d352cd7 (patch)
tree0d3fa91ac42c2be74c36b8c5f2f71fccbc2096d0
parentbd26a7289d3b52946bb01aabf50aaa98e8776b24 (diff)
add support for cumulative filter and aggregation pushdown.
Add support for filter and aggregate push down. Added tests for filter and aggregate pushdown for kqp and columnshard.
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp21
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp100
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_read_write.cpp90
3 files changed, 196 insertions, 15 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
index 3ecedb71ddf..ed0cf7a5f0e 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
@@ -118,17 +118,22 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
return node;
}
- if (!node.Maybe<TCoCombineByKey>().Input().Maybe<TKqpReadOlapTableRanges>()) {
+ if (!node.Maybe<TCoCombineByKey>()) {
return node;
}
auto combineKey = node.Cast<TCoCombineByKey>();
- auto read = combineKey.Input().Cast<TKqpReadOlapTableRanges>();
+ auto maybeRead = combineKey.Input().Maybe<TKqpReadOlapTableRanges>();
+ if (!maybeRead) {
+ maybeRead = combineKey.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>();
+ }
- if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) {
+ if (!maybeRead) {
return node;
}
+ auto read = maybeRead.Cast();
+
auto keySelectorBody = combineKey.KeySelectorLambda().Cast<TCoLambda>().Body();
if (!ContainsSimpleColumnOnly(keySelectorBody, combineKey) && !ContainsConstOnly(keySelectorBody)) {
return node;
@@ -189,21 +194,23 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
}
auto olapAgg = Build<TKqpOlapAgg>(ctx, node.Pos())
- .Input(read.Process().Body())
+ .Input(read.Process().Args().Arg(0))
.Aggregates(std::move(aggs.Done()))
.KeyColumns(std::move(aggKeyCols.Done()))
.Done();
- auto newProcessLambda = Build<TCoLambda>(ctx, node.Pos())
+ auto olapAggLambda = Build<TCoLambda>(ctx, node.Pos())
.Args({"row"})
.Body<TExprApplier>()
.Apply(olapAgg)
.With(read.Process().Args().Arg(0), "row")
.Build()
.Done();
-
- YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(newProcessLambda, ctx);
+ auto newProcessLambda = ctx.FuseLambdas(olapAggLambda.Ref(), read.Process().Ref());
+
+ YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(*newProcessLambda, ctx);
+
auto newRead = Build<TKqpReadOlapTableRanges>(ctx, node.Pos())
.Table(read.Table())
.Ranges(read.Ranges())
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 181e8814ffb..7a929607267 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -511,7 +511,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
};
}
- void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient) {
+ void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes) {
TStreamExecScanQuerySettings scanSettings;
scanSettings.Explain(true);
auto res = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
@@ -520,8 +520,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto planRes = CollectStreamResult(res);
auto ast = planRes.QueryStats->Getquery_ast();
- UNIT_ASSERT_C(ast.find("TKqpOlapAgg") != std::string::npos,
- TStringBuilder() << "Aggregate was not pushed down. Query: " << query);
+ for (auto planNode : planNodes) {
+ UNIT_ASSERT_C(ast.find(planNode) != std::string::npos,
+ TStringBuilder() << planNode << " was not pushed down. Query: " << query);
+ }
}
Y_UNIT_TEST_TWIN(SimpleQueryOlap, UseSessionActor) {
@@ -1320,7 +1322,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient);
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
}
}
@@ -1364,7 +1366,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient);
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
}
}
@@ -1408,7 +1410,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient);
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
}
}
@@ -1447,6 +1449,92 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST(AggregationAndFilterPushdownOnSameCols) {
+ // remove this return when Filter and Aggregation pushdown will be fixed on columnshard
+ return;
+
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetEnableOlapSchemaOperations(true);
+ TKikimrRunner kikimr(settings);
+
+ // EnableDebugLogging(kikimr);
+ CreateTestOlapTable(kikimr);
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
+ }
+
+ {
+ TString query = R"(
+ --!syntax_v1
+ PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ SELECT
+ COUNT(level)
+ FROM `/Root/olapStore/olapTable`
+ WHERE level = 2
+ )";
+ auto it = tableClient.StreamExecuteScanQuery(query).GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ CompareYson(result, R"([[4600u;]])");
+
+ // Check plan
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg", "KqpOlapFilter" });
+ }
+ }
+
+ Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) {
+ // remove this return when Filter and Aggregation pushdown will be fixed on columnshard
+ return;
+
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetEnableOlapSchemaOperations(true);
+ TKikimrRunner kikimr(settings);
+
+ // EnableDebugLogging(kikimr);
+ CreateTestOlapTable(kikimr);
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
+ }
+
+ {
+ TString query = R"(
+ --!syntax_v1
+ PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ SELECT
+ COUNT(`timestamp`)
+ FROM `/Root/olapStore/olapTable`
+ WHERE level = 2
+ )";
+ auto it = tableClient.StreamExecuteScanQuery(query).GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ CompareYson(result, R"([[4600u;]])");
+
+ // Check plan
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg", "KqpOlapFilter" });
+ }
+ }
+
Y_UNIT_TEST_TWIN(StatsSysView, UseSessionActor) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 0ec7aefce0f..a373d366f68 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -83,6 +83,26 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p
return true;
}
+bool CheckIntValue(const TVector<TString>& blobs, const TString& srcSchema, const TString& colName,
+ const TVector<int>& expectedVals) {
+ auto schema = NArrow::DeserializeSchema(srcSchema);
+ for (auto& blob : blobs) {
+ auto batch = NArrow::DeserializeBatch(blob, schema);
+ UNIT_ASSERT(batch);
+
+ std::shared_ptr<arrow::Array> array = batch->GetColumnByName(colName);
+ UNIT_ASSERT(array);
+ auto& val = dynamic_cast<const arrow::NumericArray<arrow::Int64Type>&>(*array);
+
+ UNIT_ASSERT(val.length() == (int)expectedVals.size());
+ for (int i = 0; i < val.length(); ++i) {
+ int value = val.Value(i);
+ UNIT_ASSERT(value == expectedVals[i]);
+ }
+ }
+ return true;
+}
+
bool CheckOrdered(const TString& blob, const TString& srtSchema) {
auto schema = NArrow::DeserializeSchema(srtSchema);
auto batch = NArrow::DeserializeBatch(blob, schema);
@@ -921,6 +941,52 @@ NKikimrSSA::TProgram MakeSelectAggregates(TAggAssignment::EAggregateFunction agg
return ssa;
}
+// SELECT some(timestamp) FROM t WHERE level = 1
+//
+// FIXME:
+// NotImplemented: Function any has no kernel matching input types (array[timestamp[us]])
+// NotImplemented: Function any has no kernel matching input types (array[string])
+// NotImplemented: Function any has no kernel matching input types (array[int32])
+// NotImplemented: Function min_max has no kernel matching input types (array[timestamp[us]])
+// NotImplemented: Function min_max has no kernel matching input types (array[string])
+//
+NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(TAggAssignment::EAggregateFunction aggId = TAggAssignment::AGG_ANY,
+ std::vector<ui32> columnIds = {1, 5})
+{
+ NKikimrSSA::TProgram ssa;
+ ui32 tmpColumnId = 100;
+
+ auto* line1 = ssa.AddCommand();
+ auto* l1_assign = line1->MutableAssign();
+ l1_assign->MutableColumn()->SetId(tmpColumnId);
+ l1_assign->MutableConstant()->SetInt32(1);
+
+ auto* line2 = ssa.AddCommand();
+ auto* l2_assign = line2->MutableAssign();
+ l2_assign->MutableColumn()->SetId(tmpColumnId + 1);
+ auto* l2_func = l2_assign->MutableFunction();
+ l2_func->SetId(TAssignment::FUNC_CMP_EQUAL);
+ l2_func->AddArguments()->SetId(columnIds[1]);
+ l2_func->AddArguments()->SetId(tmpColumnId);
+
+ auto* line3 = ssa.AddCommand();
+ line3->MutableFilter()->MutablePredicate()->SetId(tmpColumnId + 1);
+
+ auto* line4 = ssa.AddCommand();
+ auto* groupBy = line4->MutableGroupBy();
+ //
+ auto* l4_agg1 = groupBy->AddAggregates();
+ l4_agg1->MutableColumn()->SetId(tmpColumnId + 2);
+ auto* l4_agg1_f = l4_agg1->MutableFunction();
+ l4_agg1_f->SetId(aggId);
+ l4_agg1_f->AddArguments()->SetId(columnIds[0]);
+
+ auto* line5 = ssa.AddCommand();
+ auto* proj = line5->MutableProjection();
+ proj->AddColumns()->SetId(tmpColumnId + 2);
+ return ssa;
+}
+
void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema())
{
TTestBasicRuntime runtime;
@@ -1101,6 +1167,18 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
UNIT_ASSERT(program.SerializeToString(&programs.back()));
}
+ {
+ NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(TAggAssignment::AGG_COUNT);
+ TString serialized;
+ UNIT_ASSERT(ssa.SerializeToString(&serialized));
+ NKikimrSSA::TOlapProgram program;
+ program.SetProgram(serialized);
+
+ // TODO: Uncomment to run this test!
+ // programs.push_back("");
+ // UNIT_ASSERT(program.SerializeToString(&programs.back()));
+ }
+
ui32 i = 0;
for (auto& programText : programs) {
auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
@@ -1126,12 +1204,20 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
UNIT_ASSERT(resRead.GetData().size() > 0);
auto& meta = resRead.GetMeta();
- //auto& schema = meta.GetSchema();
+ auto& schema = meta.GetSchema();
TVector<TString> readData;
readData.push_back(resRead.GetData());
- UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1));
+ switch (i) {
+ case 2:
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"102"}, 1));
+ UNIT_ASSERT(CheckIntValue(readData, schema, "102", {1}));
+ break;
+ default:
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1));
+ break;
+ }
}
++i;