aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergei Puchin <s.puchin@gmail.com>2022-03-13 15:41:34 +0300
committerSergei Puchin <s.puchin@gmail.com>2022-03-13 15:41:34 +0300
commit6d8a80e71c596adbc9148eb3cf285ef131549c2e (patch)
tree8b05b19b8adbd96c8ed6a8c3bb17c3008f37ff67
parent8b6343c5c3c0146152e6d6c176923dc2ae923f70 (diff)
downloadydb-6d8a80e71c596adbc9148eb3cf285ef131549c2e.tar.gz
Fix DqPushJoinToStage optimizer rule. (KIKIMR-14490)
ref:58c9b2b207c29d6d95293743b749ba298a67b387
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp11
-rw-r--r--ydb/core/kqp/ut/kqp_join_ut.cpp38
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join.cpp74
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h2
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp8
5 files changed, 85 insertions, 48 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 11e1af7ae87..1576f97467f 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -46,7 +46,7 @@ public:
AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput));
AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage));
AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft));
- AddHandler(0, &TDqJoin::Match, HNDL(PushJoinToStage<false>));
+ AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>));
AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>));
AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecomputeStage));
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
@@ -69,7 +69,7 @@ public:
AddHandler(1, &TCoTake::Match, HNDL(BuildTakeSkipStage<true>));
AddHandler(1, &TCoSortBase::Match, HNDL(BuildSortStage<true>));
AddHandler(1, &TCoTake::Match, HNDL(BuildTakeStage<true>));
- AddHandler(1, &TDqJoin::Match, HNDL(PushJoinToStage<true>));
+ AddHandler(1, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<true>));
AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
@@ -217,11 +217,10 @@ protected:
}
template <bool IsGlobal>
- TMaybeNode<TExprBase> PushJoinToStage(TExprBase node, TExprContext& ctx,
- IOptimizationContext& optCtx, const TGetParents& getParents)
+ TMaybeNode<TExprBase> RewriteLeftPureJoin(TExprBase node, TExprContext& ctx, const TGetParents& getParents)
{
- TExprBase output = DqPushJoinToStage(node, ctx, optCtx, *getParents(), IsGlobal);
- DumpAppliedRule("PushJoinToStage", node.Ptr(), output.Ptr(), ctx);
+ TExprBase output = DqRewriteLeftPureJoin(node, ctx, *getParents(), IsGlobal);
+ DumpAppliedRule("RewriteLeftPureJoin", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/core/kqp/ut/kqp_join_ut.cpp b/ydb/core/kqp/ut/kqp_join_ut.cpp
index 8fa59d31b36..f934b6d88dd 100644
--- a/ydb/core/kqp/ut/kqp_join_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_join_ut.cpp
@@ -977,6 +977,44 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
])", FormatResultSetYson(result.GetResultSet(0)));
}
+ Y_UNIT_TEST_NEW_ENGINE(JoinDupColumnRightPure) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ CreateSampleTables(session);
+
+ auto params = TParamsBuilder()
+ .AddParam("$rows")
+ .BeginList()
+ .AddListItem()
+ .BeginStruct()
+ .AddMember("Key").Int32(1)
+ .AddMember("Fk21").Int32(101)
+ .EndStruct()
+ .AddListItem()
+ .BeginStruct()
+ .AddMember("Key").Int32(2)
+ .AddMember("Fk21").Int32(102)
+ .EndStruct()
+ .EndList().Build()
+ .Build();
+
+ auto result = session.ExecuteDataQuery(Q1_(R"(
+ DECLARE $rows AS List<Struct<Key: Int32, Fk21: Int32>>;
+
+ SELECT Key, Key1, Key2
+ FROM AS_TABLE($rows) AS t1
+ LEFT JOIN Join1_2 AS t2 ON t1.Key = t2.Key1 AND t1.Fk21 = t2.Key1
+ ORDER BY Key;
+ )"), TTxControl::BeginTx().CommitTx(), params).GetValueSync();
+
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([
+ [1;#;#];
+ [2;#;#]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
Y_UNIT_TEST_NEW_ENGINE(JoinLeftPureInner) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp
index 4097b49e24a..548ad89c9a1 100644
--- a/ydb/library/yql/dq/opt/dq_opt_join.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp
@@ -410,8 +410,8 @@ TExprBase DqRewriteRightJoinToLeft(const TExprBase node, TExprContext& ctx) {
.Done();
}
-TExprBase DqPushJoinToStage(const TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
- const TParentsMap& parentsMap, bool allowStageMultiUsage)
+TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap,
+ bool allowStageMultiUsage)
{
if (!node.Maybe<TDqJoin>()) {
return node;
@@ -419,57 +419,57 @@ TExprBase DqPushJoinToStage(const TExprBase node, TExprContext& ctx, IOptimizati
auto join = node.Cast<TDqJoin>();
- static const std::set<std::string_view> supportedTypes = {"Inner"sv, "Left"sv, "LeftOnly"sv, "LeftSemi"sv};
+ static const std::set<std::string_view> supportedTypes = {"Left"sv, "LeftOnly"sv, "LeftSemi"sv};
auto joinType = join.JoinType().Value();
if (!supportedTypes.contains(joinType)) {
return node;
}
- TMaybeNode<TDqCnUnionAll> connection;
- bool pushLeft = false;
- if (join.LeftInput().Maybe<TDqCnUnionAll>() && IsDqPureExpr(join.RightInput())) {
- connection = join.LeftInput().Cast<TDqCnUnionAll>();
- pushLeft = true;
- }
- if (join.RightInput().Maybe<TDqCnUnionAll>() && IsDqPureExpr(join.LeftInput())) {
- connection = join.RightInput().Cast<TDqCnUnionAll>();
- pushLeft = false;
- }
-
- if (!connection) {
+ if (!join.RightInput().Maybe<TDqCnUnionAll>()) {
return node;
}
- if (!IsSingleConsumerConnection(connection.Cast(), parentsMap, allowStageMultiUsage)) {
+ if (!IsDqPureExpr(join.LeftInput())) {
return node;
}
- TCoArgument inputArg{ctx.NewArgument(join.Pos(), "_dq_join_input")};
+ auto rightConnection = join.RightInput().Cast<TDqCnUnionAll>();
- auto makeFlow = [&ctx](const TExprBase& list) {
- return Build<TCoToFlow>(ctx, list.Pos())
- .Input(list)
- .Done();
- };
-
- auto phyJoin = DqMakePhyMapJoin(
- join,
- pushLeft ? TExprBase(inputArg) : makeFlow(join.LeftInput()),
- pushLeft ? makeFlow(join.RightInput()) : TExprBase(inputArg),
- ctx);
+ if (!IsSingleConsumerConnection(rightConnection, parentsMap, allowStageMultiUsage)) {
+ return node;
+ }
- auto lambda = Build<TCoLambda>(ctx, join.Pos())
- .Args({inputArg})
- .Body(phyJoin)
+ auto leftStage = Build<TDqStage>(ctx, join.Pos())
+ .Inputs()
+ .Build()
+ .Program()
+ .Args({})
+ .Body<TCoToFlow>()
+ .Input(join.LeftInput())
+ .Build()
+ .Build()
+ .Settings(TDqStageSettings().BuildNode(ctx, join.Pos()))
.Done();
- auto result = DqPushLambdaToStageUnionAll(connection.Cast(), lambda, {}, ctx, optCtx);
- if (!result) {
- return node;
- }
+ auto leftConnection = Build<TDqCnUnionAll>(ctx, join.Pos())
+ .Output()
+ .Stage(leftStage)
+ .Index().Build("0")
+ .Build()
+ .Done();
- YQL_ENSURE(result.Maybe<TDqCnUnionAll>());
- return result.Cast();
+ // TODO: Right input might be large, there are better possible physical plans.
+ // We only need matching keys from the right side. Instead of broadcasting
+ // all right input data to a single task, we can do a "partial" right semi join
+ // in the right stage to extract only the necessary rows.
+ return Build<TDqJoin>(ctx, join.Pos())
+ .LeftInput(leftConnection)
+ .LeftLabel(join.LeftLabel())
+ .RightInput(join.RightInput())
+ .RightLabel(join.RightLabel())
+ .JoinType().Build(joinType)
+ .JoinKeys(join.JoinKeys())
+ .Done();
}
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx) {
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 70a9d70082b..550410253d7 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -64,7 +64,7 @@ NNodes::TExprBase DqRewriteLengthOfStageOutput(NNodes::TExprBase node, TExprCont
NNodes::TExprBase DqRewriteRightJoinToLeft(const NNodes::TExprBase node, TExprContext& ctx);
-NNodes::TExprBase DqPushJoinToStage(const NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprContext& ctx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx,
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 858e3da45a0..a31aa699f1a 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -36,7 +36,7 @@ public:
AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput));
AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage));
AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft));
- AddHandler(0, &TDqJoin::Match, HNDL(PushJoinToStage<false>));
+ AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>));
AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>));
AddHandler(0, &TDqJoin::Match, HNDL(BuildJoinDict<false>));
AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>));
@@ -56,7 +56,7 @@ public:
AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>));
AddHandler(1, &TCoSort::Match, HNDL(BuildSortStage<true>));
AddHandler(1, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<true>));
- AddHandler(1, &TDqJoin::Match, HNDL(PushJoinToStage<true>));
+ AddHandler(1, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<true>));
AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>));
AddHandler(1, &TDqJoin::Match, HNDL(BuildJoinDict<true>));
AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>));
@@ -266,8 +266,8 @@ protected:
}
template <bool IsGlobal>
- TMaybeNode<TExprBase> PushJoinToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
- return DqPushJoinToStage(node, ctx, optCtx, *getParents(), IsGlobal);
+ TMaybeNode<TExprBase> RewriteLeftPureJoin(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
+ return DqRewriteLeftPureJoin(node, ctx, *getParents(), IsGlobal);
}
template <bool IsGlobal>