summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormpereskokova <[email protected]>2025-06-18 13:04:12 +0300
committermpereskokova <[email protected]>2025-06-18 14:53:20 +0300
commit1264f781e9428ca95c3daf0b69b43f1ce956ab34 (patch)
tree3162d283606c412ab3ee5e63e5e9d2835eb256a8
parent61f8ee251577b9e79f673dc7bacc1deb5b23048d (diff)
Add PruneKeys in YT opt
commit_hash:b12d341458bb39ffb6b4a4d7a99c3ef25a417ca5
-rw-r--r--yql/essentials/core/common_opt/yql_co_flow2.cpp45
-rw-r--r--yql/essentials/core/common_opt/yql_co_simple1.cpp2
-rw-r--r--yql/essentials/core/yql_opt_utils.cpp55
-rw-r--r--yql/essentials/core/yql_opt_utils.h4
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/result.json7
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp1
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h2
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.cpp57
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.h11
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp126
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp58
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.cpp3
-rw-r--r--yt/yql/tests/sql/suites/join/prune_keys_yt_opt.cfg4
-rw-r--r--yt/yql/tests/sql/suites/join/prune_keys_yt_opt.sql44
14 files changed, 337 insertions, 82 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp
index c0ce99b8df3..16ba67da460 100644
--- a/yql/essentials/core/common_opt/yql_co_flow2.cpp
+++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp
@@ -2097,56 +2097,27 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
continue;
}
+ THashSet<TString> columns;
auto itemNames = columnsForPruneKeysExtractor.find(scope.Ref().Content());
if (itemNames == columnsForPruneKeysExtractor.end() || itemNames->second.empty()) {
children.push_back(equiJoin.Arg(i).Ptr());
continue;
}
-
- if (auto distinct = list.Ref().GetConstraint<TDistinctConstraintNode>()) {
- if (distinct->ContainsCompleteSet(std::vector<std::string_view>(itemNames->second.cbegin(), itemNames->second.cend()))) {
- children.push_back(equiJoin.Arg(i).Ptr());
- continue;
- }
+ for (const auto& elem : itemNames->second) {
+ columns.insert(TString(elem));
}
- bool isOrdered = false;
- if (auto sorted = list.Ref().GetConstraint<TSortedConstraintNode>()) {
- for (const auto& item : sorted->GetContent()) {
- size_t foundItemNamesCount = 0;
- for (const auto& path : item.first) {
- if (itemNames->second.contains(path.front())) {
- foundItemNamesCount++;
- }
- }
- if (foundItemNamesCount == itemNames->second.size()) {
- isOrdered = true;
- break;
- }
- }
+ if (IsAlreadyDistinct(list.Ref(), columns)) {
+ children.push_back(equiJoin.Arg(i).Ptr());
+ continue;
}
-
- auto pruneKeysCallable = isOrdered ? "PruneAdjacentKeys" : "PruneKeys";
+ auto pruneKeysCallable = IsOrdered(list.Ref(), columns) ? "PruneAdjacentKeys" : "PruneKeys";
YQL_CLOG(DEBUG, Core) << "Add " << pruneKeysCallable << " to EquiJoin input #" << i << ", label " << scope.Ref().Content();
children.push_back(ctx.Builder(child.Pos())
.List()
.Callable(0, pruneKeysCallable)
.Add(0, list.Ptr())
- .Lambda(1)
- .Param("item")
- .List(0)
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder & {
- ui32 i = 0;
- for (const auto& column : itemNames->second) {
- parent.Callable(i++, "Member")
- .Arg(0, "item")
- .Atom(1, column)
- .Seal();
- }
- return parent;
- })
- .Seal()
- .Seal()
+ .Add(1, MakePruneKeysExtractorLambda(child.Ref(), columns, ctx))
.Seal()
.Add(1, scope.Ptr())
.Seal()
diff --git a/yql/essentials/core/common_opt/yql_co_simple1.cpp b/yql/essentials/core/common_opt/yql_co_simple1.cpp
index 0df3c0746ef..f2573982d40 100644
--- a/yql/essentials/core/common_opt/yql_co_simple1.cpp
+++ b/yql/essentials/core/common_opt/yql_co_simple1.cpp
@@ -6225,7 +6225,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
};
map["Unordered"] = map["UnorderedSubquery"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
- if (node->Head().IsCallable({"AsList","EquiJoin","Filter","Map","FlatMap","MultiMap","Extend", "Apply","PartitionByKey","PartitionsByKeys"})) {
+ if (node->Head().IsCallable({"AsList","EquiJoin","Filter","Map","FlatMap","MultiMap","Extend", "Apply","PartitionByKey","PartitionsByKeys","PruneKeys"})) {
YQL_CLOG(DEBUG, Core) << "Drop " << node->Content() << " over " << node->Head().Content();
return node->HeadPtr();
}
diff --git a/yql/essentials/core/yql_opt_utils.cpp b/yql/essentials/core/yql_opt_utils.cpp
index b07cc6d41e5..76cc1a51500 100644
--- a/yql/essentials/core/yql_opt_utils.cpp
+++ b/yql/essentials/core/yql_opt_utils.cpp
@@ -359,6 +359,61 @@ bool IsNoPush(const TExprNode& node) {
return node.IsCallable({"NoPush", "Likely"});
}
+bool IsAlreadyDistinct(const TExprNode& node, const THashSet<TString>& columns) {
+ if (auto distinct = node.GetConstraint<TDistinctConstraintNode>()) {
+ if (distinct->ContainsCompleteSet(std::vector<std::string_view>(columns.cbegin(), columns.cend()))) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool IsOrdered(const TExprNode& node, const THashSet<TString>& columns) {
+ if (auto sorted = node.GetConstraint<TSortedConstraintNode>()) {
+ for (const auto& item : sorted->GetContent()) {
+ size_t foundItemNamesCount = 0;
+ bool found = false;
+ for (const auto& path : item.first) {
+ if (path.size() == 1 && columns.contains(path.front())) {
+ foundItemNamesCount++;
+ found = true;
+ break;
+ }
+ }
+ if (foundItemNamesCount == columns.size()) {
+ return true;
+ }
+
+ // Required columns are not sorted by prefix.
+ if (!found) {
+ break;
+ }
+ }
+ }
+
+ return false;
+}
+
+TExprNode::TPtr MakePruneKeysExtractorLambda(const TExprNode& node, const THashSet<TString>& columns, TExprContext& ctx) {
+ return ctx.Builder(node.Pos())
+ .Lambda()
+ .Param("item")
+ .List(0)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder & {
+ ui32 i = 0;
+ for (const auto& column : columns) {
+ parent.Callable(i++, "Member")
+ .Arg(0, "item")
+ .Atom(1, column)
+ .Seal();
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+}
+
TExprNode::TPtr KeepColumnOrder(const TExprNode::TPtr& node, const TExprNode& src, TExprContext& ctx, const TTypeAnnotationContext& typeCtx) {
auto columnOrder = typeCtx.LookupColumnOrder(src);
if (!columnOrder) {
diff --git a/yql/essentials/core/yql_opt_utils.h b/yql/essentials/core/yql_opt_utils.h
index 196847b63d1..3acfe0f9739 100644
--- a/yql/essentials/core/yql_opt_utils.h
+++ b/yql/essentials/core/yql_opt_utils.h
@@ -28,6 +28,10 @@ bool IsPassthroughLambda(const NNodes::TCoLambda& lambda, TMaybe<THashSet<TStrin
bool IsTablePropsDependent(const TExprNode& node);
bool IsNoPush(const TExprNode& node);
+bool IsAlreadyDistinct(const TExprNode& node, const THashSet<TString>& columns);
+bool IsOrdered(const TExprNode& node, const THashSet<TString>& columns);
+TExprNode::TPtr MakePruneKeysExtractorLambda(const TExprNode& node, const THashSet<TString>& columns, TExprContext& ctx);
+
bool HasOnlyOneJoinType(const TExprNode& joinTree, TStringBuf joinType);
TExprNode::TPtr KeepColumnOrder(const TExprNode::TPtr& node, const TExprNode& src, TExprContext& ctx, const TTypeAnnotationContext& typeCtx);
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json
index 029d804ed93..8eb7676357f 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/result.json
+++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json
@@ -4003,6 +4003,13 @@
"uri": "https://{canondata_backend}/1942525/94a477066ea16f69d4848bbe524485fc029978b8/resource.tar.gz#test_sql2yql.test_join-prune_keys_YQL-19979_/sql.yql"
}
],
+ "test_sql2yql.test[join-prune_keys_YQL-19979]": [
+ {
+ "checksum": "0dad5d395f90148805e893a30f0b4963",
+ "size": 3845,
+ "uri": "https://{canondata_backend}/1942525/94a477066ea16f69d4848bbe524485fc029978b8/resource.tar.gz#test_sql2yql.test_join-prune_keys_YQL-19979_/sql.yql"
+ }
+ ],
"test_sql2yql.test[join-yql-19192]": [
{
"checksum": "fffdf1cbb40643da9daf9bdf3edec121",
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
index bb5c2f4e36a..05e73a11707 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
@@ -62,6 +62,7 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
AddHandler(0, &TYtDqWrite::Match, HNDL(YtDqWrite));
AddHandler(0, &TYtDqProcessWrite::Match, HNDL(YtDqProcessWrite));
AddHandler(0, &TYtEquiJoin::Match, HNDL(EarlyMergeJoin));
+ AddHandler(0, &TYtEquiJoin::Match, HNDL(AddPruneKeys));
AddHandler(0, &TYtOutputOpBase::Match, HNDL(TableContentWithSettings));
AddHandler(0, &TYtOutputOpBase::Match, HNDL(NonOptimalTableContent));
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
index cb940e59963..b14cfe8a5d5 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
@@ -108,6 +108,8 @@ private:
NNodes::TMaybeNode<NNodes::TExprBase> EarlyMergeJoin(NNodes::TExprBase node, TExprContext& ctx) const;
+ NNodes::TMaybeNode<NNodes::TExprBase> AddPruneKeys(NNodes::TExprBase node, TExprContext& ctx) const;
+
NNodes::TMaybeNode<NNodes::TExprBase> RuntimeEquiJoin(NNodes::TExprBase node, TExprContext& ctx) const;
NNodes::TMaybeNode<NNodes::TExprBase> TableContentWithSettings(NNodes::TExprBase node, TExprContext& ctx) const;
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.cpp
index 89758d7b2e4..643b29394b2 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.cpp
@@ -579,6 +579,63 @@ TExprNode::TPtr BuildYtEquiJoinPremap(TExprBase list, TMaybeNode<TCoLambda> prem
return {};
}
+TExprBase BuildMapForPruneKeys(
+ const TExprBase node,
+ const TExprNode::TPtr extractorLambda,
+ bool isOrdered,
+ const TString& cluster,
+ const TExprNode::TPtr newWorld,
+ const TYtSectionList newInput,
+ const TTypeAnnotationNode* outItemType,
+ TExprContext& ctx,
+ const TYtState::TPtr& state) {
+
+ auto pruneKeysCallable = isOrdered ? "PruneAdjacentKeys" : "PruneKeys";
+ auto mapper = ctx.Builder(node.Pos())
+ .Lambda()
+ .Param("stream")
+ .Callable(pruneKeysCallable)
+ .Arg(0, "stream")
+ .Add(1, extractorLambda)
+ .Seal()
+ .Seal()
+ .Build();
+
+ TVector<TYtOutTable> outTables = ConvertOutTablesWithSortAware(mapper, isOrdered, node.Pos(),
+ outItemType, ctx, state, node.Ref().GetConstraintSet());
+
+ auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos());
+ if (isOrdered) {
+ settingsBuilder
+ .Add()
+ .Name()
+ .Value(ToString(EYtSettingType::Ordered))
+ .Build()
+ .Build();
+ }
+ if (state->Configuration->UseFlow.Get().GetOrElse(DEFAULT_USE_FLOW)) {
+ settingsBuilder
+ .Add()
+ .Name()
+ .Value(ToString(EYtSettingType::Flow))
+ .Build()
+ .Build();
+ }
+
+ auto map = Build<TYtMap>(ctx, node.Pos())
+ .World(newWorld)
+ .DataSink(MakeDataSink(node.Pos(), cluster, ctx))
+ .Input(newInput)
+ .Output()
+ .Add(outTables)
+ .Build()
+ .Settings(settingsBuilder.Done())
+ .Mapper(std::move(mapper))
+ .Done();
+
+ return WrapOp(map, ctx);
+}
+
// label -> pair(<asc sort keys>, <inputs matched by keys>)
THashMap<TStringBuf, std::pair<TVector<TStringBuf>, ui32>> CollectTableSortKeysUsage(const TYtState::TPtr& state, const TCoEquiJoin& equiJoin) {
THashMap<TStringBuf, std::pair<TVector<TStringBuf>, ui32>> tableSortKeys;
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.h
index 6cb1af8a2c0..a5e0f87d42f 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.h
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_helper.h
@@ -116,4 +116,15 @@ NNodes::TExprBase WrapOp(NNodes::TYtOutputOpBase op, TExprContext& ctx);
NNodes::TCoLambda MapEmbedInputFieldsFilter(NNodes::TCoLambda lambda, bool ordered, NNodes::TCoAtomList fields, TExprContext& ctx);
+NNodes::TExprBase BuildMapForPruneKeys(
+ const NNodes::TExprBase node,
+ const TExprNode::TPtr extractorLambda,
+ bool isOrdered,
+ const TString& cluster,
+ const TExprNode::TPtr newWorld,
+ const NNodes::TYtSectionList newInput,
+ const TTypeAnnotationNode* outItemType,
+ TExprContext& ctx,
+ const TYtState::TPtr& state);
+
} // namespace NYql::NPrivate
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
index a59d38fb872..5e79c9e2970 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp
@@ -12,6 +12,7 @@
#include <yql/essentials/providers/common/provider/yql_provider.h>
#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/core/yql_opt_utils.h>
namespace NYql {
@@ -345,6 +346,131 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas
return node;
}
+TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AddPruneKeys(TExprBase node, TExprContext& ctx) const {
+ // Analogue to flow2 core EquiJoin optimizer with PushPruneKeysIntoYtOperation optimizer
+ static const char optName[] = "EmitPruneKeys";
+ if (!IsOptimizerEnabled<optName>(*State_->Types) || IsOptimizerDisabled<optName>(*State_->Types)) {
+ return node;
+ }
+ auto equiJoin = node.Cast<TYtEquiJoin>();
+ if (HasSetting(equiJoin.JoinOptions().Ref(), "prune_keys_added")) {
+ return node;
+ }
+
+ THashMap<TStringBuf, THashSet<TStringBuf>> columnsForPruneKeysExtractor;
+ GetPruneKeysColumnsForJoinLeaves(equiJoin.Joins().Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor);
+
+ TExprNode::TListType children;
+ bool hasChanges = false;
+
+ for (const auto& input : equiJoin.Input()) {
+ auto section = input.Cast<TYtSection>();
+
+ auto joinLabel = NYql::GetSetting(section.Settings().Ref(), EYtSettingType::JoinLabel);
+ if (!joinLabel) {
+ children.push_back(input.Ptr());
+ continue;
+ }
+
+ bool hasTable = false;
+ THashSet<TString> columns;
+ if (joinLabel->Child(1)->IsAtom()) {
+ if (auto itemNames = columnsForPruneKeysExtractor.find(joinLabel->Child(1)->Content());
+ itemNames != columnsForPruneKeysExtractor.end() && !itemNames->second.empty()) {
+ hasTable = true;
+ for (const auto& elem : itemNames->second) {
+ columns.insert(TString(elem));
+ }
+ }
+ } else {
+ for (const auto& child : joinLabel->Child(1)->Children()) {
+ if (auto itemNames = columnsForPruneKeysExtractor.find(child->Content());
+ itemNames != columnsForPruneKeysExtractor.end() && !itemNames->second.empty()) {
+ hasTable = true;
+ for (const auto& elem : itemNames->second) {
+ columns.insert(FullColumnName(itemNames->first, elem));
+ }
+ }
+ }
+ }
+
+ if (!hasTable) {
+ children.push_back(input.Ptr());
+ continue;
+ }
+
+ if (IsAlreadyDistinct(section.Ref(), columns)) {
+ children.push_back(input.Ptr());
+ continue;
+ }
+ bool isOrdered = IsOrdered(section.Ref(), columns);
+
+ auto cluster = GetClusterFromSection(section);
+ YQL_ENSURE(cluster);
+
+ auto outItemType = section.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
+ auto inputSection = Build<TYtSection>(ctx, section.Pos())
+ .InitFrom(section)
+ .Settings(NYql::RemoveSettings(section.Settings().Ref(), EYtSettingType::JoinLabel | EYtSettingType::StatColumns, ctx))
+ .Done();
+
+ auto pruneKeysCallable = isOrdered ? "PruneAdjacentKeys" : "PruneKeys";
+ YQL_CLOG(DEBUG, Core) << "Add " << pruneKeysCallable << " to YtEquiJoin input";
+
+ auto map = BuildMapForPruneKeys(
+ section,
+ MakePruneKeysExtractorLambda(node.Ref(), columns, ctx),
+ isOrdered,
+ cluster,
+ ctx.NewWorld(section.Pos()),
+ Build<TYtSectionList>(ctx, section.Pos())
+ .Add(inputSection)
+ .Done(),
+ outItemType,
+ ctx,
+ State_);
+
+ children.push_back(Build<TYtSection>(ctx, section.Pos())
+ .Paths()
+ .Add()
+ .Table<TYtOutput>()
+ .InitFrom(map.Cast<TYtOutput>())
+ .Build()
+ .Columns<TCoVoid>().Build()
+ .Ranges<TCoVoid>().Build()
+ .Stat<TCoVoid>().Build()
+ .Build()
+ .Build()
+ .Settings(section.Settings())
+ .Done()
+ .Ptr());
+ hasChanges = true;
+ }
+
+ if (!hasChanges) {
+ return node;
+ }
+
+ auto result = ctx.ChangeChild(
+ node.Ref(),
+ TYtEquiJoin::idx_Input,
+ Build<TYtSectionList>(ctx, equiJoin.Input().Pos())
+ .Add(children)
+ .Done().Ptr());
+
+ result = ctx.ChangeChild(
+ *result,
+ TYtEquiJoin::idx_JoinOptions,
+ AddSetting(
+ equiJoin.JoinOptions().Ref(),
+ equiJoin.JoinOptions().Pos(),
+ "prune_keys_added",
+ nullptr,
+ ctx));
+
+ return TExprBase(result);
+}
+
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const {
auto equiJoin = node.Cast<TYtEquiJoin>();
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp
index 87b2f66e546..9852aa0ff32 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp
@@ -7,6 +7,7 @@
#include <yql/essentials/providers/common/codec/yql_codec_type_flags.h>
#include <yql/essentials/providers/result/expr_nodes/yql_res_expr_nodes.h>
+#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/core/yql_opt_utils.h>
#include <yql/essentials/core/yql_type_helpers.h>
@@ -980,16 +981,6 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::PushPruneKeysIntoYtOper
return {};
}
- auto mapper = ctx.Builder(node.Pos())
- .Lambda()
- .Param("stream")
- .Callable(node.Ref().Content())
- .Arg(0, "stream")
- .Add(1, extractorLambda.Ptr())
- .Seal()
- .Seal()
- .Build();
-
auto outItemType = SilentGetSequenceItemType(op.Input().Ref(), true);
if (!outItemType || !outItemType->IsPersistable()) {
return node;
@@ -998,40 +989,19 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::PushPruneKeysIntoYtOper
return {};
}
- bool sortedOutput = TCoPruneAdjacentKeys::Match(node.Raw());
- TVector<TYtOutTable> outTables = ConvertOutTablesWithSortAware(mapper, sortedOutput, node.Pos(),
- outItemType, ctx, State_, node.Ref().GetConstraintSet());
-
- auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos());
- if (sortedOutput) {
- settingsBuilder
- .Add()
- .Name()
- .Value(ToString(EYtSettingType::Ordered))
- .Build()
- .Build();
- }
- if (State_->Configuration->UseFlow.Get().GetOrElse(DEFAULT_USE_FLOW)) {
- settingsBuilder
- .Add()
- .Name()
- .Value(ToString(EYtSettingType::Flow))
- .Build()
- .Build();
- }
-
- auto map = Build<TYtMap>(ctx, node.Pos())
- .World(GetWorld(op.Input(), {}, ctx))
- .DataSink(MakeDataSink(node.Pos(), *cluster, ctx))
- .Input(ConvertInputTable(op.Input(), ctx))
- .Output()
- .Add(outTables)
- .Build()
- .Settings(settingsBuilder.Done())
- .Mapper(std::move(mapper))
- .Done();
-
- return WrapOp(map, ctx);
+ bool isOrdered = TCoPruneAdjacentKeys::Match(node.Raw());
+ auto pruneKeysCallable = isOrdered ? "PruneAdjacentKeys" : "PruneKeys";
+ YQL_CLOG(DEBUG, Core) << "Push " << pruneKeysCallable << " into YT Operation";
+ return BuildMapForPruneKeys(
+ node,
+ extractorLambda.Ptr(),
+ isOrdered,
+ *cluster,
+ GetWorld(op.Input(), {}, ctx).Ptr(),
+ ConvertInputTable(op.Input(), ctx),
+ outItemType,
+ ctx,
+ State_);
}
} // namespace NYql
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
index 04ae3a8943d..84bb2d67f0e 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -5212,6 +5212,9 @@ TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp
if (!HasSetting(*joinSettings, "cbo_passed") && op.CostBasedOptPassed) {
joinSettings = AddSetting(*joinSettings, joinSettings->Pos(), "cbo_passed", {}, ctx);
}
+ if (sections.size() < equiJoin.Input().Size()) {
+ joinSettings = RemoveSetting(*joinSettings, "prune_keys_added", ctx);
+ }
auto outItemType = GetSequenceItemType(equiJoin.Pos(),
equiJoin.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1],
diff --git a/yt/yql/tests/sql/suites/join/prune_keys_yt_opt.cfg b/yt/yql/tests/sql/suites/join/prune_keys_yt_opt.cfg
new file mode 100644
index 00000000000..6978b80cd9d
--- /dev/null
+++ b/yt/yql/tests/sql/suites/join/prune_keys_yt_opt.cfg
@@ -0,0 +1,4 @@
+in Input1 input1.txt
+in Input2 input2.txt
+in Input3 input3.txt
+in Input4 input4.txt
diff --git a/yt/yql/tests/sql/suites/join/prune_keys_yt_opt.sql b/yt/yql/tests/sql/suites/join/prune_keys_yt_opt.sql
new file mode 100644
index 00000000000..64db7095443
--- /dev/null
+++ b/yt/yql/tests/sql/suites/join/prune_keys_yt_opt.sql
@@ -0,0 +1,44 @@
+/* postgres can not */
+use plato;
+
+pragma config.flags('OptimizerFlags', 'EmitPruneKeys');
+
+-- simple test
+select count(*)
+from Input1
+where Input1.key in
+ (select Input2.key
+ from Input2
+ cross join Input3);
+
+-- add PruneKeys in core opt and then in yt opt
+select count(*)
+from Input1
+left semi join Input2
+on Input1.key = Input2.key
+right semi join Input3
+on Input1.key = Input3.key;
+
+-- add PruneKeys in yt opt multiple times
+$part = (select Input1.key as key
+ from Input1
+ cross join Input2);
+
+select count(*)
+from Input3
+left semi join $part as part
+on Input3.key = part.key
+right semi join Input4
+on Input3.key = Input4.key;
+
+-- attempt to add PruneKeys in same child multiple times
+
+$part = (select Input1.value as value
+ from Input1
+ cross join Input2);
+
+select count(*)
+from Input3
+right semi join $part as part
+on Input3.value = part.value;
+