diff options
author | Sergei Puchin <s.puchin@gmail.com> | 2022-03-13 15:41:34 +0300 |
---|---|---|
committer | Sergei Puchin <s.puchin@gmail.com> | 2022-03-13 15:41:34 +0300 |
commit | 6d8a80e71c596adbc9148eb3cf285ef131549c2e (patch) | |
tree | 8b05b19b8adbd96c8ed6a8c3bb17c3008f37ff67 | |
parent | 8b6343c5c3c0146152e6d6c176923dc2ae923f70 (diff) | |
download | ydb-6d8a80e71c596adbc9148eb3cf285ef131549c2e.tar.gz |
Fix DqPushJoinToStage optimizer rule. (KIKIMR-14490)
ref:58c9b2b207c29d6d95293743b749ba298a67b387
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_join_ut.cpp | 38 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_join.cpp | 74 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 8 |
5 files changed, 85 insertions, 48 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 11e1af7ae87..1576f97467f 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -46,7 +46,7 @@ public: AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput)); AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage)); AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft)); - AddHandler(0, &TDqJoin::Match, HNDL(PushJoinToStage<false>)); + AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>)); AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>)); AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecomputeStage)); AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); @@ -69,7 +69,7 @@ public: AddHandler(1, &TCoTake::Match, HNDL(BuildTakeSkipStage<true>)); AddHandler(1, &TCoSortBase::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoTake::Match, HNDL(BuildTakeStage<true>)); - AddHandler(1, &TDqJoin::Match, HNDL(PushJoinToStage<true>)); + AddHandler(1, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<true>)); AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); @@ -217,11 +217,10 @@ protected: } template <bool IsGlobal> - TMaybeNode<TExprBase> PushJoinToStage(TExprBase node, TExprContext& ctx, - IOptimizationContext& optCtx, const TGetParents& getParents) + TMaybeNode<TExprBase> RewriteLeftPureJoin(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { - TExprBase output = DqPushJoinToStage(node, ctx, optCtx, *getParents(), IsGlobal); - DumpAppliedRule("PushJoinToStage", node.Ptr(), output.Ptr(), ctx); + TExprBase output = DqRewriteLeftPureJoin(node, ctx, *getParents(), IsGlobal); + DumpAppliedRule("RewriteLeftPureJoin", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/ut/kqp_join_ut.cpp b/ydb/core/kqp/ut/kqp_join_ut.cpp index 8fa59d31b36..f934b6d88dd 100644 --- a/ydb/core/kqp/ut/kqp_join_ut.cpp +++ b/ydb/core/kqp/ut/kqp_join_ut.cpp @@ -977,6 +977,44 @@ Y_UNIT_TEST_SUITE(KqpJoin) { ])", FormatResultSetYson(result.GetResultSet(0))); } + Y_UNIT_TEST_NEW_ENGINE(JoinDupColumnRightPure) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + CreateSampleTables(session); + + auto params = TParamsBuilder() + .AddParam("$rows") + .BeginList() + .AddListItem() + .BeginStruct() + .AddMember("Key").Int32(1) + .AddMember("Fk21").Int32(101) + .EndStruct() + .AddListItem() + .BeginStruct() + .AddMember("Key").Int32(2) + .AddMember("Fk21").Int32(102) + .EndStruct() + .EndList().Build() + .Build(); + + auto result = session.ExecuteDataQuery(Q1_(R"( + DECLARE $rows AS List<Struct<Key: Int32, Fk21: Int32>>; + + SELECT Key, Key1, Key2 + FROM AS_TABLE($rows) AS t1 + LEFT JOIN Join1_2 AS t2 ON t1.Key = t2.Key1 AND t1.Fk21 = t2.Key1 + ORDER BY Key; + )"), TTxControl::BeginTx().CommitTx(), params).GetValueSync(); + + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([ + [1;#;#]; + [2;#;#] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + Y_UNIT_TEST_NEW_ENGINE(JoinLeftPureInner) { TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp index 4097b49e24a..548ad89c9a1 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp @@ -410,8 +410,8 @@ TExprBase DqRewriteRightJoinToLeft(const TExprBase node, TExprContext& ctx) { .Done(); } -TExprBase DqPushJoinToStage(const TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage) +TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap, + bool allowStageMultiUsage) { if (!node.Maybe<TDqJoin>()) { return node; @@ -419,57 +419,57 @@ TExprBase DqPushJoinToStage(const TExprBase node, TExprContext& ctx, IOptimizati auto join = node.Cast<TDqJoin>(); - static const std::set<std::string_view> supportedTypes = {"Inner"sv, "Left"sv, "LeftOnly"sv, "LeftSemi"sv}; + static const std::set<std::string_view> supportedTypes = {"Left"sv, "LeftOnly"sv, "LeftSemi"sv}; auto joinType = join.JoinType().Value(); if (!supportedTypes.contains(joinType)) { return node; } - TMaybeNode<TDqCnUnionAll> connection; - bool pushLeft = false; - if (join.LeftInput().Maybe<TDqCnUnionAll>() && IsDqPureExpr(join.RightInput())) { - connection = join.LeftInput().Cast<TDqCnUnionAll>(); - pushLeft = true; - } - if (join.RightInput().Maybe<TDqCnUnionAll>() && IsDqPureExpr(join.LeftInput())) { - connection = join.RightInput().Cast<TDqCnUnionAll>(); - pushLeft = false; - } - - if (!connection) { + if (!join.RightInput().Maybe<TDqCnUnionAll>()) { return node; } - if (!IsSingleConsumerConnection(connection.Cast(), parentsMap, allowStageMultiUsage)) { + if (!IsDqPureExpr(join.LeftInput())) { return node; } - TCoArgument inputArg{ctx.NewArgument(join.Pos(), "_dq_join_input")}; + auto rightConnection = join.RightInput().Cast<TDqCnUnionAll>(); - auto makeFlow = [&ctx](const TExprBase& list) { - return Build<TCoToFlow>(ctx, list.Pos()) - .Input(list) - .Done(); - }; - - auto phyJoin = DqMakePhyMapJoin( - join, - pushLeft ? TExprBase(inputArg) : makeFlow(join.LeftInput()), - pushLeft ? makeFlow(join.RightInput()) : TExprBase(inputArg), - ctx); + if (!IsSingleConsumerConnection(rightConnection, parentsMap, allowStageMultiUsage)) { + return node; + } - auto lambda = Build<TCoLambda>(ctx, join.Pos()) - .Args({inputArg}) - .Body(phyJoin) + auto leftStage = Build<TDqStage>(ctx, join.Pos()) + .Inputs() + .Build() + .Program() + .Args({}) + .Body<TCoToFlow>() + .Input(join.LeftInput()) + .Build() + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, join.Pos())) .Done(); - auto result = DqPushLambdaToStageUnionAll(connection.Cast(), lambda, {}, ctx, optCtx); - if (!result) { - return node; - } + auto leftConnection = Build<TDqCnUnionAll>(ctx, join.Pos()) + .Output() + .Stage(leftStage) + .Index().Build("0") + .Build() + .Done(); - YQL_ENSURE(result.Maybe<TDqCnUnionAll>()); - return result.Cast(); + // TODO: Right input might be large, there are better possible physical plans. + // We only need matching key from the right side. Instead of broadcasting + // all right input data to single task, we can do a "partial" right semi join + // on in the right stage to extract only necessary rows. + return Build<TDqJoin>(ctx, join.Pos()) + .LeftInput(leftConnection) + .LeftLabel(join.LeftLabel()) + .RightInput(join.RightInput()) + .RightLabel(join.RightLabel()) + .JoinType().Build(joinType) + .JoinKeys(join.JoinKeys()) + .Done(); } TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 70a9d70082b..550410253d7 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -64,7 +64,7 @@ NNodes::TExprBase DqRewriteLengthOfStageOutput(NNodes::TExprBase node, TExprCont NNodes::TExprBase DqRewriteRightJoinToLeft(const NNodes::TExprBase node, TExprContext& ctx); -NNodes::TExprBase DqPushJoinToStage(const NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, +NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 858e3da45a0..a31aa699f1a 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -36,7 +36,7 @@ public: AddHandler(0, &TCoLength::Match, HNDL(RewriteLengthOfStageOutput)); AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage)); AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft)); - AddHandler(0, &TDqJoin::Match, HNDL(PushJoinToStage<false>)); + AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>)); AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>)); AddHandler(0, &TDqJoin::Match, HNDL(BuildJoinDict<false>)); AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>)); @@ -56,7 +56,7 @@ public: AddHandler(1, &TCoTopSort::Match, HNDL(BuildTopSortStage<true>)); AddHandler(1, &TCoSort::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<true>)); - AddHandler(1, &TDqJoin::Match, HNDL(PushJoinToStage<true>)); + AddHandler(1, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<true>)); AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>)); AddHandler(1, &TDqJoin::Match, HNDL(BuildJoinDict<true>)); AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>)); @@ -266,8 +266,8 @@ protected: } template <bool IsGlobal> - TMaybeNode<TExprBase> PushJoinToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - return DqPushJoinToStage(node, ctx, optCtx, *getParents(), IsGlobal); + TMaybeNode<TExprBase> RewriteLeftPureJoin(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { + return DqRewriteLeftPureJoin(node, ctx, *getParents(), IsGlobal); } template <bool IsGlobal> |