about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author aozeritsky <aozeritsky@yandex-team.ru> 2022-03-15 19:01:14 +0300
committer aozeritsky <aozeritsky@yandex-team.ru> 2022-03-15 19:01:14 +0300
commit 1f6055c82d8fe1111236baad5219e8213625193f (patch)
tree 0d250d1ccbb61815bb3887be22a86ed655c5b7e3
parent e8ec31e9ccd3284a9944548b44595992c8f5ffa4 (diff)
download ydb-1f6055c82d8fe1111236baad5219e8213625193f.tar.gz
KIKIMR-14497: Use same join code for kqp and yql
ref:0c4ecec64b39a9a7638da916c9ed9a50124275e2
-rw-r--r-- ydb/core/kqp/opt/CMakeLists.txt 1
-rw-r--r-- ydb/core/kqp/opt/kqp_opt_impl.h 4
-rw-r--r-- ydb/core/kqp/opt/kqp_opt_join.cpp 118
-rw-r--r-- ydb/core/kqp/opt/physical/kqp_opt_phy.cpp 2
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_phy.cpp 100
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_phy.h 8
-rw-r--r-- ydb/library/yql/providers/dq/opt/physical_optimize.cpp 40
7 files changed, 110 insertions, 163 deletions
diff --git a/ydb/core/kqp/opt/CMakeLists.txt b/ydb/core/kqp/opt/CMakeLists.txt
index f99e58d720a..18fb177fade 100644
--- a/ydb/core/kqp/opt/CMakeLists.txt
+++ b/ydb/core/kqp/opt/CMakeLists.txt
@@ -25,7 +25,6 @@ target_sources(core-kqp-opt PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_effects.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_join.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_kql.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_phase.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
diff --git a/ydb/core/kqp/opt/kqp_opt_impl.h b/ydb/core/kqp/opt/kqp_opt_impl.h
index 459742b12fa..6db7f968e9a 100644
--- a/ydb/core/kqp/opt/kqp_opt_impl.h
+++ b/ydb/core/kqp/opt/kqp_opt_impl.h
@@ -45,10 +45,6 @@ NYql::NNodes::TKqpTable BuildTableMeta(const NYql::TKikimrTableDescription& tabl
NYql::NNodes::TKqpTable BuildTableMeta(const NYql::TKikimrTableMetadata& tableMeta,
const NYql::TPositionHandle& pos, NYql::TExprContext& ctx);
-NYql::NNodes::TExprBase KqpBuildJoin(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
- const TKqpOptimizeContext& kqpCtx, NYql::IOptimizationContext& optCtx, const NYql::TParentsMap& parentsMap,
- bool allowStageMultiUsage);
-
TIntrusivePtr<NYql::TKikimrTableMetadata> GetIndexMetadata(const NYql::NNodes::TKqlReadTableIndex& index,
const NYql::TKikimrTablesData& tables, TStringBuf cluster);
diff --git a/ydb/core/kqp/opt/kqp_opt_join.cpp b/ydb/core/kqp/opt/kqp_opt_join.cpp
deleted file mode 100644
index 650dc64104c..00000000000
--- a/ydb/core/kqp/opt/kqp_opt_join.cpp
+++ /dev/null
@@ -1,118 +0,0 @@
-#include "kqp_opt_impl.h"
-
-#include <ydb/core/kqp/common/kqp_yql.h>
-
-#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
-
-namespace NKikimr::NKqp::NOpt {
-
-using namespace NYql;
-using namespace NYql::NDq;
-using namespace NYql::NNodes;
-
-namespace {
-
-// left input should be DqCnUnionAll (with single usage)
-// right input should be either DqCnUnionAll (with single usage) or DqPure expression
-bool ValidateJoinInputs(const TExprBase& left, const TExprBase& right, const TParentsMap& parentsMap,
- bool allowStageMultiUsage)
-{
- if (!left.Maybe<TDqCnUnionAll>()) {
- return false;
- }
- if (!IsSingleConsumerConnection(left.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) {
- return false;
- }
-
- if (right.Maybe<TDqCnUnionAll>()) {
- if (!IsSingleConsumerConnection(right.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) {
- return false;
- }
- } else if (IsDqPureExpr(right, /* isPrecomputePure */ true)) {
- // pass
- } else {
- return false;
- }
-
- return true;
-}
-
-TMaybeNode<TDqJoin> FlipJoin(const TDqJoin& join, TExprContext& ctx) {
- auto joinType = join.JoinType().Value();
-
- if (joinType == "Inner"sv || joinType == "Full"sv || joinType == "Exclusion"sv || joinType == "Cross"sv) {
- // pass
- } else if (joinType == "Right"sv) {
- joinType = "Left"sv;
- } else if (joinType == "Left"sv) {
- joinType = "Right"sv;
- } else if (joinType == "RightSemi"sv) {
- joinType = "LeftSemi"sv;
- } else if (joinType == "LeftSemi"sv) {
- joinType = "RightSemi"sv;
- } else if (joinType == "RightOnly"sv) {
- joinType = "LeftOnly"sv;
- } else if (joinType == "LeftOnly"sv) {
- joinType = "RightOnly"sv;
- } else {
- return {};
- }
-
- auto joinKeysBuilder = Build<TDqJoinKeyTupleList>(ctx, join.Pos());
- for (const auto& keys : join.JoinKeys()) {
- joinKeysBuilder.Add<TDqJoinKeyTuple>()
- .LeftLabel(keys.RightLabel())
- .LeftColumn(keys.RightColumn())
- .RightLabel(keys.LeftLabel())
- .RightColumn(keys.LeftColumn())
- .Build();
- }
-
- return Build<TDqJoin>(ctx, join.Pos())
- .LeftInput(join.RightInput())
- .LeftLabel(join.RightLabel())
- .RightInput(join.LeftInput())
- .RightLabel(join.LeftLabel())
- .JoinType().Build(joinType)
- .JoinKeys(joinKeysBuilder.Done())
- .Done();
-}
-
-} // anonymous namespace
-
-TExprBase KqpBuildJoin(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
- IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage)
-{
- if (!node.Maybe<TDqJoin>()) {
- return node;
- }
-
- auto join = node.Cast<TDqJoin>();
-
- if (ValidateJoinInputs(join.LeftInput(), join.RightInput(), parentsMap, allowStageMultiUsage)) {
- // pass
- } else if (ValidateJoinInputs(join.RightInput(), join.LeftInput(), parentsMap, allowStageMultiUsage)) {
- auto maybeFlipJoin = FlipJoin(join, ctx);
- if (!maybeFlipJoin) {
- return node;
- }
- join = maybeFlipJoin.Cast();
- } else {
- return node;
- }
-
- auto joinType = join.JoinType().Value();
-
- if (joinType == "Full"sv || joinType == "Exclusion"sv) {
- return DqBuildJoinDict(join, ctx);
- }
-
- // NOTE: We don't want to broadcast table data via readsets for data queries, so we need to create a
- // separate stage to receive data from both sides of join.
- // TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This
- // requires some additional knowledge, probably with use of constraints.
- bool pushLeftStage = !kqpCtx.IsDataQuery();
- return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx);
-}
-
-} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 66bc0482c02..d1c9065c818 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -229,7 +229,7 @@ protected:
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
- TExprBase output = KqpBuildJoin(node, ctx, KqpCtx, optCtx, *getParents(), IsGlobal);
+ TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal, /*pushLeftStage =*/ !KqpCtx.IsDataQuery());
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index e29a22bffe6..11580284005 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1639,4 +1639,104 @@ TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizati
return precompute;
}
+// Checks join inputs: left must be a TDqCnUnionAll for which IsSingleConsumerConnection holds;
+// right must be either such a TDqCnUnionAll or an expression accepted by IsDqPureExpr (isPrecomputePure = true).
+bool DqValidateJoinInputs(const TExprBase& left, const TExprBase& right, const TParentsMap& parentsMap,
+ bool allowStageMultiUsage)
+{
+ if (!left.Maybe<TDqCnUnionAll>()) {
+ return false;
+ }
+ if (!IsSingleConsumerConnection(left.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) {
+ return false;
+ }
+
+ if (right.Maybe<TDqCnUnionAll>()) {
+ if (!IsSingleConsumerConnection(right.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) {
+ return false;
+ }
+ } else if (IsDqPureExpr(right, /* isPrecomputePure */ true)) {
+ // a pure right input is accepted without further checks
+ } else {
+ return false;
+ }
+
+ return true;
+}
+// Returns the join with its sides swapped and the join type mirrored (Left <-> Right, etc.); an empty node for an unrecognized type.
+TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) {
+ auto joinType = join.JoinType().Value();
+
+ if (joinType == "Inner"sv || joinType == "Full"sv || joinType == "Exclusion"sv || joinType == "Cross"sv) {
+ // the join type is kept as is when the sides are swapped
+ } else if (joinType == "Right"sv) {
+ joinType = "Left"sv;
+ } else if (joinType == "Left"sv) {
+ joinType = "Right"sv;
+ } else if (joinType == "RightSemi"sv) {
+ joinType = "LeftSemi"sv;
+ } else if (joinType == "LeftSemi"sv) {
+ joinType = "RightSemi"sv;
+ } else if (joinType == "RightOnly"sv) {
+ joinType = "LeftOnly"sv;
+ } else if (joinType == "LeftOnly"sv) {
+ joinType = "RightOnly"sv;
+ } else {
+ return {};
+ }
+
+ auto joinKeysBuilder = Build<TDqJoinKeyTupleList>(ctx, join.Pos());
+ for (const auto& keys : join.JoinKeys()) {
+ joinKeysBuilder.Add<TDqJoinKeyTuple>()
+ .LeftLabel(keys.RightLabel())
+ .LeftColumn(keys.RightColumn())
+ .RightLabel(keys.LeftLabel())
+ .RightColumn(keys.LeftColumn())
+ .Build();
+ }
+
+ return Build<TDqJoin>(ctx, join.Pos())
+ .LeftInput(join.RightInput())
+ .LeftLabel(join.RightLabel())
+ .RightInput(join.LeftInput())
+ .RightLabel(join.LeftLabel())
+ .JoinType().Build(joinType)
+ .JoinKeys(joinKeysBuilder.Done())
+ .Done();
+}
+// Routes a TDqJoin to DqBuildJoinDict (Full/Exclusion) or DqBuildPhyJoin, flipping it first when only the swapped inputs validate; otherwise returns node unchanged.
+TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage)
+{
+ if (!node.Maybe<TDqJoin>()) {
+ return node;
+ }
+
+ auto join = node.Cast<TDqJoin>();
+
+ if (DqValidateJoinInputs(join.LeftInput(), join.RightInput(), parentsMap, allowStageMultiUsage)) {
+ // inputs are valid in their current order, nothing to flip
+ } else if (DqValidateJoinInputs(join.RightInput(), join.LeftInput(), parentsMap, allowStageMultiUsage)) {
+ auto maybeFlipJoin = DqFlipJoin(join, ctx);
+ if (!maybeFlipJoin) {
+ return node;
+ }
+ join = maybeFlipJoin.Cast();
+ } else {
+ return node;
+ }
+
+ auto joinType = join.JoinType().Value();
+
+ if (joinType == "Full"sv || joinType == "Exclusion"sv) {
+ return DqBuildJoinDict(join, ctx);
+ }
+
+ // NOTE: pushLeftStage is decided by the caller. KQP passes false for data queries, since it does not want
+ // to broadcast table data via readsets and needs a separate stage to receive data from both sides of join.
+ // TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This
+ // requires some additional knowledge, probably with use of constraints.
+ return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx);
+}
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 4a9ac46701e..4ba60f7b866 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -70,6 +70,14 @@ NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprConte
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx,
IOptimizationContext& optCtx);
+NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage);
+
+bool DqValidateJoinInputs(
+ const NNodes::TExprBase& left, const NNodes::TExprBase& right, const TParentsMap& parentsMap,
+ bool allowStageMultiUsage);
+
+NNodes::TMaybeNode<NNodes::TDqJoin> DqFlipJoin(const NNodes::TDqJoin& join, TExprContext& ctx);
+
NNodes::TExprBase DqBuildJoinDict(const NNodes::TDqJoin& join, TExprContext& ctx);
TMaybe<std::pair<NNodes::TExprBase, NNodes::TDqConnection>> ExtractPureExprStage(TExprNode::TPtr input,
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 697c4a35859..cfc82027263 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -38,7 +38,6 @@ public:
AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft));
AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>));
AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>));
- AddHandler(0, &TDqJoin::Match, HNDL(BuildJoinDict<false>));
AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>));
AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>));
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
@@ -58,7 +57,6 @@ public:
AddHandler(1, &TCoTake::Match, HNDL(BuildTakeOrTakeSkipStage<true>));
AddHandler(1, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<true>));
AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>));
- AddHandler(1, &TDqJoin::Match, HNDL(BuildJoinDict<true>));
AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>));
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
@@ -295,24 +293,7 @@ protected:
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
auto join = node.Cast<TDqJoin>();
const TParentsMap* parentsMap = getParents();
- if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) {
- return node;
- }
-
- return DqBuildPhyJoin(join, false /* TODO */, ctx, optCtx);
- }
-
- template <bool IsGlobal>
- TMaybeNode<TExprBase> BuildJoinDict(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
- Y_UNUSED(optCtx);
-
- auto join = node.Cast<TDqJoin>();
- const TParentsMap* parentsMap = getParents();
- if (!JoinPrerequisitesVerify(join, parentsMap, IsGlobal)) {
- return node;
- }
-
- return DqBuildJoinDict(join, ctx); // , optCtx);
+ return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */);
}
TMaybeNode<TExprBase> BuildHasItems(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) {
@@ -322,25 +303,6 @@ protected:
TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) {
return DqBuildScalarPrecompute(node, ctx, optCtx);
}
-
-private:
- bool JoinPrerequisitesVerify(TDqJoin join, const TParentsMap* parentsMap, bool isGlobal) const {
- // KqpBuildJoin copy/paste
- if (!join.LeftInput().Maybe<TDqCnUnionAll>()) {
- return false;
- }
- if (!join.RightInput().Maybe<TDqCnUnionAll>()) {
- return false;
- }
-
- if (!IsSingleConsumerConnection(join.LeftInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) {
- return false;
- }
- if (!IsSingleConsumerConnection(join.RightInput().Cast<TDqCnUnionAll>(), *parentsMap, isGlobal)) {
- return false;
- }
- return true;
- }
};
THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx) {