author    aidarsamer <aidarsamer@ydb.tech>    2022-08-22 12:31:36 +0300
committer aidarsamer <aidarsamer@ydb.tech>    2022-08-22 12:31:36 +0300
commit    9d04ed26e0a185237da67d15786cad572d352cd7 (patch)
tree      0d3fa91ac42c2be74c36b8c5f2f71fccbc2096d0
parent    bd26a7289d3b52946bb01aabf50aaa98e8776b24 (diff)
download  ydb-9d04ed26e0a185237da67d15786cad572d352cd7.tar.gz
Add support for cumulative filter and aggregation pushdown.
Add support for pushing an aggregate down to the columnshard when a filter has already been pushed into the OLAP read. Added tests for combined filter and aggregate pushdown in KQP and in the columnshard.
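For illustration, the new KQP tests below exercise queries of the following shape (pragma, table path and values are taken verbatim from those tests); once the columnshard fix lands and the early returns in the tests are removed, the plan for such a query is expected to contain both KqpOlapFilter and TKqpOlapAgg nodes:

    --!syntax_v1
    PRAGMA Kikimr.KqpPushOlapProcess = "true";
    SELECT COUNT(level)
    FROM `/Root/olapStore/olapTable`
    WHERE level = 2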
-rw-r--r--  ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp      21
-rw-r--r--  ydb/core/kqp/ut/kqp_olap_ut.cpp                        100
-rw-r--r--  ydb/core/tx/columnshard/ut_columnshard_read_write.cpp   90
3 files changed, 196 insertions(+), 15 deletions(-)
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
index 3ecedb71dd..ed0cf7a5f0 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
@@ -118,17 +118,22 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
return node;
}
- if (!node.Maybe<TCoCombineByKey>().Input().Maybe<TKqpReadOlapTableRanges>()) {
+ if (!node.Maybe<TCoCombineByKey>()) {
return node;
}
auto combineKey = node.Cast<TCoCombineByKey>();
- auto read = combineKey.Input().Cast<TKqpReadOlapTableRanges>();
+ auto maybeRead = combineKey.Input().Maybe<TKqpReadOlapTableRanges>();
+ if (!maybeRead) {
+ maybeRead = combineKey.Input().Maybe<TCoExtractMembers>().Input().Maybe<TKqpReadOlapTableRanges>();
+ }
- if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) {
+ if (!maybeRead) {
return node;
}
+ auto read = maybeRead.Cast();
+
auto keySelectorBody = combineKey.KeySelectorLambda().Cast<TCoLambda>().Body();
if (!ContainsSimpleColumnOnly(keySelectorBody, combineKey) && !ContainsConstOnly(keySelectorBody)) {
return node;
@@ -189,21 +194,23 @@ TExprBase KqpPushOlapAggregate(TExprBase node, TExprContext& ctx, const TKqpOpti
}
auto olapAgg = Build<TKqpOlapAgg>(ctx, node.Pos())
- .Input(read.Process().Body())
+ .Input(read.Process().Args().Arg(0))
.Aggregates(std::move(aggs.Done()))
.KeyColumns(std::move(aggKeyCols.Done()))
.Done();
- auto newProcessLambda = Build<TCoLambda>(ctx, node.Pos())
+ auto olapAggLambda = Build<TCoLambda>(ctx, node.Pos())
.Args({"row"})
.Body<TExprApplier>()
.Apply(olapAgg)
.With(read.Process().Args().Arg(0), "row")
.Build()
.Done();
-
- YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(newProcessLambda, ctx);
+ auto newProcessLambda = ctx.FuseLambdas(olapAggLambda.Ref(), read.Process().Ref());
+
+ YQL_CLOG(INFO, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(*newProcessLambda, ctx);
+
auto newRead = Build<TKqpReadOlapTableRanges>(ctx, node.Pos())
.Table(read.Table())
.Ranges(read.Ranges())
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 181e8814ff..7a92960726 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -511,7 +511,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
};
}
- void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient) {
+ void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes) {
TStreamExecScanQuerySettings scanSettings;
scanSettings.Explain(true);
auto res = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
@@ -520,8 +520,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto planRes = CollectStreamResult(res);
auto ast = planRes.QueryStats->Getquery_ast();
- UNIT_ASSERT_C(ast.find("TKqpOlapAgg") != std::string::npos,
- TStringBuilder() << "Aggregate was not pushed down. Query: " << query);
+ for (auto planNode : planNodes) {
+ UNIT_ASSERT_C(ast.find(planNode) != std::string::npos,
+ TStringBuilder() << planNode << " was not pushed down. Query: " << query);
+ }
}
Y_UNIT_TEST_TWIN(SimpleQueryOlap, UseSessionActor) {
@@ -1320,7 +1322,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient);
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
}
}
@@ -1364,7 +1366,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient);
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
}
}
@@ -1408,7 +1410,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(result, R"([[23000u;]])");
// Check plan
- CheckPlanForAggregatePushdown(query, tableClient);
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
}
}
@@ -1447,6 +1449,92 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST(AggregationAndFilterPushdownOnSameCols) {
+ // Remove this return when filter and aggregation pushdown is fixed in the columnshard
+ return;
+
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetEnableOlapSchemaOperations(true);
+ TKikimrRunner kikimr(settings);
+
+ // EnableDebugLogging(kikimr);
+ CreateTestOlapTable(kikimr);
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
+ }
+
+ {
+ TString query = R"(
+ --!syntax_v1
+ PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ SELECT
+ COUNT(level)
+ FROM `/Root/olapStore/olapTable`
+ WHERE level = 2
+ )";
+ auto it = tableClient.StreamExecuteScanQuery(query).GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ CompareYson(result, R"([[4600u;]])");
+
+ // Check plan
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg", "KqpOlapFilter" });
+ }
+ }
+
+ Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) {
+ // Remove this return when filter and aggregation pushdown is fixed in the columnshard
+ return;
+
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetEnableOlapSchemaOperations(true);
+ TKikimrRunner kikimr(settings);
+
+ // EnableDebugLogging(kikimr);
+ CreateTestOlapTable(kikimr);
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
+ }
+
+ {
+ TString query = R"(
+ --!syntax_v1
+ PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ SELECT
+ COUNT(`timestamp`)
+ FROM `/Root/olapStore/olapTable`
+ WHERE level = 2
+ )";
+ auto it = tableClient.StreamExecuteScanQuery(query).GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ CompareYson(result, R"([[4600u;]])");
+
+ // Check plan
+ CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg", "KqpOlapFilter" });
+ }
+ }
+
Y_UNIT_TEST_TWIN(StatsSysView, UseSessionActor) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 0ec7aefce0..a373d366f6 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -83,6 +83,26 @@ bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::p
return true;
}
+bool CheckIntValue(const TVector<TString>& blobs, const TString& srcSchema, const TString& colName,
+ const TVector<int>& expectedVals) {
+ auto schema = NArrow::DeserializeSchema(srcSchema);
+ for (auto& blob : blobs) {
+ auto batch = NArrow::DeserializeBatch(blob, schema);
+ UNIT_ASSERT(batch);
+
+ std::shared_ptr<arrow::Array> array = batch->GetColumnByName(colName);
+ UNIT_ASSERT(array);
+ auto& val = dynamic_cast<const arrow::NumericArray<arrow::Int64Type>&>(*array);
+
+ UNIT_ASSERT(val.length() == (int)expectedVals.size());
+ for (int i = 0; i < val.length(); ++i) {
+ int value = val.Value(i);
+ UNIT_ASSERT(value == expectedVals[i]);
+ }
+ }
+ return true;
+}
+
bool CheckOrdered(const TString& blob, const TString& srtSchema) {
auto schema = NArrow::DeserializeSchema(srtSchema);
auto batch = NArrow::DeserializeBatch(blob, schema);
@@ -921,6 +941,52 @@ NKikimrSSA::TProgram MakeSelectAggregates(TAggAssignment::EAggregateFunction agg
return ssa;
}
+// SELECT some(timestamp) FROM t WHERE level = 1
+//
+// FIXME:
+// NotImplemented: Function any has no kernel matching input types (array[timestamp[us]])
+// NotImplemented: Function any has no kernel matching input types (array[string])
+// NotImplemented: Function any has no kernel matching input types (array[int32])
+// NotImplemented: Function min_max has no kernel matching input types (array[timestamp[us]])
+// NotImplemented: Function min_max has no kernel matching input types (array[string])
+//
+NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(TAggAssignment::EAggregateFunction aggId = TAggAssignment::AGG_ANY,
+ std::vector<ui32> columnIds = {1, 5})
+{
+ NKikimrSSA::TProgram ssa;
+ ui32 tmpColumnId = 100;
+
+ auto* line1 = ssa.AddCommand();
+ auto* l1_assign = line1->MutableAssign();
+ l1_assign->MutableColumn()->SetId(tmpColumnId);
+ l1_assign->MutableConstant()->SetInt32(1);
+
+ auto* line2 = ssa.AddCommand();
+ auto* l2_assign = line2->MutableAssign();
+ l2_assign->MutableColumn()->SetId(tmpColumnId + 1);
+ auto* l2_func = l2_assign->MutableFunction();
+ l2_func->SetId(TAssignment::FUNC_CMP_EQUAL);
+ l2_func->AddArguments()->SetId(columnIds[1]);
+ l2_func->AddArguments()->SetId(tmpColumnId);
+
+ auto* line3 = ssa.AddCommand();
+ line3->MutableFilter()->MutablePredicate()->SetId(tmpColumnId + 1);
+
+ auto* line4 = ssa.AddCommand();
+ auto* groupBy = line4->MutableGroupBy();
+ //
+ auto* l4_agg1 = groupBy->AddAggregates();
+ l4_agg1->MutableColumn()->SetId(tmpColumnId + 2);
+ auto* l4_agg1_f = l4_agg1->MutableFunction();
+ l4_agg1_f->SetId(aggId);
+ l4_agg1_f->AddArguments()->SetId(columnIds[0]);
+
+ auto* line5 = ssa.AddCommand();
+ auto* proj = line5->MutableProjection();
+ proj->AddColumns()->SetId(tmpColumnId + 2);
+ return ssa;
+}
+
void TestReadWithProgram(const TVector<std::pair<TString, TTypeId>>& ydbSchema = TTestSchema::YdbSchema())
{
TTestBasicRuntime runtime;
@@ -1101,6 +1167,18 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
UNIT_ASSERT(program.SerializeToString(&programs.back()));
}
+ {
+ NKikimrSSA::TProgram ssa = MakeSelectAggregatesWithFilter(TAggAssignment::AGG_COUNT);
+ TString serialized;
+ UNIT_ASSERT(ssa.SerializeToString(&serialized));
+ NKikimrSSA::TOlapProgram program;
+ program.SetProgram(serialized);
+
+ // TODO: Uncomment to run this test!
+ // programs.push_back("");
+ // UNIT_ASSERT(program.SerializeToString(&programs.back()));
+ }
+
ui32 i = 0;
for (auto& programText : programs) {
auto* readEvent = new TEvColumnShard::TEvRead(sender, metaShard, planStep, txId, tableId);
@@ -1126,12 +1204,20 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeId>>& ydbSchema = T
UNIT_ASSERT(resRead.GetData().size() > 0);
auto& meta = resRead.GetMeta();
- //auto& schema = meta.GetSchema();
+ auto& schema = meta.GetSchema();
TVector<TString> readData;
readData.push_back(resRead.GetData());
- UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1));
+ switch (i) {
+ case 2:
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"102"}, 1));
+ UNIT_ASSERT(CheckIntValue(readData, schema, "102", {1}));
+ break;
+ default:
+ UNIT_ASSERT(CheckColumns(readData[0], meta, {"100", "101"}, 1));
+ break;
+ }
}
++i;