diff options
5 files changed, 39 insertions, 11 deletions
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp index 5e79c9e2970..70fd0127701 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp @@ -327,6 +327,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBase node, TExprContext& ctx) const { if (State_->Configuration->JoinMergeTablesLimit.Get()) { auto equiJoin = node.Cast<TYtEquiJoin>(); + + const bool waitAllInputs = State_->Configuration->JoinWaitAllInputs.Get().GetOrElse(false); + if (waitAllInputs && !AreJoinInputsReady(equiJoin)) { + return node; + } + const auto tree = ImportYtEquiJoin(equiJoin, ctx); if (State_->Configuration->JoinMergeForce.Get() || tree->LinkSettings.ForceSortedMerge) { const auto rewriteStatus = RewriteYtEquiJoinLeaves(equiJoin, *tree, State_, ctx); @@ -479,19 +485,11 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa && !HasSetting(equiJoin.JoinOptions().Ref(), "cbo_passed"); const bool waitAllInputs = State_->Configuration->JoinWaitAllInputs.Get().GetOrElse(false) || tryReorder; - - if (waitAllInputs) { - for (auto section: equiJoin.Input()) { - for (auto path: section.Paths()) { - TYtPathInfo pathInfo(path); - if (!pathInfo.Table->Stat) { - return node; - } - } - } + if (waitAllInputs && !AreJoinInputsReady(equiJoin)) { + return node; } - const auto tree = ImportYtEquiJoin(equiJoin, ctx); + const auto tree = ImportYtEquiJoin(equiJoin, ctx); if (tryReorder) { YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin"; auto collectStatus = CollectCboStats(*tree, State_, ctx); diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp index 84bb2d67f0e..39ea6f9a55e 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -5244,4 +5244,17 @@ TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp return TExprBase(ctx.ChangeChildren(join.Ref(), std::move(children))); } +bool AreJoinInputsReady(const TYtEquiJoin& equiJoin) { + for (auto section: equiJoin.Input()) { + for (auto path: section.Paths()) { + TYtPathInfo pathInfo(path); + if (!pathInfo.Table->Stat) { + return false; + } + } + } + + return true; +} + } diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.h b/yt/yql/providers/yt/provider/yql_yt_join_impl.h index d8a109b5cf6..b8d86748884 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.h +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.h @@ -123,4 +123,6 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResul ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft, TExprContext& ctx, bool needPayload, ui64 size); +bool AreJoinInputsReady(const TYtEquiJoin& equiJoin); + } diff --git a/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.cfg b/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.cfg new file mode 100644 index 00000000000..d83bb027ab6 --- /dev/null +++ b/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.cfg @@ -0,0 +1,2 @@ +in Input1 sorted_by_kv1.txt +in Input2 input_tutorial_users.txt diff --git a/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.sql b/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.sql new file mode 100644 index 00000000000..4e4e62997f3 --- /dev/null +++ b/yt/yql/tests/sql/suites/join/mergejoin_small_primary_force.sql @@ -0,0 +1,13 @@ +PRAGMA DisableSimpleColumns; +use plato; +pragma yt.JoinMergeTablesLimit="10"; +pragma yt.JoinAllowColumnRenames="true"; +pragma yt.JoinMergeUseSmallAsPrimary="true"; +pragma yt.JoinWaitAllInputs="true"; + +INSERT INTO @Input2Sorted SELECT * FROM Input2 ORDER BY key; +COMMIT; + +-- Input1 is smaller than Input2 (known thanks to JoinWaitAllInputs) +select * from @Input2Sorted as b join /*+ merge() */ Input1 as a on a.k1 = b.key +order by a.v1, b.value; |
