diff options
| author | Vitalii Gridnev <[email protected]> | 2025-03-26 21:01:02 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-03-26 21:01:02 +0300 |
| commit | cfb285473337c588351acfb7ef577c2adfff95f2 (patch) | |
| tree | ed8027a0b06ed456054e4cb88a7855bffa1e4bf4 | |
| parent | 48638955a6b8d69340e00876d5ec01dcbff07045 (diff) | |
parameters pushdown support prototype (#16251)
| -rw-r--r-- | ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 3 | ||||
| -rw-r--r-- | ydb/core/kqp/host/kqp_type_ann.cpp | 21 | ||||
| -rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp | 22 | ||||
| -rw-r--r-- | ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 87 |
5 files changed, 124 insertions, 15 deletions
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 3f1efc86bad..9f2b033f353 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -679,7 +679,8 @@ "Children": [ {"Index": 0, "Name": "Type", "Type": "TExprBase"}, {"Index": 1, "Name": "Columns", "Type": "TCoAtomList"}, - {"Index": 2, "Name": "Lambda", "Type": "TCoLambda"} + {"Index": 2, "Name": "Parameters", "Type": "TExprList"}, + {"Index": 3, "Name": "Lambda", "Type": "TCoLambda"} ] }, { diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index d1ea8fa1c56..2c4aed2bcc7 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -1077,7 +1077,7 @@ TStatus AnnotateOlapFilter(const TExprNode::TPtr& node, TExprContext& ctx) { } TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) { - if (!EnsureArgsCount(*node, 3U, ctx)) { + if (!EnsureArgsCount(*node, 4U, ctx)) { return TStatus::Error; } @@ -1097,7 +1097,8 @@ TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) { } const auto structType = argsType->Cast<TStructExprType>(); - TTypeAnnotationNode::TListType argsTypes(columns->ChildrenSize()); + std::vector<const NYql::TTypeAnnotationNode*> argsTypes(columns->ChildrenSize()); + for (auto i = 0U; i < argsTypes.size(); ++i) { if (const auto argType = structType->FindItemType(columns->Child(i)->Content())) argsTypes[i] = argType; @@ -1109,6 +1110,22 @@ TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) { } } + TExprList parameters = TExprList(node->Child(TKqpOlapApply::idx_Parameters)); + + for(auto expr: parameters) { + if (!EnsureArgsCount(*expr.Ptr(), 2U, ctx)) { + return TStatus::Error; + } + + TCoParameter param = TMaybeNode<TCoParameter>(expr.Ptr()).Cast(); + const auto& paramType = expr.Ptr()->Child(TCoParameter::idx_Type); + if (!EnsureType(*paramType, ctx)) { + return TStatus::Error; + } + + argsTypes.push_back(paramType->GetTypeAnn()->Cast<TTypeExprType>()->GetType()); + } + if (!EnsureLambda(node->Tail(), ctx)) { return TStatus::Error; } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp index 5ec99a88efb..6f18f8ce1b8 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp @@ -214,11 +214,6 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode& return false; }); - // Temporary fix for https://st.yandex-team.ru/KIKIMR-22216 - if (parameters.size()!=0) { - return nullptr; - } - const auto members = FindNodes(apply.Ptr(), [&argument] (const TExprNode::TPtr& node) { if (const auto maybeMember = TMaybeNode<TCoMember>(node)) return maybeMember.Cast().Struct().Raw() == &argument; @@ -231,10 +226,18 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode& arguments.reserve(members.size()); for (const auto& member : members) { columns.emplace_back(member->TailPtr()); - arguments.emplace_back(ctx.NewArgument(member->Pos(), columns.back()->Content())); + TString argumentName = "members_" + TString(columns.back()->Content()); + arguments.emplace_back(ctx.NewArgument(member->Pos(), TStringBuf(argumentName))); replacements.emplace(member.Get(), arguments.back()); } + for(const auto& pptr : parameters) { + TCoParameter parameter = TMaybeNode<TCoParameter>(pptr).Cast(); + TString argumentName = "parameter_" + TString(parameter.Name().StringValue()); + arguments.emplace_back(ctx.NewArgument(pptr->Pos(), TStringBuf(argumentName))); + replacements.emplace(pptr.Get(), arguments.back()); + } + // Temporary fix for https://st.yandex-team.ru/KIKIMR-22560 if (!columns.size()) { return nullptr; @@ -243,6 +246,7 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode& return Build<TKqpOlapApply>(ctx, apply.Pos()) .Type(ExpandType(argument.Pos(), *argument.GetTypeAnn(), ctx)) .Columns().Add(std::move(columns)).Build() + .Parameters().Add(std::move(parameters)).Build() .Lambda(ctx.NewLambda(apply.Pos(), ctx.NewArguments(argument.Pos(), std::move(arguments)), ctx.ReplaceNodes(apply.Ptr(), replacements))) .Done(); } @@ -401,7 +405,7 @@ std::vector<TExprBase> ConvertComparisonNode(const TExprBase& nodeIn, const TExp return SafeCastPredicatePushdown(maybeFlatmap.Cast(), argument, ctx, pos); } else if (auto maybePredicate = node.Maybe<TCoCompare>()) { return SimplePredicatePushdown(maybePredicate.Cast(), argument, ctx, pos); - } + } if constexpr (NKikimr::NSsa::RuntimeVersion >= 5U) { return YqlApplyPushdown(node, argument, ctx); @@ -800,7 +804,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz } remaining = std::move(remainingAfterApply); } - + if (pushedPredicates.empty()) { return node; } @@ -810,7 +814,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz const auto remainingFilter = CombinePredicatesWithAnd(remaining, ctx, node.Pos(), false, true); TMaybeNode<TExprBase> olapFilter; - if (pushedFilter.FirstLevelOps.IsValid()) { + if (pushedFilter.FirstLevelOps.IsValid()) { olapFilter = Build<TKqpOlapFilter>(ctx, node.Pos()) .Input(read.Process().Body()) .Condition(pushedFilter.FirstLevelOps.Cast()) diff --git a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp index df94546389f..ba0f3136039 100644 --- a/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp @@ -600,6 +600,12 @@ TTypedColumn CompileYqlKernelScalarApply(const TKqpOlapApply& apply, TKqpOlapCom argTypes.emplace_back(arg.Type); } + for(const auto& param: apply.Parameters()) { + const auto& arg = GetOrCreateColumnIdAndType(param, ctx); + ids.emplace_back(arg.Id); + argTypes.emplace_back(arg.Type); + } + auto *const command = ctx.CreateAssignCmd(); auto *const function = command->MutableFunction(); const auto idx = ctx.GetKernelRequestBuilder().AddScalarApply(apply.Lambda().Ref(), argTypes, ctx.ExprCtx()); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 21df7a55360..1b64ef2b8cf 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1458,7 +1458,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { //This check is based on an assumpltion, that for pushed down predicates column names are preserved in AST //But for non-pushed down predicates column names are (usually) replaced with a label, started with $. It' not a rule, but a heuristic //So this check may require a correction when some ast optimization rules are changed - UNIT_ASSERT_C(ast.find(R"((Unwrap (/ $)") != std::string::npos, + UNIT_ASSERT_C(ast.find(R"((Unwrap (/ $)") != std::string::npos, TStringBuilder() << "Unsafe subpredicate is pushed down. Query: " << query); UNIT_ASSERT_C(ast.find("NarrowMap") != std::string::npos, @@ -1776,6 +1776,87 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL(result, insertRows); } + Y_UNIT_TEST(PredicatePushdownWithParametersILike) { + constexpr bool logQueries = true; + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + + TStreamExecScanQuerySettings scanSettings; + scanSettings.Explain(true); + + TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable(); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000); + + auto tableClient = kikimr.GetTableClient(); + + auto buildQuery = [](bool pushEnabled) { + TStringBuilder builder; + + builder << "--!syntax_v1" << Endl; + + if (!pushEnabled) { + builder << "PRAGMA Kikimr.OptEnableOlapPushdown = \"false\";" << Endl; + } + + builder << R"( + DECLARE $in_uid AS Utf8; + DECLARE $in_level AS Int32; + + SELECT `timestamp` FROM `/Root/olapStore/olapTable` WHERE + uid ILIKE "uid_%" || $in_uid || "%" AND level > $in_level + ORDER BY `timestamp`; + )" << Endl; + + return builder; + }; + + auto normalQuery = buildQuery(false); + auto pushQuery = buildQuery(true); + + auto params = tableClient.GetParamsBuilder() + .AddParam("$in_uid") + .Utf8("3000") + .Build() + .AddParam("$in_level") + .Int32(2) + .Build() + .Build(); + + auto it = tableClient.StreamExecuteScanQuery(normalQuery, params).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto goodResult = CollectStreamResult(it); + + it = tableClient.StreamExecuteScanQuery(pushQuery, params).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto pushResult = CollectStreamResult(it); + + if (logQueries) { + Cerr << "Query: " << normalQuery << Endl; + Cerr << "Expected: " << goodResult.ResultSetYson << Endl; + Cerr << "Received: " << pushResult.ResultSetYson << Endl; + } + + CompareYson(goodResult.ResultSetYson, pushResult.ResultSetYson); + + it = tableClient.StreamExecuteScanQuery(pushQuery, scanSettings).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + auto result = CollectStreamResult(it); + auto ast = result.QueryStats->Getquery_ast(); + + UNIT_ASSERT_C(ast.find("KqpOlapFilter") != std::string::npos, + TStringBuilder() << "Predicate not pushed down. Query: " << pushQuery); + + NJson::TJsonValue plan, readRange; + NJson::ReadJsonTree(*result.PlanJson, &plan, true); + + Cerr << result.PlanJson << Endl; + + readRange = FindPlanNodeByKv(plan, "Name", "TableFullScan"); + UNIT_ASSERT(readRange.IsDefined()); + } + Y_UNIT_TEST(PredicatePushdownWithParameters) { constexpr bool logQueries = true; auto settings = TKikimrSettings() @@ -3277,9 +3358,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) { )", noTx).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - + result = queryClient.ExecuteQuery(R"( - UPSERT INTO Test (Id, Name, Comment) VALUES + UPSERT INTO Test (Id, Name, Comment) VALUES (10, "n1", "aa"), (20, "n2", "bb"), (30, "n3", "cc"), |
