diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-03-13 14:24:35 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-03-13 14:24:35 +0300 |
commit | d13400c47d91842ecfb0e6b67c79e1f645db956b (patch) | |
tree | bae3dafb09a6e1c79c21943368139b1aad0a7453 | |
parent | f7e61bcdd954ae42d8d6b5d2388b4f8446380f5a (diff) | |
download | ydb-d13400c47d91842ecfb0e6b67c79e1f645db956b.tar.gz |
apply filter before deduplication for index lookup join
fix(kqp): apply filter before deduplication
3 files changed, 113 insertions, 30 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index 96ac7409fa9..6edcd849d15 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -71,12 +71,44 @@ TExprBase ConvertToTuples(const TSet<TString>& columns, const TCoArgument& struc .Done(); } -TExprBase DeduplicateByMembers(const TExprBase& expr, const TSet<TString>& members, TExprContext& ctx, - TPositionHandle pos) +TExprBase DeduplicateByMembers(const TExprBase& expr, const TMaybeNode<TCoLambda>& filter, const TSet<TString>& members, + TExprContext& ctx, TPositionHandle pos) { - auto structArg = Build<TCoArgument>(ctx, pos) - .Name("struct") + TMaybeNode<TCoLambda> lambda; + if (filter.IsValid()) { + lambda = Build<TCoLambda>(ctx, pos) + .Args({"tuple"}) + .Body<TCoTake>() + .Input<TCoFilter>() + .Input<TCoNth>() + .Tuple("tuple") + .Index().Value("1").Build() + .Build() + .Lambda(filter.Cast()) + .Build() + .Count<TCoUint64>() + .Literal().Value("1").Build() + .Build() + .Build() + .Done(); + } else { + lambda = Build<TCoLambda>(ctx, pos) + .Args({"tuple"}) + .Body<TCoTake>() + .Input<TCoNth>() + .Tuple("tuple") + .Index().Value("1").Build() + .Build() + .Count<TCoUint64>() + .Literal().Value("1").Build() + .Build() + .Build() .Done(); + } + + auto structArg = Build<TCoArgument>(ctx, pos) + .Name("struct") + .Done(); return Build<TCoPartitionByKey>(ctx, pos) .Input(expr) @@ -92,18 +124,7 @@ TExprBase DeduplicateByMembers(const TExprBase& expr, const TSet<TString>& membe .Args({"stream"}) .Body<TCoFlatMap>() .Input("stream") - .Lambda() - .Args({"tuple"}) - .Body<TCoTake>() - .Input<TCoNth>() - .Tuple("tuple") - .Index().Value("1").Build() - .Build() - .Count<TCoUint64>() - .Literal().Value("1").Build() - .Build() - .Build() - .Build() + .Lambda(lambda.Cast()) .Build() .Build() .Done(); @@ -480,7 +501,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Done() : join.LeftInput(); - auto leftDataDeduplicated = DeduplicateByMembers(leftData, deduplicateLeftColumns, ctx, join.Pos()); + TMaybeNode<TCoLambda> filter; if (!equalLeftKeys.empty()) { auto row = Build<TCoArgument>(ctx, join.Pos()) @@ -507,22 +528,20 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext } } - leftDataDeduplicated = Build<TCoFilter>(ctx, join.Pos()) - .Input(leftDataDeduplicated) - .Lambda() - .Args({row}) - .Body<TCoCoalesce>() - .Predicate<TCoAnd>() - .Add(conditions) - .Build() - .Value<TCoBool>() - .Literal().Build("false") - .Build() + filter = Build<TCoLambda>(ctx, join.Pos()) + .Args({row}) + .Body<TCoCoalesce>() + .Predicate<TCoAnd>() + .Add(conditions) + .Build() + .Value<TCoBool>() + .Literal().Build("false") .Build() .Build() .Done(); } + auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos()); auto keysToLookup = Build<TCoMap>(ctx, join.Pos()) .Input(leftDataDeduplicated) .Lambda() diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index cf0165ab183..12b95f11d96 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -1449,6 +1449,66 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 4); } + Y_UNIT_TEST(JoinIdxLookupWithPredicate) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + CREATE TABLE `Left` ( + Key Uint64, + Value1 Uint64, + Value2 String, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + CREATE TABLE `Right` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + AssertSuccessResult(session.ExecuteDataQuery(R"( + REPLACE INTO `Left` (Key, Value1, Value2) VALUES + (1, 6, "Value1"), + (2, 2, "Value1"), + (3, 3, "Value2"), + (4, 4, "Value2"), + (5, 5, "Value3"), + (6, 6, "Value1"); + + REPLACE INTO `Right` (Key, Value) VALUES + (1, "One"), + (2, "Two"), + (3, "Three"), + (4, "Four"), + (5, "Five"), + (6, "Six"); + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync()); + + auto result = session.ExecuteDataQuery(R"( + $input = ( + SELECT Key, Value1 + FROM `Left` WHERE Value2 == "Value1" + ); + + SELECT t1.Key AS Key, t2.Value AS Value + FROM $input AS t1 + INNER JOIN `Right` AS t2 + ON t1.Value1 = t2.Key AND t1.Key = t2.Key + ORDER BY Key, Value; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([ + [[2u];["Two"]]; + [[6u];["Six"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + Y_UNIT_TEST(LeftSemiJoin) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); diff --git a/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan b/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan index 0618f58bf6b..166bfa62c13 100644 --- a/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan +++ b/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan @@ -115,15 +115,19 @@ "PlanNodeType": "Materialize", "Plans": [ { - "Node Type": "Filter-Limit", + "Node Type": "Filter-Limit-Filter", "Operators": [ { "Name": "Filter", - "Predicate": "item.i == item.i And Exist(item.i)" + "Predicate": "Exist(item.i)" }, { "Limit": "1", "Name": "Limit" + }, + { + "Name": "Filter", + "Predicate": "item.i == item.i" } ], "PlanNodeId": 6 |