about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ulya-sidorina <yulia@ydb.tech> 2023-03-13 14:24:35 +0300
committer ulya-sidorina <yulia@ydb.tech> 2023-03-13 14:24:35 +0300
commit d13400c47d91842ecfb0e6b67c79e1f645db956b (patch)
tree bae3dafb09a6e1c79c21943368139b1aad0a7453
parent f7e61bcdd954ae42d8d6b5d2388b4f8446380f5a (diff)
download ydb-d13400c47d91842ecfb0e6b67c79e1f645db956b.tar.gz
apply filter before deduplication for index lookup join
fix(kqp): apply filter before deduplication
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp 75
-rw-r--r-- ydb/core/kqp/ut/opt/kqp_ne_ut.cpp 60
-rw-r--r-- ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan 8
3 files changed, 113 insertions, 30 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index 96ac7409fa9..6edcd849d15 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -71,12 +71,44 @@ TExprBase ConvertToTuples(const TSet<TString>& columns, const TCoArgument& struc
.Done();
}
-TExprBase DeduplicateByMembers(const TExprBase& expr, const TSet<TString>& members, TExprContext& ctx,
- TPositionHandle pos)
+TExprBase DeduplicateByMembers(const TExprBase& expr, const TMaybeNode<TCoLambda>& filter, const TSet<TString>& members,
+ TExprContext& ctx, TPositionHandle pos)
{
- auto structArg = Build<TCoArgument>(ctx, pos)
- .Name("struct")
+ TMaybeNode<TCoLambda> lambda;
+ if (filter.IsValid()) {
+ lambda = Build<TCoLambda>(ctx, pos)
+ .Args({"tuple"})
+ .Body<TCoTake>()
+ .Input<TCoFilter>()
+ .Input<TCoNth>()
+ .Tuple("tuple")
+ .Index().Value("1").Build()
+ .Build()
+ .Lambda(filter.Cast())
+ .Build()
+ .Count<TCoUint64>()
+ .Literal().Value("1").Build()
+ .Build()
+ .Build()
+ .Done();
+ } else {
+ lambda = Build<TCoLambda>(ctx, pos)
+ .Args({"tuple"})
+ .Body<TCoTake>()
+ .Input<TCoNth>()
+ .Tuple("tuple")
+ .Index().Value("1").Build()
+ .Build()
+ .Count<TCoUint64>()
+ .Literal().Value("1").Build()
+ .Build()
+ .Build()
.Done();
+ }
+
+ auto structArg = Build<TCoArgument>(ctx, pos)
+ .Name("struct")
+ .Done();
return Build<TCoPartitionByKey>(ctx, pos)
.Input(expr)
@@ -92,18 +124,7 @@ TExprBase DeduplicateByMembers(const TExprBase& expr, const TSet<TString>& membe
.Args({"stream"})
.Body<TCoFlatMap>()
.Input("stream")
- .Lambda()
- .Args({"tuple"})
- .Body<TCoTake>()
- .Input<TCoNth>()
- .Tuple("tuple")
- .Index().Value("1").Build()
- .Build()
- .Count<TCoUint64>()
- .Literal().Value("1").Build()
- .Build()
- .Build()
- .Build()
+ .Lambda(lambda.Cast())
.Build()
.Build()
.Done();
@@ -480,7 +501,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
.Done()
: join.LeftInput();
- auto leftDataDeduplicated = DeduplicateByMembers(leftData, deduplicateLeftColumns, ctx, join.Pos());
+ TMaybeNode<TCoLambda> filter;
if (!equalLeftKeys.empty()) {
auto row = Build<TCoArgument>(ctx, join.Pos())
@@ -507,22 +528,20 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
}
}
- leftDataDeduplicated = Build<TCoFilter>(ctx, join.Pos())
- .Input(leftDataDeduplicated)
- .Lambda()
- .Args({row})
- .Body<TCoCoalesce>()
- .Predicate<TCoAnd>()
- .Add(conditions)
- .Build()
- .Value<TCoBool>()
- .Literal().Build("false")
- .Build()
+ filter = Build<TCoLambda>(ctx, join.Pos())
+ .Args({row})
+ .Body<TCoCoalesce>()
+ .Predicate<TCoAnd>()
+ .Add(conditions)
+ .Build()
+ .Value<TCoBool>()
+ .Literal().Build("false")
.Build()
.Build()
.Done();
}
+ auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos());
auto keysToLookup = Build<TCoMap>(ctx, join.Pos())
.Input(leftDataDeduplicated)
.Lambda()
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index cf0165ab183..12b95f11d96 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -1449,6 +1449,66 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 4);
}
+ Y_UNIT_TEST(JoinIdxLookupWithPredicate) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ CREATE TABLE `Left` (
+ Key Uint64,
+ Value1 Uint64,
+ Value2 String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync());
+
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ CREATE TABLE `Right` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync());
+
+ AssertSuccessResult(session.ExecuteDataQuery(R"(
+ REPLACE INTO `Left` (Key, Value1, Value2) VALUES
+ (1, 6, "Value1"),
+ (2, 2, "Value1"),
+ (3, 3, "Value2"),
+ (4, 4, "Value2"),
+ (5, 5, "Value3"),
+ (6, 6, "Value1");
+
+ REPLACE INTO `Right` (Key, Value) VALUES
+ (1, "One"),
+ (2, "Two"),
+ (3, "Three"),
+ (4, "Four"),
+ (5, "Five"),
+ (6, "Six");
+ )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync());
+
+ auto result = session.ExecuteDataQuery(R"(
+ $input = (
+ SELECT Key, Value1
+ FROM `Left` WHERE Value2 == "Value1"
+ );
+
+ SELECT t1.Key AS Key, t2.Value AS Value
+ FROM $input AS t1
+ INNER JOIN `Right` AS t2
+ ON t1.Value1 = t2.Key AND t1.Key = t2.Key
+ ORDER BY Key, Value;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([
+ [[2u];["Two"]];
+ [[6u];["Six"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
Y_UNIT_TEST(LeftSemiJoin) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetTableClient();
diff --git a/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan b/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan
index 0618f58bf6b..166bfa62c13 100644
--- a/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan
+++ b/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join4.test_/query_5.plan
@@ -115,15 +115,19 @@
"PlanNodeType": "Materialize",
"Plans": [
{
- "Node Type": "Filter-Limit",
+ "Node Type": "Filter-Limit-Filter",
"Operators": [
{
"Name": "Filter",
- "Predicate": "item.i == item.i And Exist(item.i)"
+ "Predicate": "Exist(item.i)"
},
{
"Limit": "1",
"Name": "Limit"
+ },
+ {
+ "Name": "Filter",
+ "Predicate": "item.i == item.i"
}
],
"PlanNodeId": 6