about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author maxkovalev <maxkovalev@yandex-team.com> 2023-10-03 19:19:29 +0300
committer maxkovalev <maxkovalev@yandex-team.com> 2023-10-03 19:48:32 +0300
commit c850b4710e1c140fc5bbe23670ee03964f08987a (patch)
tree 55b726367d84729de3918b3f58c8b3e5e19cb1df
parent a14e6df4ad207375e3b08b57c0355f94bca2532f (diff)
download ydb-c850b4710e1c140fc5bbe23670ee03964f08987a.tar.gz
YQL-15441: Add force merge join
YQL-15441: Add force merge join
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp 24
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp 2
-rw-r--r-- ydb/library/yql/tests/sql/suites/join/force_merge_join.sql 9
3 files changed, 23 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
index 2447f1a9f6..1007638a52 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -2978,12 +2978,6 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
const auto joinType = op.JoinKind->Content();
const auto disableOptimizers = state->Configuration->DisableOptimizers.Get().GetOrElse(TSet<TString>());
- const unsigned readyCount = unsigned(leftTables.Defined()) + rightTables.Defined();
- if (!readyCount) {
- return TStatus::Repeat;
- } else if (readyCount == 1 && (disableOptimizers.contains("EarlyMapJoin") || joinType == "Cross")) {
- return TStatus::Repeat;
- }
bool empty = false;
if (leftTables.Defined()
@@ -3012,6 +3006,11 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
}
const bool isCross = joinType == "Cross";
+ const unsigned readyCount = unsigned(leftTables.Defined()) + rightTables.Defined();
+ if (isCross && readyCount < 2) {
+ return TStatus::Repeat;
+ }
+
auto leftJoinKeys = BuildJoinKeys(labels.Inputs[0], *op.LeftLabel);
auto rightJoinKeys = BuildJoinKeys(labels.Inputs[1], *op.RightLabel);
auto leftJoinKeyList = BuildJoinKeyList(labels.Inputs[0], *op.LeftLabel);
@@ -3050,6 +3049,9 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
forceMergeJoin = true;
}
}
+ if (!readyCount && !forceMergeJoin) {
+ return TStatus::Repeat;
+ }
auto cluster = TString{equiJoin.DataSink().Cluster().Value()};
@@ -3057,7 +3059,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
TJoinSideStats leftStats;
TJoinSideStats rightStats;
- const bool allowLookupJoin = !isCross && leftTables.Defined() && rightTables.Defined();
+ const bool allowLookupJoin = !isCross && leftTables.Defined() && rightTables.Defined() && !forceMergeJoin;
if (allowLookupJoin) {
auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::RawSize, mapSettings, leftStats, rightStats,
leftTables, leftJoinKeys, rightTables, rightJoinKeys,
@@ -3126,10 +3128,10 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
}
}
- {
+ if (!forceMergeJoin) {
auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::ColumnarSize, mapSettings, leftStats, rightStats,
- leftTables, leftJoinKeys, rightTables, rightJoinKeys,
- &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx);
+ leftTables, leftJoinKeys, rightTables, rightJoinKeys,
+ &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx);
if (status.Level != TStatus::Ok) {
return (status.Level == TStatus::Repeat) ? TStatus::Ok : status;
}
@@ -3148,7 +3150,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo
<< JoinSeq(",", leftStats.SortedKeys) << "], right sorted prefix: ["
<< JoinSeq(",", rightStats.SortedKeys) << "]";
- bool allowOrderedJoin = !isCross && leftTables.Defined() && rightTables.Defined();
+ bool allowOrderedJoin = !isCross && ((leftTables.Defined() && rightTables.Defined()) || forceMergeJoin);
TMergeJoinSortInfo sortInfo;
sortInfo.LeftSortedKeys = leftStats.SortedKeys;
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp
index 09ac0d0793..a45050a92d 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp
@@ -89,6 +89,7 @@ public:
AddHandler(0, &TYtDqWrite::Match, HNDL(YtDqWrite));
AddHandler(0, &TYtDqProcessWrite::Match, HNDL(YtDqProcessWrite));
AddHandler(0, &TYtTransientOpBase::Match, HNDL(ZeroSample));
+ AddHandler(0, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin));
AddHandler(1, &TYtMap::Match, HNDL(FuseInnerMap));
AddHandler(1, &TYtMap::Match, HNDL(FuseOuterMap));
@@ -109,7 +110,6 @@ public:
AddHandler(1, &TYtWithUserJobsOpBase::Match, HNDL(EmbedLimit));
AddHandler(1, &TYtMerge::Match, HNDL(PushMergeLimitToInput));
- AddHandler(2, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin));
AddHandler(2, &TStatWriteTable::Match, HNDL(ReplaceStatWriteTable));
AddHandler(2, &TYtMap::Match, HNDL(MapToMerge));
AddHandler(2, &TYtPublish::Match, HNDL(UnorderedPublishTarget));
diff --git a/ydb/library/yql/tests/sql/suites/join/force_merge_join.sql b/ydb/library/yql/tests/sql/suites/join/force_merge_join.sql
new file mode 100644
index 0000000000..80aaedfb0a
--- /dev/null
+++ b/ydb/library/yql/tests/sql/suites/join/force_merge_join.sql
@@ -0,0 +1,9 @@
+USE plato;
+
+PRAGMA yt.JoinMergeForce = "1";
+pragma yt.JoinMergeTablesLimit="10";
+
+SELECT a.key as key1
+ FROM (SELECT * FROM plato.Input1 WHERE subkey != "bar") AS a
+ JOIN (SELECT * FROM plato.Input1 WHERE subkey != "foo") AS b ON a.key = b.key
+WHERE a.key != "1" OR b.key != "2";