summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp20
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.cpp13
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.h2
-rw-r--r--yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.cfg2
-rw-r--r--yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.sql13
5 files changed, 39 insertions, 11 deletions
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
index 5e79c9e2970..70fd0127701 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
@@ -327,6 +327,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBase node, TExprContext& ctx) const {
if (State_->Configuration->JoinMergeTablesLimit.Get()) {
auto equiJoin = node.Cast<TYtEquiJoin>();
+
+ const bool waitAllInputs = State_->Configuration->JoinWaitAllInputs.Get().GetOrElse(false);
+ if (waitAllInputs && !AreJoinInputsReady(equiJoin)) {
+ return node;
+ }
+
const auto tree = ImportYtEquiJoin(equiJoin, ctx);
if (State_->Configuration->JoinMergeForce.Get() || tree->LinkSettings.ForceSortedMerge) {
const auto rewriteStatus = RewriteYtEquiJoinLeaves(equiJoin, *tree, State_, ctx);
@@ -479,19 +485,11 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
&& !HasSetting(equiJoin.JoinOptions().Ref(), "cbo_passed");
const bool waitAllInputs = State_->Configuration->JoinWaitAllInputs.Get().GetOrElse(false) || tryReorder;
-
- if (waitAllInputs) {
- for (auto section: equiJoin.Input()) {
- for (auto path: section.Paths()) {
- TYtPathInfo pathInfo(path);
- if (!pathInfo.Table->Stat) {
- return node;
- }
- }
- }
+ if (waitAllInputs && !AreJoinInputsReady(equiJoin)) {
+ return node;
}
- const auto tree = ImportYtEquiJoin(equiJoin, ctx);
+ const auto tree = ImportYtEquiJoin(equiJoin, ctx);
if (tryReorder) {
YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
auto collectStatus = CollectCboStats(*tree, State_, ctx);
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
index 84bb2d67f0e..39ea6f9a55e 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -5244,4 +5244,17 @@ TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp
return TExprBase(ctx.ChangeChildren(join.Ref(), std::move(children)));
}
+bool AreJoinInputsReady(const TYtEquiJoin& equiJoin) {
+ for (auto section: equiJoin.Input()) {
+ for (auto path: section.Paths()) {
+ TYtPathInfo pathInfo(path);
+ if (!pathInfo.Table->Stat) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.h b/yt/yql/providers/yt/provider/yql_yt_join_impl.h
index d8a109b5cf6..b8d86748884 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.h
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.h
@@ -123,4 +123,6 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResul
ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft,
TExprContext& ctx, bool needPayload, ui64 size);
+bool AreJoinInputsReady(const TYtEquiJoin& equiJoin);
+
}
diff --git a/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.cfg b/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.cfg
new file mode 100644
index 00000000000..d83bb027ab6
--- /dev/null
+++ b/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.cfg
@@ -0,0 +1,2 @@
+in Input1 sorted_by_kv1.txt
+in Input2 input_tutorial_users.txt
diff --git a/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.sql b/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.sql
new file mode 100644
index 00000000000..4e4e62997f3
--- /dev/null
+++ b/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.sql
@@ -0,0 +1,13 @@
+PRAGMA DisableSimpleColumns;
+use plato;
+pragma yt.JoinMergeTablesLimit="10";
+pragma yt.JoinAllowColumnRenames="true";
+pragma yt.JoinMergeUseSmallAsPrimary="true";
+pragma yt.JoinWaitAllInputs="true";
+
+INSERT INTO @Input2Sorted SELECT * FROM Input2 ORDER BY key;
+COMMIT;
+
+-- Input1 is smaller than Input2 (known thanks to JoinWaitAllInputs)
+select * from @Input2Sorted as b join /*+ merge() */ Input1 as a on a.k1 = b.key
+order by a.v1, b.value;