summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <[email protected]>2025-03-26 21:01:02 +0300
committerGitHub <[email protected]>2025-03-26 21:01:02 +0300
commitcfb285473337c588351acfb7ef577c2adfff95f2 (patch)
treeed8027a0b06ed456054e4cb88a7855bffa1e4bf4
parent48638955a6b8d69340e00876d5ec01dcbff07045 (diff)
parameters pushdown support prototype (#16251)
-rw-r--r--ydb/core/kqp/expr_nodes/kqp_expr_nodes.json3
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp21
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp22
-rw-r--r--ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp6
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp87
5 files changed, 124 insertions, 15 deletions
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 3f1efc86bad..9f2b033f353 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -679,7 +679,8 @@
"Children": [
{"Index": 0, "Name": "Type", "Type": "TExprBase"},
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
- {"Index": 2, "Name": "Lambda", "Type": "TCoLambda"}
+ {"Index": 2, "Name": "Parameters", "Type": "TExprList"},
+ {"Index": 3, "Name": "Lambda", "Type": "TCoLambda"}
]
},
{
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index d1ea8fa1c56..2c4aed2bcc7 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -1077,7 +1077,7 @@ TStatus AnnotateOlapFilter(const TExprNode::TPtr& node, TExprContext& ctx) {
}
TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
- if (!EnsureArgsCount(*node, 3U, ctx)) {
+ if (!EnsureArgsCount(*node, 4U, ctx)) {
return TStatus::Error;
}
@@ -1097,7 +1097,8 @@ TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
}
const auto structType = argsType->Cast<TStructExprType>();
- TTypeAnnotationNode::TListType argsTypes(columns->ChildrenSize());
+ std::vector<const NYql::TTypeAnnotationNode*> argsTypes(columns->ChildrenSize());
+
for (auto i = 0U; i < argsTypes.size(); ++i) {
if (const auto argType = structType->FindItemType(columns->Child(i)->Content()))
argsTypes[i] = argType;
@@ -1109,6 +1110,22 @@ TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
}
}
+ TExprList parameters = TExprList(node->Child(TKqpOlapApply::idx_Parameters));
+
+ for(auto expr: parameters) {
+ if (!EnsureArgsCount(*expr.Ptr(), 2U, ctx)) {
+ return TStatus::Error;
+ }
+
+ TCoParameter param = TMaybeNode<TCoParameter>(expr.Ptr()).Cast();
+ const auto& paramType = expr.Ptr()->Child(TCoParameter::idx_Type);
+ if (!EnsureType(*paramType, ctx)) {
+ return TStatus::Error;
+ }
+
+ argsTypes.push_back(paramType->GetTypeAnn()->Cast<TTypeExprType>()->GetType());
+ }
+
if (!EnsureLambda(node->Tail(), ctx)) {
return TStatus::Error;
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
index 5ec99a88efb..6f18f8ce1b8 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
@@ -214,11 +214,6 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
return false;
});
- // Temporary fix for https://st.yandex-team.ru/KIKIMR-22216
- if (parameters.size()!=0) {
- return nullptr;
- }
-
const auto members = FindNodes(apply.Ptr(), [&argument] (const TExprNode::TPtr& node) {
if (const auto maybeMember = TMaybeNode<TCoMember>(node))
return maybeMember.Cast().Struct().Raw() == &argument;
@@ -231,10 +226,18 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
arguments.reserve(members.size());
for (const auto& member : members) {
columns.emplace_back(member->TailPtr());
- arguments.emplace_back(ctx.NewArgument(member->Pos(), columns.back()->Content()));
+ TString argumentName = "members_" + TString(columns.back()->Content());
+ arguments.emplace_back(ctx.NewArgument(member->Pos(), TStringBuf(argumentName)));
replacements.emplace(member.Get(), arguments.back());
}
+ for(const auto& pptr : parameters) {
+ TCoParameter parameter = TMaybeNode<TCoParameter>(pptr).Cast();
+ TString argumentName = "parameter_" + TString(parameter.Name().StringValue());
+ arguments.emplace_back(ctx.NewArgument(pptr->Pos(), TStringBuf(argumentName)));
+ replacements.emplace(pptr.Get(), arguments.back());
+ }
+
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
if (!columns.size()) {
return nullptr;
@@ -243,6 +246,7 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
return Build<TKqpOlapApply>(ctx, apply.Pos())
.Type(ExpandType(argument.Pos(), *argument.GetTypeAnn(), ctx))
.Columns().Add(std::move(columns)).Build()
+ .Parameters().Add(std::move(parameters)).Build()
.Lambda(ctx.NewLambda(apply.Pos(), ctx.NewArguments(argument.Pos(), std::move(arguments)), ctx.ReplaceNodes(apply.Ptr(), replacements)))
.Done();
}
@@ -401,7 +405,7 @@ std::vector<TExprBase> ConvertComparisonNode(const TExprBase& nodeIn, const TExp
return SafeCastPredicatePushdown(maybeFlatmap.Cast(), argument, ctx, pos);
} else if (auto maybePredicate = node.Maybe<TCoCompare>()) {
return SimplePredicatePushdown(maybePredicate.Cast(), argument, ctx, pos);
- }
+ }
if constexpr (NKikimr::NSsa::RuntimeVersion >= 5U) {
return YqlApplyPushdown(node, argument, ctx);
@@ -800,7 +804,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
}
remaining = std::move(remainingAfterApply);
}
-
+
if (pushedPredicates.empty()) {
return node;
}
@@ -810,7 +814,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
const auto remainingFilter = CombinePredicatesWithAnd(remaining, ctx, node.Pos(), false, true);
TMaybeNode<TExprBase> olapFilter;
- if (pushedFilter.FirstLevelOps.IsValid()) {
+ if (pushedFilter.FirstLevelOps.IsValid()) {
olapFilter = Build<TKqpOlapFilter>(ctx, node.Pos())
.Input(read.Process().Body())
.Condition(pushedFilter.FirstLevelOps.Cast())
diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
index df94546389f..ba0f3136039 100644
--- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
@@ -600,6 +600,12 @@ TTypedColumn CompileYqlKernelScalarApply(const TKqpOlapApply& apply, TKqpOlapCom
argTypes.emplace_back(arg.Type);
}
+ for(const auto& param: apply.Parameters()) {
+ const auto& arg = GetOrCreateColumnIdAndType(param, ctx);
+ ids.emplace_back(arg.Id);
+ argTypes.emplace_back(arg.Type);
+ }
+
auto *const command = ctx.CreateAssignCmd();
auto *const function = command->MutableFunction();
const auto idx = ctx.GetKernelRequestBuilder().AddScalarApply(apply.Lambda().Ref(), argTypes, ctx.ExprCtx());
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 21df7a55360..1b64ef2b8cf 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1458,7 +1458,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
//This check is based on an assumpltion, that for pushed down predicates column names are preserved in AST
//But for non-pushed down predicates column names are (usually) replaced with a label, started with $. It' not a rule, but a heuristic
//So this check may require a correction when some ast optimization rules are changed
- UNIT_ASSERT_C(ast.find(R"((Unwrap (/ $)") != std::string::npos,
+ UNIT_ASSERT_C(ast.find(R"((Unwrap (/ $)") != std::string::npos,
TStringBuilder() << "Unsafe subpredicate is pushed down. Query: " << query);
UNIT_ASSERT_C(ast.find("NarrowMap") != std::string::npos,
@@ -1776,6 +1776,87 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL(result, insertRows);
}
+ Y_UNIT_TEST(PredicatePushdownWithParametersILike) {
+ constexpr bool logQueries = true;
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+
+ TStreamExecScanQuerySettings scanSettings;
+ scanSettings.Explain(true);
+
+ TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable();
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
+
+ auto tableClient = kikimr.GetTableClient();
+
+ auto buildQuery = [](bool pushEnabled) {
+ TStringBuilder builder;
+
+ builder << "--!syntax_v1" << Endl;
+
+ if (!pushEnabled) {
+ builder << "PRAGMA Kikimr.OptEnableOlapPushdown = \"false\";" << Endl;
+ }
+
+ builder << R"(
+ DECLARE $in_uid AS Utf8;
+ DECLARE $in_level AS Int32;
+
+ SELECT `timestamp` FROM `/Root/olapStore/olapTable` WHERE
+ uid ILIKE "uid_%" || $in_uid || "%" AND level > $in_level
+ ORDER BY `timestamp`;
+ )" << Endl;
+
+ return builder;
+ };
+
+ auto normalQuery = buildQuery(false);
+ auto pushQuery = buildQuery(true);
+
+ auto params = tableClient.GetParamsBuilder()
+ .AddParam("$in_uid")
+ .Utf8("3000")
+ .Build()
+ .AddParam("$in_level")
+ .Int32(2)
+ .Build()
+ .Build();
+
+ auto it = tableClient.StreamExecuteScanQuery(normalQuery, params).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ auto goodResult = CollectStreamResult(it);
+
+ it = tableClient.StreamExecuteScanQuery(pushQuery, params).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ auto pushResult = CollectStreamResult(it);
+
+ if (logQueries) {
+ Cerr << "Query: " << normalQuery << Endl;
+ Cerr << "Expected: " << goodResult.ResultSetYson << Endl;
+ Cerr << "Received: " << pushResult.ResultSetYson << Endl;
+ }
+
+ CompareYson(goodResult.ResultSetYson, pushResult.ResultSetYson);
+
+ it = tableClient.StreamExecuteScanQuery(pushQuery, scanSettings).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ auto result = CollectStreamResult(it);
+ auto ast = result.QueryStats->Getquery_ast();
+
+ UNIT_ASSERT_C(ast.find("KqpOlapFilter") != std::string::npos,
+ TStringBuilder() << "Predicate not pushed down. Query: " << pushQuery);
+
+ NJson::TJsonValue plan, readRange;
+ NJson::ReadJsonTree(*result.PlanJson, &plan, true);
+
+ Cerr << result.PlanJson << Endl;
+
+ readRange = FindPlanNodeByKv(plan, "Name", "TableFullScan");
+ UNIT_ASSERT(readRange.IsDefined());
+ }
+
Y_UNIT_TEST(PredicatePushdownWithParameters) {
constexpr bool logQueries = true;
auto settings = TKikimrSettings()
@@ -3277,9 +3358,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
)", noTx).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
-
+
result = queryClient.ExecuteQuery(R"(
- UPSERT INTO Test (Id, Name, Comment) VALUES
+ UPSERT INTO Test (Id, Name, Comment) VALUES
(10, "n1", "aa"),
(20, "n2", "bb"),
(30, "n3", "cc"),