diff options
author | aozeritsky <aozeritsky@yandex-team.ru> | 2022-03-15 19:01:14 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@yandex-team.ru> | 2022-03-15 19:01:14 +0300 |
commit | 1f6055c82d8fe1111236baad5219e8213625193f (patch) | |
tree | 0d250d1ccbb61815bb3887be22a86ed655c5b7e3 | |
parent | e8ec31e9ccd3284a9944548b44595992c8f5ffa4 (diff) | |
download | ydb-1f6055c82d8fe1111236baad5219e8213625193f.tar.gz |
KIKIMR-14497: Use same join code for kqp and yql
ref:0c4ecec64b39a9a7638da916c9ed9a50124275e2
-rw-r--r-- | ydb/core/kqp/opt/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_join.cpp | 118 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 100 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 8 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 40 |
7 files changed, 110 insertions, 163 deletions
diff --git a/ydb/core/kqp/opt/CMakeLists.txt b/ydb/core/kqp/opt/CMakeLists.txt index f99e58d720a..18fb177fade 100644 --- a/ydb/core/kqp/opt/CMakeLists.txt +++ b/ydb/core/kqp/opt/CMakeLists.txt @@ -25,7 +25,6 @@ target_sources(core-kqp-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_build_txs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_effects.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_join.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_kql.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_phase.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_phy_check.cpp diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h index 459742b12fa..6db7f968e9a 100644 --- a/ydb/core/kqp/opt/kqp_opt_impl.h +++ b/ydb/core/kqp/opt/kqp_opt_impl.h @@ -45,10 +45,6 @@ NYql::NNodes::TKqpTable BuildTableMeta(const NYql::TKikimrTableDescription& tabl NYql::NNodes::TKqpTable BuildTableMeta(const NYql::TKikimrTableMetadata& tableMeta, const NYql::TPositionHandle& pos, NYql::TExprContext& ctx); -NYql::NNodes::TExprBase KqpBuildJoin(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx, - const TKqpOptimizeContext& kqpCtx, NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap, - bool allowStageMultiUsage); - TIntrusivePtr<NYql::TKikimrTableMetadata> GetIndexMetadata(const NYql::NNodes::TKqlReadTableIndex& index, const NYql::TKikimrTablesData& tables, TStringBuf cluster); diff --git a/ydb/core/kqp/opt/kqp_opt_join.cpp b/ydb/core/kqp/opt/kqp_opt_join.cpp deleted file mode 100644 index 650dc64104c..00000000000 --- a/ydb/core/kqp/opt/kqp_opt_join.cpp +++ /dev/null @@ -1,118 +0,0 @@ -#include "kqp_opt_impl.h" - -#include <ydb/core/kqp/common/kqp_yql.h> - -#include <ydb/library/yql/dq/opt/dq_opt_phy.h> - -namespace NKikimr::NKqp::NOpt { - -using namespace NYql; -using namespace NYql::NDq; -using namespace NYql::NNodes; - -namespace { - -// left input should be DqCnUnionAll (with single usage) -// right input should be either DqCnUnionAll (with single usage) or DqPure expression -bool ValidateJoinInputs(const TExprBase& left, const TExprBase& right, const TParentsMap& parentsMap, - bool allowStageMultiUsage) -{ - if (!left.Maybe<TDqCnUnionAll>()) { - return false; - } - if (!IsSingleConsumerConnection(left.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) { - return false; - } - - if (right.Maybe<TDqCnUnionAll>()) { - if (!IsSingleConsumerConnection(right.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) { - return false; - } - } else if (IsDqPureExpr(right, /* isPrecomputePure */ true)) { - // pass - } else { - return false; - } - - return true; -} - -TMaybeNode<TDqJoin> FlipJoin(const TDqJoin& join, TExprContext& ctx) { - auto joinType = join.JoinType().Value(); - - if (joinType == "Inner"sv || joinType == "Full"sv || joinType == "Exclusion"sv || joinType == "Cross"sv) { - // pass - } else if (joinType == "Right"sv) { - joinType = "Left"sv; - } else if (joinType == "Left"sv) { - joinType = "Right"sv; - } else if (joinType == "RightSemi"sv) { - joinType = "LeftSemi"sv; - } else if (joinType == "LeftSemi"sv) { - joinType = "RightSemi"sv; - } else if (joinType == "RightOnly"sv) { - joinType = "LeftOnly"sv; - } else if (joinType == "LeftOnly"sv) { - joinType = "RightOnly"sv; - } else { - return {}; - } - - auto joinKeysBuilder = Build<TDqJoinKeyTupleList>(ctx, join.Pos()); - for (const auto& keys : join.JoinKeys()) { - joinKeysBuilder.Add<TDqJoinKeyTuple>() - .LeftLabel(keys.RightLabel()) - .LeftColumn(keys.RightColumn()) - .RightLabel(keys.LeftLabel()) - .RightColumn(keys.LeftColumn()) - .Build(); - } - - return Build<TDqJoin>(ctx, join.Pos()) - .LeftInput(join.RightInput()) - .LeftLabel(join.RightLabel()) - .RightInput(join.LeftInput()) - .RightLabel(join.LeftLabel()) - .JoinType().Build(joinType) - .JoinKeys(joinKeysBuilder.Done()) - .Done(); -} - -} // anonymous namespace - -TExprBase KqpBuildJoin(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, - IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) -{ - if (!node.Maybe<TDqJoin>()) { - return node; - } - - auto join = node.Cast<TDqJoin>(); - - if (ValidateJoinInputs(join.LeftInput(), join.RightInput(), parentsMap, allowStageMultiUsage)) { - // pass - } else if (ValidateJoinInputs(join.RightInput(), join.LeftInput(), parentsMap, allowStageMultiUsage)) { - auto maybeFlipJoin = FlipJoin(join, ctx); - if (!maybeFlipJoin) { - return node; - } - join = maybeFlipJoin.Cast(); - } else { - return node; - } - - auto joinType = join.JoinType().Value(); - - if (joinType == "Full"sv || joinType == "Exclusion"sv) { - return DqBuildJoinDict(join, ctx); - } - - // NOTE: We don't want to broadcast table data via readsets for data queries, so we need to create a - // separate stage to receive data from both sides of join. - // TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This - // requires some additional knowledge, probably with use of constraints. - bool pushLeftStage = !kqpCtx.IsDataQuery(); - return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx); -} - -} // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 66bc0482c02..d1c9065c818 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -229,7 +229,7 @@ protected: TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - TExprBase output = KqpBuildJoin(node, ctx, KqpCtx, optCtx, *getParents(), IsGlobal); + TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal, /*pushLeftStage =*/ !KqpCtx.IsDataQuery()); DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index e29a22bffe6..11580284005 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1639,4 +1639,104 @@ TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizati return precompute; } +// left input should be DqCnUnionAll (with single usage) +// right input should be either DqCnUnionAll (with single usage) or DqPure expression +bool DqValidateJoinInputs(const TExprBase& left, const TExprBase& right, const TParentsMap& parentsMap, + bool allowStageMultiUsage) +{ + if (!left.Maybe<TDqCnUnionAll>()) { + return false; + } + if (!IsSingleConsumerConnection(left.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) { + return false; + } + + if (right.Maybe<TDqCnUnionAll>()) { + if (!IsSingleConsumerConnection(right.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) { + return false; + } + } else if (IsDqPureExpr(right, /* isPrecomputePure */ true)) { + // pass + } else { + return false; + } + + return true; +} + +TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) { + auto joinType = join.JoinType().Value(); + + if (joinType == "Inner"sv || joinType == "Full"sv || joinType == "Exclusion"sv || joinType == "Cross"sv) { + // pass + } else if (joinType == "Right"sv) { + joinType = "Left"sv; + } else if (joinType == "Left"sv) { + joinType = "Right"sv; + } else if (joinType == "RightSemi"sv) { + joinType = "LeftSemi"sv; + } else if (joinType == "LeftSemi"sv) { + joinType = "RightSemi"sv; + } else if (joinType == "RightOnly"sv) { + joinType = "LeftOnly"sv; + } else if (joinType == "LeftOnly"sv) { + joinType = "RightOnly"sv; + } else { + return {}; + } + + auto joinKeysBuilder = Build<TDqJoinKeyTupleList>(ctx, join.Pos()); + for (const auto& keys : join.JoinKeys()) { + joinKeysBuilder.Add<TDqJoinKeyTuple>() + .LeftLabel(keys.RightLabel()) + .LeftColumn(keys.RightColumn()) + .RightLabel(keys.LeftLabel()) + .RightColumn(keys.LeftColumn()) + .Build(); + } + + return Build<TDqJoin>(ctx, join.Pos()) + .LeftInput(join.RightInput()) + .LeftLabel(join.RightLabel()) + .RightInput(join.LeftInput()) + .RightLabel(join.LeftLabel()) + .JoinType().Build(joinType) + .JoinKeys(joinKeysBuilder.Done()) + .Done(); +} + +TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage) +{ + if (!node.Maybe<TDqJoin>()) { + return node; + } + + auto join = node.Cast<TDqJoin>(); + + if (DqValidateJoinInputs(join.LeftInput(), join.RightInput(), parentsMap, allowStageMultiUsage)) { + // pass + } else if (DqValidateJoinInputs(join.RightInput(), join.LeftInput(), parentsMap, allowStageMultiUsage)) { + auto maybeFlipJoin = DqFlipJoin(join, ctx); + if (!maybeFlipJoin) { + return node; + } + join = maybeFlipJoin.Cast(); + } else { + return node; + } + + auto joinType = join.JoinType().Value(); + + if (joinType == "Full"sv || joinType == "Exclusion"sv) { + return DqBuildJoinDict(join, ctx); + } + + // NOTE: We don't want to broadcast table data via readsets for data queries, so we need to create a + // separate stage to receive data from both sides of join. + // TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This + // requires some additional knowledge, probably with use of constraints. + return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx); +} + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 4a9ac46701e..4ba60f7b866 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -70,6 +70,14 @@ NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprConte NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx); +NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage); + +bool DqValidateJoinInputs( + const NNodes::TExprBase& left, const NNodes::TExprBase& right, const TParentsMap& parentsMap, + bool allowStageMultiUsage); + +NNodes::TMaybeNode<NNodes::TDqJoin> DqFlipJoin(const NNodes::TDqJoin& join, TExprContext& ctx); + NNodes::TExprBase DqBuildJoinDict(const NNodes::TDqJoin& join, TExprContext& ctx); TMaybe<std::pair<NNodes::TExprBase, NNodes::TDqConnection>> ExtractPureExprStage(TExprNode::TPtr input, diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 697c4a35859..cfc82027263 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -38,7 +38,6 @@ public: AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft)); AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>)); AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>)); - AddHandler(0, &TDqJoin::Match, HNDL(BuildJoinDict<false>)); AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>)); AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>)); AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); @@ -58,7 +57,6 @@ public: AddHandler(1, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<true>)); AddHandler(1, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<true>)); AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>)); - AddHandler(1, &TDqJoin::Match, HNDL(BuildJoinDict<true>)); AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); @@ -295,24 +293,7 @@ protected: TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { auto join = node.Cast<TDqJoin>(); const TParentsMap* parentsMap = getParents(); - if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) { - return node; - } - - return DqBuildPhyJoin(join, false /* TODO */, ctx, optCtx); - } - - template <bool IsGlobal> - TMaybeNode<TExprBase> BuildJoinDict(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { - Y_UNUSED(optCtx); - - auto join = node.Cast<TDqJoin>(); - const TParentsMap* parentsMap = getParents(); - if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) { - return node; - } - - return DqBuildJoinDict(join, ctx); // , optCtx); + return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */); } TMaybeNode<TExprBase> BuildHasItems(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) { @@ -322,25 +303,6 @@ protected: TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) { return DqBuildScalarPrecompute(node, ctx, optCtx); } - -private: - bool JoinPrerequisitesVerify(TDqJoin join, const TParentsMap* parentsMap, bool isGlobal) const { - // KqpBuildJoin copy/paste - if (!join.LeftInput().Maybe<TDqCnUnionAll>()) { - return false; - } - if (!join.RightInput().Maybe<TDqCnUnionAll>()) { - return false; - } - - if (!IsSingleConsumerConnection(join.LeftInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) { - return false; - } - if (!IsSingleConsumerConnection(join.RightInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) { - return false; - } - return true; - } }; THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx) { |