summary | refs | log | tree | commit | diff | stats
path: root/yql/essentials/core
diff options
context:
space:
mode:
author: mpereskokova <[email protected]> 2025-04-17 07:34:43 +0300
committer: mpereskokova <[email protected]> 2025-04-17 07:49:22 +0300
commit: 9b44be1dd627cdc45103e07c3186eda2a400d8dd (patch)
tree: ecb8b7f238380035708e34ee15b7df898ea55dc4 /yql/essentials/core
parent: bd352209a921603d0bb9876b7bf6f72cee7be91e (diff)
Add pruneKeys in EquiJoin
commit_hash:24b52143fbef864df48f3359b14f9e0294f367f5
Diffstat (limited to 'yql/essentials/core')
-rw-r--r-- yql/essentials/core/common_opt/yql_co_flow2.cpp 95
-rw-r--r-- yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp 69
-rw-r--r-- yql/essentials/core/yql_expr_constraint.cpp 18
-rw-r--r-- yql/essentials/core/yql_join.cpp 33
-rw-r--r-- yql/essentials/core/yql_join.h 2
-rw-r--r-- yql/essentials/core/yql_type_annotation.h 1
6 files changed, 196 insertions, 22 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp
index 7d521b2ac47..8ebb5243ffa 100644
--- a/yql/essentials/core/common_opt/yql_co_flow2.cpp
+++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp
@@ -2027,7 +2027,100 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
return ret;
}
- return node;
+ // Add PruneKeys to EquiJoin
+ static const char optName[] = "EmitPruneKeys";
+ if (!IsOptimizerEnabled<optName>(*optCtx.Types) || IsOptimizerDisabled<optName>(*optCtx.Types)) {
+ return node;
+ }
+ auto equiJoin = TCoEquiJoin(node);
+ if (HasSetting(equiJoin.Arg(equiJoin.ArgCount() - 1).Ref(), "prune_keys_added")) {
+ return node;
+ }
+
+ THashMap<TStringBuf, THashSet<TStringBuf>> columnsForPruneKeysExtractor;
+ GetPruneKeysColumnsForJoinLeaves(equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor);
+
+ TExprNode::TListType children;
+ bool hasChanges = false;
+ for (size_t i = 0; i + 2 < equiJoin.ArgCount(); ++i) {
+ auto child = equiJoin.Arg(i).Cast<TCoEquiJoinInput>();
+ auto list = child.List();
+ auto scope = child.Scope();
+
+ if (!scope.Ref().IsAtom()) {
+ children.push_back(equiJoin.Arg(i).Ptr());
+ continue;
+ }
+
+ auto itemNames = columnsForPruneKeysExtractor.find(scope.Ref().Content());
+ if (itemNames == columnsForPruneKeysExtractor.end() || itemNames->second.empty()) {
+ children.push_back(equiJoin.Arg(i).Ptr());
+ continue;
+ }
+
+ if (auto distinct = list.Ref().GetConstraint<TDistinctConstraintNode>()) {
+ if (distinct->ContainsCompleteSet(std::vector<std::string_view>(itemNames->second.cbegin(), itemNames->second.cend()))) {
+ children.push_back(equiJoin.Arg(i).Ptr());
+ continue;
+ }
+ }
+
+ bool isOrdered = false;
+ if (auto sorted = list.Ref().GetConstraint<TSortedConstraintNode>()) {
+ for (const auto& item : sorted->GetContent()) {
+ size_t foundItemNamesCount = 0;
+ for (const auto& path : item.first) {
+ if (itemNames->second.contains(path.front())) {
+ foundItemNamesCount++;
+ }
+ }
+ if (foundItemNamesCount == itemNames->second.size()) {
+ isOrdered = true;
+ break;
+ }
+ }
+ }
+
+ auto pruneKeysCallable = isOrdered ? "PruneAdjacentKeys" : "PruneKeys";
+ YQL_CLOG(DEBUG, Core) << "Add " << pruneKeysCallable << " to EquiJoin input #" << i << ", label " << scope.Ref().Content();
+ children.push_back(ctx.Builder(child.Pos())
+ .List()
+ .Callable(0, pruneKeysCallable)
+ .Add(0, list.Ptr())
+ .Lambda(1)
+ .Param("item")
+ .List(0)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder & {
+ ui32 i = 0;
+ for (const auto& column : itemNames->second) {
+ parent.Callable(i++, "Member")
+ .Arg(0, "item")
+ .Atom(1, column)
+ .Seal();
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Add(1, scope.Ptr())
+ .Seal()
+ .Build());
+ hasChanges = true;
+ }
+
+ if (!hasChanges) {
+ return node;
+ }
+
+ children.push_back(equiJoin.Arg(equiJoin.ArgCount() - 2).Ptr());
+ children.push_back(AddSetting(
+ equiJoin.Arg(equiJoin.ArgCount() - 1).Ref(),
+ equiJoin.Arg(equiJoin.ArgCount() - 1).Pos(),
+ "prune_keys_added",
+ nullptr,
+ ctx));
+ return ctx.ChangeChildren(*node, std::move(children));
};
map["ExtractMembers"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
index 5755bf1dbf5..28ee1ead8f5 100644
--- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -2760,19 +2760,11 @@ TExprNode::TPtr ExpandListHas(const TExprNode::TPtr& input, TExprContext& ctx) {
return RewriteSearchByKeyForTypesMismatch<true, true>(input, ctx);
}
-TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprContext& ctx) {
- const auto type = input->Head().GetTypeAnn();
- const auto& keyExtractorLambda = input->ChildRef(1);
-
- YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::List || type->GetKind() == ETypeAnnotationKind::Stream);
-
- const auto elemType = type->GetKind() == ETypeAnnotationKind::List
- ? type->Cast<TListExprType>()->GetItemType()
- : type->Cast<TStreamExprType>()->GetItemType();
- const auto optionalElemType = *ctx.MakeType<TOptionalExprType>(elemType);
+TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprContext& ctx, TTypeAnnotationContext& /*typesCtx*/) {
+ const auto& keyExtractorLambda = input->ChildPtr(1);
YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << input->Content();
- return ctx.Builder(input->Pos())
+ return KeepConstraints(ctx.Builder(input->Pos())
.Callable("OrderedFlatMap")
.Callable(0, "Fold1Map")
.Add(0, input->HeadPtr())
@@ -2799,7 +2791,11 @@ TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprConte
.Seal()
.Seal()
.Callable(1, "Nothing")
- .Add(0, ExpandType(input->Pos(), optionalElemType, ctx))
+ .Callable(0, "OptionalType")
+ .Callable(0, "TypeOf")
+ .Arg(0, "item")
+ .Seal()
+ .Seal()
.Seal()
.Callable(2, "Just")
.Arg(0, "item")
@@ -2814,13 +2810,16 @@ TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprConte
.Arg(0, "item")
.Seal()
.Seal()
- .Build();
+ .Build(), *input, ctx);
}
-TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx) {
+TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
const auto type = input->Head().GetTypeAnn();
- const auto& keyExtractorLambda = input->ChildRef(1);
- YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::List || type->GetKind() == ETypeAnnotationKind::Stream);
+ auto keyExtractorLambda = input->ChildPtr(1);
+
+ YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::Flow
+ || type->GetKind() == ETypeAnnotationKind::List
+ || type->GetKind() == ETypeAnnotationKind::Stream);
auto initHandler = ctx.Builder(input->Pos())
.Lambda()
@@ -2865,6 +2864,39 @@ TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx)
.Seal()
.Build();
} else {
+ // Slight copy of GetDictionaryKeyTypes to check if keyExtractorLambda result type is complicated
+ // mkql CombineCore supports only simple types; for others we should add pickling
+ bool keyExtractorLambdaShouldBePickled = false;
+ auto itemType = keyExtractorLambda->GetTypeAnn();
+ if (itemType->GetKind() == ETypeAnnotationKind::Optional) {
+ itemType = itemType->Cast<TOptionalExprType>()->GetItemType();
+ }
+
+ if (itemType->GetKind() == ETypeAnnotationKind::Tuple) {
+ auto tuple = itemType->Cast<TTupleExprType>();
+ for (const auto& item : tuple->GetItems()) {
+ if (!IsDataOrOptionalOfData(item)) {
+ keyExtractorLambdaShouldBePickled = true;
+ break;
+ }
+ }
+ } else if (itemType->GetKind() != ETypeAnnotationKind::Data) {
+ keyExtractorLambdaShouldBePickled = true;
+ }
+
+ if (keyExtractorLambdaShouldBePickled) {
+ keyExtractorLambda = ctx.Builder(input->Pos())
+ .Lambda()
+ .Param("item")
+ .Callable(0, "StablePickle")
+ .Apply(0, keyExtractorLambda)
+ .With(0, "item")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
return ctx.Builder(input->Pos())
.Callable("CombineCore")
.Add(0, input->HeadPtr())
@@ -2872,6 +2904,7 @@ TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx)
.Add(2, initHandler)
.Add(3, updateHandler)
.Add(4, finishHandler)
+ .Atom(5, ToString(typesCtx.PruneKeysMemLimit))
.Seal()
.Build();
}
@@ -8906,8 +8939,6 @@ struct TPeepHoleRules {
{"CheckedDiv", &ExpandCheckedDiv},
{"CheckedMod", &ExpandCheckedMod},
{"CheckedMinus", &ExpandCheckedMinus},
- {"PruneAdjacentKeys", &ExpandPruneAdjacentKeys},
- {"PruneKeys", &ExpandPruneKeys},
{"JsonValue", &ExpandJsonValue},
{"JsonExists", &ExpandJsonExists},
{"EmptyIterator", &DropDependsOnFromEmptyIterator},
@@ -8926,6 +8957,8 @@ struct TPeepHoleRules {
{"CostsOf", &ExpandCostsOf},
{"JsonQuery", &ExpandJsonQuery},
{"MatchRecognize", &ExpandMatchRecognize},
+ {"PruneAdjacentKeys", &ExpandPruneAdjacentKeys},
+ {"PruneKeys", &ExpandPruneKeys},
{"CalcOverWindow", &ExpandCalcOverWindow},
{"CalcOverSessionWindow", &ExpandCalcOverWindow},
{"CalcOverWindowGroup", &ExpandCalcOverWindow},
diff --git a/yql/essentials/core/yql_expr_constraint.cpp b/yql/essentials/core/yql_expr_constraint.cpp
index e67557d9645..7fcf560a2cf 100644
--- a/yql/essentials/core/yql_expr_constraint.cpp
+++ b/yql/essentials/core/yql_expr_constraint.cpp
@@ -822,9 +822,23 @@ private:
}
if constexpr (Adjacent) {
- return CopyAllFrom<0>(input, output, ctx);
+ if (const auto status = CopyAllFrom<0>(input, output, ctx); status != TStatus::Ok) {
+ return status;
+ }
+
+ TPartOfConstraintBase::TSetType keys = GetPathsToKeys<true>(input->Child(1)->Tail(), input->Child(1)->Head().Head());
+ TPartOfConstraintBase::TSetOfSetsType uniqueKeys;
+ for (const auto& elem : keys) {
+ uniqueKeys.insert(TPartOfConstraintBase::TSetType{elem});
+ }
+ if (!keys.empty()) {
+ input->AddConstraint(ctx.MakeConstraint<TUniqueConstraintNode>(TUniqueConstraintNode::TContentType{uniqueKeys}));
+ input->AddConstraint(ctx.MakeConstraint<TDistinctConstraintNode>(TDistinctConstraintNode::TContentType{uniqueKeys}));
+ }
+ return TStatus::Ok;
}
- return FromFirst<TEmptyConstraintNode, TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode>(input, output, ctx);
+
+ return FromFirst<TEmptyConstraintNode>(input, output, ctx);
}
template<class TConstraint>
diff --git a/yql/essentials/core/yql_join.cpp b/yql/essentials/core/yql_join.cpp
index 7cca45604d1..5ce3fba268b 100644
--- a/yql/essentials/core/yql_join.cpp
+++ b/yql/essentials/core/yql_join.cpp
@@ -820,6 +820,10 @@ IGraphTransformer::TStatus ValidateEquiJoinOptions(TPositionHandle positionHandl
// do nothing
} else if (optionName == "multiple_joins") {
// do nothing
+ } else if (optionName == "prune_keys_added") {
+ if (!EnsureTupleSize(*child, 1, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
} else {
YQL_ENSURE(false, "Cached join option '" << optionName << "' not handled");
}
@@ -2007,7 +2011,7 @@ void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row,
}
bool IsCachedJoinOption(TStringBuf name) {
- static THashSet<TStringBuf> CachedJoinOptions = {"preferred_sort", "cbo_passed", "multiple_joins"};
+ static THashSet<TStringBuf> CachedJoinOptions = {"preferred_sort", "cbo_passed", "multiple_joins", "prune_keys_added"}; // "prune_keys_added" is the marker the EquiJoin optimizer in yql_co_flow2.cpp appends after wrapping inputs in PruneKeys/PruneAdjacentKeys; it returns early when the marker is already present
return CachedJoinOptions.contains(name);
}
@@ -2016,4 +2020,31 @@ bool IsCachedJoinLinkOption(TStringBuf name) {
return CachedJoinLinkOptions.contains(name);
}
+void GetPruneKeysColumnsForJoinLeaves(const TCoEquiJoinTuple& joinTree, THashMap<TStringBuf, THashSet<TStringBuf>>& columnsForPruneKeysExtractor) { // Walks the join tree recursively and, for every leaf side that qualifies, passes that side's join keys to CollectEquiJoinKeyColumnsFromLeaf to fill columnsForPruneKeysExtractor (the caller in yql_co_flow2.cpp looks this map up by input label).
+ auto settings = GetEquiJoinLinkSettings(joinTree.Options().Ref()); // per-link hints (LeftHints / RightHints) of this join node
+ TStringBuf joinKind = joinTree.Type().Value();
+
+ auto left = joinTree.LeftScope();
+ if (!left.Maybe<TCoAtom>()) { // a non-atom left scope is cast to a nested join tuple below: recurse into it
+ GetPruneKeysColumnsForJoinLeaves(left.Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor);
+ } else {
+ if (joinKind == "RightSemi" || joinKind == "RightOnly" || settings.LeftHints.contains("any")) { // left leaf qualifies for these join kinds or with the "any" hint; NOTE(review): presumably because only key presence on this side matters there — confirm
+ if (!settings.LeftHints.contains("unique")) { // skipped when the left side carries the "unique" hint; NOTE(review): presumably pruning would be redundant then — confirm
+ CollectEquiJoinKeyColumnsFromLeaf(joinTree.LeftKeys().Ref(), columnsForPruneKeysExtractor);
+ }
+ }
+ }
+
+ auto right = joinTree.RightScope();
+ if (!right.Maybe<TCoAtom>()) { // mirror of the left side: a nested join tuple is handled recursively
+ GetPruneKeysColumnsForJoinLeaves(right.Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor);
+ } else {
+ if (joinKind == "LeftSemi" || joinKind == "LeftOnly" || settings.RightHints.contains("any")) { // right leaf qualifies for the mirrored join kinds or with the "any" hint
+ if (!settings.RightHints.contains("unique")) { // same "unique" exclusion as on the left side
+ CollectEquiJoinKeyColumnsFromLeaf(joinTree.RightKeys().Ref(), columnsForPruneKeysExtractor);
+ }
+ }
+ }
+}
+
} // namespace NYql
diff --git a/yql/essentials/core/yql_join.h b/yql/essentials/core/yql_join.h
index a313aceefa3..26b417cf542 100644
--- a/yql/essentials/core/yql_join.h
+++ b/yql/essentials/core/yql_join.h
@@ -180,4 +180,6 @@ void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row,
bool IsCachedJoinOption(TStringBuf name);
bool IsCachedJoinLinkOption(TStringBuf name);
+void GetPruneKeysColumnsForJoinLeaves(const NNodes::TCoEquiJoinTuple& joinTree, THashMap<TStringBuf, THashSet<TStringBuf>>& columnsForPruneKeysExtractor);
+
}
diff --git a/yql/essentials/core/yql_type_annotation.h b/yql/essentials/core/yql_type_annotation.h
index bc09c963f4b..a43650a01f4 100644
--- a/yql/essentials/core/yql_type_annotation.h
+++ b/yql/essentials/core/yql_type_annotation.h
@@ -452,6 +452,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
THashSet<TString> PeepholeFlags;
bool StreamLookupJoin = false;
ui32 MaxAggPushdownPredicates = 6; // algorithm complexity is O(2^N)
+ ui32 PruneKeysMemLimit = 128 * 1024 * 1024;
TMaybe<TColumnOrder> LookupColumnOrder(const TExprNode& node) const;
IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx);