diff options
author | mpereskokova <[email protected]> | 2025-04-17 07:34:43 +0300 |
---|---|---|
committer | mpereskokova <[email protected]> | 2025-04-17 07:49:22 +0300 |
commit | 9b44be1dd627cdc45103e07c3186eda2a400d8dd (patch) | |
tree | ecb8b7f238380035708e34ee15b7df898ea55dc4 /yql/essentials/core | |
parent | bd352209a921603d0bb9876b7bf6f72cee7be91e (diff) |
Add pruneKeys in EquiJoin
commit_hash:24b52143fbef864df48f3359b14f9e0294f367f5
Diffstat (limited to 'yql/essentials/core')
-rw-r--r-- | yql/essentials/core/common_opt/yql_co_flow2.cpp | 95 | ||||
-rw-r--r-- | yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp | 69 | ||||
-rw-r--r-- | yql/essentials/core/yql_expr_constraint.cpp | 18 | ||||
-rw-r--r-- | yql/essentials/core/yql_join.cpp | 33 | ||||
-rw-r--r-- | yql/essentials/core/yql_join.h | 2 | ||||
-rw-r--r-- | yql/essentials/core/yql_type_annotation.h | 1 |
6 files changed, 196 insertions, 22 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp index 7d521b2ac47..8ebb5243ffa 100644 --- a/yql/essentials/core/common_opt/yql_co_flow2.cpp +++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp @@ -2027,7 +2027,100 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) { return ret; } - return node; + // Add PruneKeys to EquiJoin + static const char optName[] = "EmitPruneKeys"; + if (!IsOptimizerEnabled<optName>(*optCtx.Types) || IsOptimizerDisabled<optName>(*optCtx.Types)) { + return node; + } + auto equiJoin = TCoEquiJoin(node); + if (HasSetting(equiJoin.Arg(equiJoin.ArgCount() - 1).Ref(), "prune_keys_added")) { + return node; + } + + THashMap<TStringBuf, THashSet<TStringBuf>> columnsForPruneKeysExtractor; + GetPruneKeysColumnsForJoinLeaves(equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); + + TExprNode::TListType children; + bool hasChanges = false; + for (size_t i = 0; i + 2 < equiJoin.ArgCount(); ++i) { + auto child = equiJoin.Arg(i).Cast<TCoEquiJoinInput>(); + auto list = child.List(); + auto scope = child.Scope(); + + if (!scope.Ref().IsAtom()) { + children.push_back(equiJoin.Arg(i).Ptr()); + continue; + } + + auto itemNames = columnsForPruneKeysExtractor.find(scope.Ref().Content()); + if (itemNames == columnsForPruneKeysExtractor.end() || itemNames->second.empty()) { + children.push_back(equiJoin.Arg(i).Ptr()); + continue; + } + + if (auto distinct = list.Ref().GetConstraint<TDistinctConstraintNode>()) { + if (distinct->ContainsCompleteSet(std::vector<std::string_view>(itemNames->second.cbegin(), itemNames->second.cend()))) { + children.push_back(equiJoin.Arg(i).Ptr()); + continue; + } + } + + bool isOrdered = false; + if (auto sorted = list.Ref().GetConstraint<TSortedConstraintNode>()) { + for (const auto& item : sorted->GetContent()) { + size_t foundItemNamesCount = 0; + for (const auto& path : item.first) { + if (itemNames->second.contains(path.front())) { + foundItemNamesCount++; + } + } + if (foundItemNamesCount == itemNames->second.size()) { + isOrdered = true; + break; + } + } + } + + auto pruneKeysCallable = isOrdered ? "PruneAdjacentKeys" : "PruneKeys"; + YQL_CLOG(DEBUG, Core) << "Add " << pruneKeysCallable << " to EquiJoin input #" << i << ", label " << scope.Ref().Content(); + children.push_back(ctx.Builder(child.Pos()) + .List() + .Callable(0, pruneKeysCallable) + .Add(0, list.Ptr()) + .Lambda(1) + .Param("item") + .List(0) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder & { + ui32 i = 0; + for (const auto& column : itemNames->second) { + parent.Callable(i++, "Member") + .Arg(0, "item") + .Atom(1, column) + .Seal(); + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Add(1, scope.Ptr()) + .Seal() + .Build()); + hasChanges = true; + } + + if (!hasChanges) { + return node; + } + + children.push_back(equiJoin.Arg(equiJoin.ArgCount() - 2).Ptr()); + children.push_back(AddSetting( + equiJoin.Arg(equiJoin.ArgCount() - 1).Ref(), + equiJoin.Arg(equiJoin.ArgCount() - 1).Pos(), + "prune_keys_added", + nullptr, + ctx)); + return ctx.ChangeChildren(*node, std::move(children)); }; map["ExtractMembers"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index 5755bf1dbf5..28ee1ead8f5 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -2760,19 +2760,11 @@ TExprNode::TPtr ExpandListHas(const TExprNode::TPtr& input, TExprContext& ctx) { return RewriteSearchByKeyForTypesMismatch<true, true>(input, ctx); } -TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprContext& ctx) { - const auto type = input->Head().GetTypeAnn(); - const auto& keyExtractorLambda = input->ChildRef(1); - - YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::List || type->GetKind() == ETypeAnnotationKind::Stream); - - const auto elemType = type->GetKind() == ETypeAnnotationKind::List - ? type->Cast<TListExprType>()->GetItemType() - : type->Cast<TStreamExprType>()->GetItemType(); - const auto optionalElemType = *ctx.MakeType<TOptionalExprType>(elemType); +TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprContext& ctx, TTypeAnnotationContext& /*typesCtx*/) { + const auto& keyExtractorLambda = input->ChildPtr(1); YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << input->Content(); - return ctx.Builder(input->Pos()) + return KeepConstraints(ctx.Builder(input->Pos()) .Callable("OrderedFlatMap") .Callable(0, "Fold1Map") .Add(0, input->HeadPtr()) @@ -2799,7 +2791,11 @@ TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprConte .Seal() .Seal() .Callable(1, "Nothing") - .Add(0, ExpandType(input->Pos(), optionalElemType, ctx)) + .Callable(0, "OptionalType") + .Callable(0, "TypeOf") + .Arg(0, "item") + .Seal() + .Seal() .Seal() .Callable(2, "Just") .Arg(0, "item") @@ -2814,13 +2810,16 @@ TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprConte .Arg(0, "item") .Seal() .Seal() - .Build(); + .Build(), *input, ctx); } -TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx) { +TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { const auto type = input->Head().GetTypeAnn(); - const auto& keyExtractorLambda = input->ChildRef(1); - YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::List || type->GetKind() == ETypeAnnotationKind::Stream); + auto keyExtractorLambda = input->ChildPtr(1); + + YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::Flow + || type->GetKind() == ETypeAnnotationKind::List + || type->GetKind() == ETypeAnnotationKind::Stream); auto initHandler = ctx.Builder(input->Pos()) .Lambda() @@ -2865,6 +2864,39 @@ TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx) .Seal() .Build(); } else { + // Slight copy of GetDictionaryKeyTypes to check if keyExtractorLambda result type is complicated + // mkql CombineCore supports only simple types; for others we should add pickling + bool keyExtractorLambdaShouldBePickled = false; + auto itemType = keyExtractorLambda->GetTypeAnn(); + if (itemType->GetKind() == ETypeAnnotationKind::Optional) { + itemType = itemType->Cast<TOptionalExprType>()->GetItemType(); + } + + if (itemType->GetKind() == ETypeAnnotationKind::Tuple) { + auto tuple = itemType->Cast<TTupleExprType>(); + for (const auto& item : tuple->GetItems()) { + if (!IsDataOrOptionalOfData(item)) { + keyExtractorLambdaShouldBePickled = true; + break; + } + } + } else if (itemType->GetKind() != ETypeAnnotationKind::Data) { + keyExtractorLambdaShouldBePickled = true; + } + + if (keyExtractorLambdaShouldBePickled) { + keyExtractorLambda = ctx.Builder(input->Pos()) + .Lambda() + .Param("item") + .Callable(0, "StablePickle") + .Apply(0, keyExtractorLambda) + .With(0, "item") + .Seal() + .Seal() + .Seal() + .Build(); + } + return ctx.Builder(input->Pos()) .Callable("CombineCore") .Add(0, input->HeadPtr()) @@ -2872,6 +2904,7 @@ TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx) .Add(2, initHandler) .Add(3, updateHandler) .Add(4, finishHandler) + .Atom(5, ToString(typesCtx.PruneKeysMemLimit)) .Seal() .Build(); } @@ -8906,8 +8939,6 @@ struct TPeepHoleRules { {"CheckedDiv", &ExpandCheckedDiv}, {"CheckedMod", &ExpandCheckedMod}, {"CheckedMinus", &ExpandCheckedMinus}, - {"PruneAdjacentKeys", &ExpandPruneAdjacentKeys}, - {"PruneKeys", &ExpandPruneKeys}, {"JsonValue", &ExpandJsonValue}, {"JsonExists", &ExpandJsonExists}, {"EmptyIterator", &DropDependsOnFromEmptyIterator}, @@ -8926,6 +8957,8 @@ struct TPeepHoleRules { {"CostsOf", &ExpandCostsOf}, {"JsonQuery", &ExpandJsonQuery}, {"MatchRecognize", &ExpandMatchRecognize}, + {"PruneAdjacentKeys", &ExpandPruneAdjacentKeys}, + {"PruneKeys", &ExpandPruneKeys}, {"CalcOverWindow", &ExpandCalcOverWindow}, {"CalcOverSessionWindow", &ExpandCalcOverWindow}, {"CalcOverWindowGroup", &ExpandCalcOverWindow}, diff --git a/yql/essentials/core/yql_expr_constraint.cpp b/yql/essentials/core/yql_expr_constraint.cpp index e67557d9645..7fcf560a2cf 100644 --- a/yql/essentials/core/yql_expr_constraint.cpp +++ b/yql/essentials/core/yql_expr_constraint.cpp @@ -822,9 +822,23 @@ private: } if constexpr (Adjacent) { - return CopyAllFrom<0>(input, output, ctx); + if (const auto status = CopyAllFrom<0>(input, output, ctx); status != TStatus::Ok) { + return status; + } + + TPartOfConstraintBase::TSetType keys = GetPathsToKeys<true>(input->Child(1)->Tail(), input->Child(1)->Head().Head()); + TPartOfConstraintBase::TSetOfSetsType uniqueKeys; + for (const auto& elem : keys) { + uniqueKeys.insert(TPartOfConstraintBase::TSetType{elem}); + } + if (!keys.empty()) { + input->AddConstraint(ctx.MakeConstraint<TUniqueConstraintNode>(TUniqueConstraintNode::TContentType{uniqueKeys})); + input->AddConstraint(ctx.MakeConstraint<TDistinctConstraintNode>(TDistinctConstraintNode::TContentType{uniqueKeys})); + } + return TStatus::Ok; } - return FromFirst<TEmptyConstraintNode, TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode>(input, output, ctx); + + return FromFirst<TEmptyConstraintNode>(input, output, ctx); } template<class TConstraint> diff --git a/yql/essentials/core/yql_join.cpp b/yql/essentials/core/yql_join.cpp index 7cca45604d1..5ce3fba268b 100644 --- a/yql/essentials/core/yql_join.cpp +++ b/yql/essentials/core/yql_join.cpp @@ -820,6 +820,10 @@ IGraphTransformer::TStatus ValidateEquiJoinOptions(TPositionHandle positionHandl // do nothing } else if (optionName == "multiple_joins") { // do nothing + } else if (optionName == "prune_keys_added") { + if (!EnsureTupleSize(*child, 1, ctx)) { + return IGraphTransformer::TStatus::Error; + } } else { YQL_ENSURE(false, "Cached join option '" << optionName << "' not handled"); } @@ -2007,7 +2011,7 @@ void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row, } bool IsCachedJoinOption(TStringBuf name) { - static THashSet<TStringBuf> CachedJoinOptions = {"preferred_sort", "cbo_passed", "multiple_joins"}; + static THashSet<TStringBuf> CachedJoinOptions = {"preferred_sort", "cbo_passed", "multiple_joins", "prune_keys_added"}; return CachedJoinOptions.contains(name); } @@ -2016,4 +2020,31 @@ bool IsCachedJoinLinkOption(TStringBuf name) { return CachedJoinLinkOptions.contains(name); } +void GetPruneKeysColumnsForJoinLeaves(const TCoEquiJoinTuple& joinTree, THashMap<TStringBuf, THashSet<TStringBuf>>& columnsForPruneKeysExtractor) { + auto settings = GetEquiJoinLinkSettings(joinTree.Options().Ref()); + TStringBuf joinKind = joinTree.Type().Value(); + + auto left = joinTree.LeftScope(); + if (!left.Maybe<TCoAtom>()) { + GetPruneKeysColumnsForJoinLeaves(left.Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); + } else { + if (joinKind == "RightSemi" || joinKind == "RightOnly" || settings.LeftHints.contains("any")) { + if (!settings.LeftHints.contains("unique")) { + CollectEquiJoinKeyColumnsFromLeaf(joinTree.LeftKeys().Ref(), columnsForPruneKeysExtractor); + } + } + } + + auto right = joinTree.RightScope(); + if (!right.Maybe<TCoAtom>()) { + GetPruneKeysColumnsForJoinLeaves(right.Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); + } else { + if (joinKind == "LeftSemi" || joinKind == "LeftOnly" || settings.RightHints.contains("any")) { + if (!settings.RightHints.contains("unique")) { + CollectEquiJoinKeyColumnsFromLeaf(joinTree.RightKeys().Ref(), columnsForPruneKeysExtractor); + } + } + } +} + } // namespace NYql diff --git a/yql/essentials/core/yql_join.h b/yql/essentials/core/yql_join.h index a313aceefa3..26b417cf542 100644 --- a/yql/essentials/core/yql_join.h +++ b/yql/essentials/core/yql_join.h @@ -180,4 +180,6 @@ void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row, bool IsCachedJoinOption(TStringBuf name); bool IsCachedJoinLinkOption(TStringBuf name); +void GetPruneKeysColumnsForJoinLeaves(const NNodes::TCoEquiJoinTuple& joinTree, THashMap<TStringBuf, THashSet<TStringBuf>>& columnsForPruneKeysExtractor); + } diff --git a/yql/essentials/core/yql_type_annotation.h b/yql/essentials/core/yql_type_annotation.h index bc09c963f4b..a43650a01f4 100644 --- a/yql/essentials/core/yql_type_annotation.h +++ b/yql/essentials/core/yql_type_annotation.h @@ -452,6 +452,7 @@ struct TTypeAnnotationContext: public TThrRefBase { THashSet<TString> PeepholeFlags; bool StreamLookupJoin = false; ui32 MaxAggPushdownPredicates = 6; // algorithm complexity is O(2^N) + ui32 PruneKeysMemLimit = 128 * 1024 * 1024; TMaybe<TColumnOrder> LookupColumnOrder(const TExprNode& node) const; IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx); |