diff options
author | maxkovalev <maxkovalev@yandex-team.com> | 2023-10-03 19:19:29 +0300 |
---|---|---|
committer | maxkovalev <maxkovalev@yandex-team.com> | 2023-10-03 19:48:32 +0300 |
commit | c850b4710e1c140fc5bbe23670ee03964f08987a (patch) | |
tree | 55b726367d84729de3918b3f58c8b3e5e19cb1df | |
parent | a14e6df4ad207375e3b08b57c0355f94bca2532f (diff) | |
download | ydb-c850b4710e1c140fc5bbe23670ee03964f08987a.tar.gz |
YQL-15441: Add force merge join
YQL-15441: Add force merge join
3 files changed, 23 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp index 2447f1a9f6..1007638a52 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -2978,12 +2978,6 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo const auto joinType = op.JoinKind->Content(); const auto disableOptimizers = state->Configuration->DisableOptimizers.Get().GetOrElse(TSet<TString>()); - const unsigned readyCount = unsigned(leftTables.Defined()) + rightTables.Defined(); - if (!readyCount) { - return TStatus::Repeat; - } else if (readyCount == 1 && (disableOptimizers.contains("EarlyMapJoin") || joinType == "Cross")) { - return TStatus::Repeat; - } bool empty = false; if (leftTables.Defined() @@ -3012,6 +3006,11 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo } const bool isCross = joinType == "Cross"; + const unsigned readyCount = unsigned(leftTables.Defined()) + rightTables.Defined(); + if (isCross && readyCount < 2) { + return TStatus::Repeat; + } + auto leftJoinKeys = BuildJoinKeys(labels.Inputs[0], *op.LeftLabel); auto rightJoinKeys = BuildJoinKeys(labels.Inputs[1], *op.RightLabel); auto leftJoinKeyList = BuildJoinKeyList(labels.Inputs[0], *op.LeftLabel); @@ -3050,6 +3049,9 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo forceMergeJoin = true; } } + if (!readyCount && !forceMergeJoin) { + return TStatus::Repeat; + } auto cluster = TString{equiJoin.DataSink().Cluster().Value()}; @@ -3057,7 +3059,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo TJoinSideStats leftStats; TJoinSideStats rightStats; - const bool allowLookupJoin = !isCross && leftTables.Defined() && rightTables.Defined(); + const bool allowLookupJoin = !isCross && leftTables.Defined() && rightTables.Defined() && !forceMergeJoin; if (allowLookupJoin) { auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::RawSize, mapSettings, leftStats, rightStats, leftTables, leftJoinKeys, rightTables, rightJoinKeys, @@ -3126,10 +3128,10 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo } } - { + if (!forceMergeJoin) { auto status = CollectStatsAndMapJoinSettings(ESizeStatCollectMode::ColumnarSize, mapSettings, leftStats, rightStats, - leftTables, leftJoinKeys, rightTables, rightJoinKeys, - &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx); + leftTables, leftJoinKeys, rightTables, rightJoinKeys, + &leftLeaf, &rightLeaf, *state, isCross, cluster, ctx); if (status.Level != TStatus::Ok) { return (status.Level == TStatus::Repeat) ? TStatus::Ok : status; } @@ -3148,7 +3150,7 @@ TStatus RewriteYtEquiJoinLeaf(TYtEquiJoin equiJoin, TYtJoinNodeOp& op, TYtJoinNo << JoinSeq(",", leftStats.SortedKeys) << "], right sorted prefix: [" << JoinSeq(",", rightStats.SortedKeys) << "]"; - bool allowOrderedJoin = !isCross && leftTables.Defined() && rightTables.Defined(); + bool allowOrderedJoin = !isCross && ((leftTables.Defined() && rightTables.Defined()) || forceMergeJoin); TMergeJoinSortInfo sortInfo; sortInfo.LeftSortedKeys = leftStats.SortedKeys; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp index 09ac0d0793..a45050a92d 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp @@ -89,6 +89,7 @@ public: AddHandler(0, &TYtDqWrite::Match, HNDL(YtDqWrite)); AddHandler(0, &TYtDqProcessWrite::Match, HNDL(YtDqProcessWrite)); AddHandler(0, &TYtTransientOpBase::Match, HNDL(ZeroSample)); + AddHandler(0, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin)); AddHandler(1, &TYtMap::Match, HNDL(FuseInnerMap)); AddHandler(1, &TYtMap::Match, HNDL(FuseOuterMap)); @@ -109,7 +110,6 @@ public: AddHandler(1, &TYtWithUserJobsOpBase::Match, HNDL(EmbedLimit)); AddHandler(1, &TYtMerge::Match, HNDL(PushMergeLimitToInput)); - AddHandler(2, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin)); AddHandler(2, &TStatWriteTable::Match, HNDL(ReplaceStatWriteTable)); AddHandler(2, &TYtMap::Match, HNDL(MapToMerge)); AddHandler(2, &TYtPublish::Match, HNDL(UnorderedPublishTarget)); diff --git a/ydb/library/yql/tests/sql/suites/join/force_merge_join.sql b/ydb/library/yql/tests/sql/suites/join/force_merge_join.sql new file mode 100644 index 0000000000..80aaedfb0a --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/join/force_merge_join.sql @@ -0,0 +1,9 @@ +USE plato; + +PRAGMA yt.JoinMergeForce = "1"; +pragma yt.JoinMergeTablesLimit="10"; + +SELECT a.key as key1 + FROM (SELECT * FROM plato.Input1 WHERE subkey != "bar") AS a + JOIN (SELECT * FROM plato.Input1 WHERE subkey != "foo") AS b ON a.key = b.key +WHERE a.key != "1" OR b.key != "2"; |