diff options
author | mpereskokova <mpereskokova@yandex-team.com> | 2025-04-17 07:34:43 +0300 |
---|---|---|
committer | mpereskokova <mpereskokova@yandex-team.com> | 2025-04-17 07:49:22 +0300 |
commit | 9b44be1dd627cdc45103e07c3186eda2a400d8dd (patch) | |
tree | ecb8b7f238380035708e34ee15b7df898ea55dc4 | |
parent | bd352209a921603d0bb9876b7bf6f72cee7be91e (diff) | |
download | ydb-9b44be1dd627cdc45103e07c3186eda2a400d8dd.tar.gz |
Add pruneKeys in EquiJoin
commit_hash:24b52143fbef864df48f3359b14f9e0294f367f5
19 files changed, 626 insertions, 33 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp index 7d521b2ac47..8ebb5243ffa 100644 --- a/yql/essentials/core/common_opt/yql_co_flow2.cpp +++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp @@ -2027,7 +2027,100 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) { return ret; } - return node; + // Add PruneKeys to EquiJoin + static const char optName[] = "EmitPruneKeys"; + if (!IsOptimizerEnabled<optName>(*optCtx.Types) || IsOptimizerDisabled<optName>(*optCtx.Types)) { + return node; + } + auto equiJoin = TCoEquiJoin(node); + if (HasSetting(equiJoin.Arg(equiJoin.ArgCount() - 1).Ref(), "prune_keys_added")) { + return node; + } + + THashMap<TStringBuf, THashSet<TStringBuf>> columnsForPruneKeysExtractor; + GetPruneKeysColumnsForJoinLeaves(equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); + + TExprNode::TListType children; + bool hasChanges = false; + for (size_t i = 0; i + 2 < equiJoin.ArgCount(); ++i) { + auto child = equiJoin.Arg(i).Cast<TCoEquiJoinInput>(); + auto list = child.List(); + auto scope = child.Scope(); + + if (!scope.Ref().IsAtom()) { + children.push_back(equiJoin.Arg(i).Ptr()); + continue; + } + + auto itemNames = columnsForPruneKeysExtractor.find(scope.Ref().Content()); + if (itemNames == columnsForPruneKeysExtractor.end() || itemNames->second.empty()) { + children.push_back(equiJoin.Arg(i).Ptr()); + continue; + } + + if (auto distinct = list.Ref().GetConstraint<TDistinctConstraintNode>()) { + if (distinct->ContainsCompleteSet(std::vector<std::string_view>(itemNames->second.cbegin(), itemNames->second.cend()))) { + children.push_back(equiJoin.Arg(i).Ptr()); + continue; + } + } + + bool isOrdered = false; + if (auto sorted = list.Ref().GetConstraint<TSortedConstraintNode>()) { + for (const auto& item : sorted->GetContent()) { + size_t foundItemNamesCount = 0; + for (const auto& path : item.first) { + if (itemNames->second.contains(path.front())) { + foundItemNamesCount++; + } + } + if (foundItemNamesCount == itemNames->second.size()) { + isOrdered = true; + break; + } + } + } + + auto pruneKeysCallable = isOrdered ? "PruneAdjacentKeys" : "PruneKeys"; + YQL_CLOG(DEBUG, Core) << "Add " << pruneKeysCallable << " to EquiJoin input #" << i << ", label " << scope.Ref().Content(); + children.push_back(ctx.Builder(child.Pos()) + .List() + .Callable(0, pruneKeysCallable) + .Add(0, list.Ptr()) + .Lambda(1) + .Param("item") + .List(0) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder & { + ui32 i = 0; + for (const auto& column : itemNames->second) { + parent.Callable(i++, "Member") + .Arg(0, "item") + .Atom(1, column) + .Seal(); + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Add(1, scope.Ptr()) + .Seal() + .Build()); + hasChanges = true; + } + + if (!hasChanges) { + return node; + } + + children.push_back(equiJoin.Arg(equiJoin.ArgCount() - 2).Ptr()); + children.push_back(AddSetting( + equiJoin.Arg(equiJoin.ArgCount() - 1).Ref(), + equiJoin.Arg(equiJoin.ArgCount() - 1).Pos(), + "prune_keys_added", + nullptr, + ctx)); + return ctx.ChangeChildren(*node, std::move(children)); }; map["ExtractMembers"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index 5755bf1dbf5..28ee1ead8f5 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -2760,19 +2760,11 @@ TExprNode::TPtr ExpandListHas(const TExprNode::TPtr& input, TExprContext& ctx) { return RewriteSearchByKeyForTypesMismatch<true, true>(input, ctx); } -TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprContext& ctx) { - const auto type = input->Head().GetTypeAnn(); - const auto& keyExtractorLambda = input->ChildRef(1); - - YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::List || type->GetKind() == ETypeAnnotationKind::Stream); - - const auto elemType = type->GetKind() == ETypeAnnotationKind::List - ? type->Cast<TListExprType>()->GetItemType() - : type->Cast<TStreamExprType>()->GetItemType(); - const auto optionalElemType = *ctx.MakeType<TOptionalExprType>(elemType); +TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprContext& ctx, TTypeAnnotationContext& /*typesCtx*/) { + const auto& keyExtractorLambda = input->ChildPtr(1); YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << input->Content(); - return ctx.Builder(input->Pos()) + return KeepConstraints(ctx.Builder(input->Pos()) .Callable("OrderedFlatMap") .Callable(0, "Fold1Map") .Add(0, input->HeadPtr()) @@ -2799,7 +2791,11 @@ TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprConte .Seal() .Seal() .Callable(1, "Nothing") - .Add(0, ExpandType(input->Pos(), optionalElemType, ctx)) + .Callable(0, "OptionalType") + .Callable(0, "TypeOf") + .Arg(0, "item") + .Seal() + .Seal() .Seal() .Callable(2, "Just") .Arg(0, "item") @@ -2814,13 +2810,16 @@ TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprConte .Arg(0, "item") .Seal() .Seal() - .Build(); + .Build(), *input, ctx); } -TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx) { +TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { const auto type = input->Head().GetTypeAnn(); - const auto& keyExtractorLambda = input->ChildRef(1); - YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::List || type->GetKind() == ETypeAnnotationKind::Stream); + auto keyExtractorLambda = input->ChildPtr(1); + + YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::Flow + || type->GetKind() == ETypeAnnotationKind::List + || type->GetKind() == ETypeAnnotationKind::Stream); auto initHandler = ctx.Builder(input->Pos()) .Lambda() @@ -2865,6 +2864,39 @@ TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx) .Seal() .Build(); } else { + // Slight copy of GetDictionaryKeyTypes to check if keyExtractorLambda result type is complicated + // mkql CombineCore supports only simple types; for others we should add pickling + bool keyExtractorLambdaShouldBePickled = false; + auto itemType = keyExtractorLambda->GetTypeAnn(); + if (itemType->GetKind() == ETypeAnnotationKind::Optional) { + itemType = itemType->Cast<TOptionalExprType>()->GetItemType(); + } + + if (itemType->GetKind() == ETypeAnnotationKind::Tuple) { + auto tuple = itemType->Cast<TTupleExprType>(); + for (const auto& item : tuple->GetItems()) { + if (!IsDataOrOptionalOfData(item)) { + keyExtractorLambdaShouldBePickled = true; + break; + } + } + } else if (itemType->GetKind() != ETypeAnnotationKind::Data) { + keyExtractorLambdaShouldBePickled = true; + } + + if (keyExtractorLambdaShouldBePickled) { + keyExtractorLambda = ctx.Builder(input->Pos()) + .Lambda() + .Param("item") + .Callable(0, "StablePickle") + .Apply(0, keyExtractorLambda) + .With(0, "item") + .Seal() + .Seal() + .Seal() + .Build(); + } + return ctx.Builder(input->Pos()) .Callable("CombineCore") .Add(0, input->HeadPtr()) @@ -2872,6 +2904,7 @@ TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx) .Add(2, initHandler) .Add(3, updateHandler) .Add(4, finishHandler) + .Atom(5, ToString(typesCtx.PruneKeysMemLimit)) .Seal() .Build(); } @@ -8906,8 +8939,6 @@ struct TPeepHoleRules { {"CheckedDiv", &ExpandCheckedDiv}, {"CheckedMod", &ExpandCheckedMod}, {"CheckedMinus", &ExpandCheckedMinus}, - {"PruneAdjacentKeys", &ExpandPruneAdjacentKeys}, - {"PruneKeys", &ExpandPruneKeys}, {"JsonValue", &ExpandJsonValue}, {"JsonExists", &ExpandJsonExists}, {"EmptyIterator", &DropDependsOnFromEmptyIterator}, @@ -8926,6 +8957,8 @@ struct TPeepHoleRules { {"CostsOf", &ExpandCostsOf}, {"JsonQuery", &ExpandJsonQuery}, {"MatchRecognize", &ExpandMatchRecognize}, + {"PruneAdjacentKeys", &ExpandPruneAdjacentKeys}, + {"PruneKeys", &ExpandPruneKeys}, {"CalcOverWindow", &ExpandCalcOverWindow}, {"CalcOverSessionWindow", &ExpandCalcOverWindow}, {"CalcOverWindowGroup", &ExpandCalcOverWindow}, diff --git a/yql/essentials/core/yql_expr_constraint.cpp b/yql/essentials/core/yql_expr_constraint.cpp index e67557d9645..7fcf560a2cf 100644 --- a/yql/essentials/core/yql_expr_constraint.cpp +++ b/yql/essentials/core/yql_expr_constraint.cpp @@ -822,9 +822,23 @@ private: } if constexpr (Adjacent) { - return CopyAllFrom<0>(input, output, ctx); + if (const auto status = CopyAllFrom<0>(input, output, ctx); status != TStatus::Ok) { + return status; + } + + TPartOfConstraintBase::TSetType keys = GetPathsToKeys<true>(input->Child(1)->Tail(), input->Child(1)->Head().Head()); + TPartOfConstraintBase::TSetOfSetsType uniqueKeys; + for (const auto& elem : keys) { + uniqueKeys.insert(TPartOfConstraintBase::TSetType{elem}); + } + if (!keys.empty()) { + input->AddConstraint(ctx.MakeConstraint<TUniqueConstraintNode>(TUniqueConstraintNode::TContentType{uniqueKeys})); + input->AddConstraint(ctx.MakeConstraint<TDistinctConstraintNode>(TDistinctConstraintNode::TContentType{uniqueKeys})); + } + return TStatus::Ok; } - return FromFirst<TEmptyConstraintNode, TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode>(input, output, ctx); + + return FromFirst<TEmptyConstraintNode>(input, output, ctx); } template<class TConstraint> diff --git a/yql/essentials/core/yql_join.cpp b/yql/essentials/core/yql_join.cpp index 7cca45604d1..5ce3fba268b 100644 --- a/yql/essentials/core/yql_join.cpp +++ b/yql/essentials/core/yql_join.cpp @@ -820,6 +820,10 @@ IGraphTransformer::TStatus ValidateEquiJoinOptions(TPositionHandle positionHandl // do nothing } else if (optionName == "multiple_joins") { // do nothing + } else if (optionName == "prune_keys_added") { + if (!EnsureTupleSize(*child, 1, ctx)) { + return IGraphTransformer::TStatus::Error; + } } else { YQL_ENSURE(false, "Cached join option '" << optionName << "' not handled"); } @@ -2007,7 +2011,7 @@ void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row, } bool IsCachedJoinOption(TStringBuf name) { - static THashSet<TStringBuf> CachedJoinOptions = {"preferred_sort", "cbo_passed", "multiple_joins"}; + static THashSet<TStringBuf> CachedJoinOptions = {"preferred_sort", "cbo_passed", "multiple_joins", "prune_keys_added"}; return CachedJoinOptions.contains(name); } @@ -2016,4 +2020,31 @@ bool IsCachedJoinLinkOption(TStringBuf name) { return CachedJoinLinkOptions.contains(name); } +void GetPruneKeysColumnsForJoinLeaves(const TCoEquiJoinTuple& joinTree, THashMap<TStringBuf, THashSet<TStringBuf>>& columnsForPruneKeysExtractor) { + auto settings = GetEquiJoinLinkSettings(joinTree.Options().Ref()); + TStringBuf joinKind = joinTree.Type().Value(); + + auto left = joinTree.LeftScope(); + if (!left.Maybe<TCoAtom>()) { + GetPruneKeysColumnsForJoinLeaves(left.Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); + } else { + if (joinKind == "RightSemi" || joinKind == "RightOnly" || settings.LeftHints.contains("any")) { + if (!settings.LeftHints.contains("unique")) { + CollectEquiJoinKeyColumnsFromLeaf(joinTree.LeftKeys().Ref(), columnsForPruneKeysExtractor); + } + } + } + + auto right = joinTree.RightScope(); + if (!right.Maybe<TCoAtom>()) { + GetPruneKeysColumnsForJoinLeaves(right.Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); + } else { + if (joinKind == "LeftSemi" || joinKind == "LeftOnly" || settings.RightHints.contains("any")) { + if (!settings.RightHints.contains("unique")) { + CollectEquiJoinKeyColumnsFromLeaf(joinTree.RightKeys().Ref(), columnsForPruneKeysExtractor); + } + } + } +} + } // namespace NYql diff --git a/yql/essentials/core/yql_join.h b/yql/essentials/core/yql_join.h index a313aceefa3..26b417cf542 100644 --- a/yql/essentials/core/yql_join.h +++ b/yql/essentials/core/yql_join.h @@ -180,4 +180,6 @@ void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row, bool IsCachedJoinOption(TStringBuf name); bool IsCachedJoinLinkOption(TStringBuf name); +void GetPruneKeysColumnsForJoinLeaves(const NNodes::TCoEquiJoinTuple& joinTree, THashMap<TStringBuf, THashSet<TStringBuf>>& columnsForPruneKeysExtractor); + } diff --git a/yql/essentials/core/yql_type_annotation.h b/yql/essentials/core/yql_type_annotation.h index bc09c963f4b..a43650a01f4 100644 --- a/yql/essentials/core/yql_type_annotation.h +++ b/yql/essentials/core/yql_type_annotation.h @@ -452,6 +452,7 @@ struct TTypeAnnotationContext: public TThrRefBase { THashSet<TString> PeepholeFlags; bool StreamLookupJoin = false; ui32 MaxAggPushdownPredicates = 6; // algorithm complexity is O(2^N) + ui32 PruneKeysMemLimit = 128 * 1024 * 1024; TMaybe<TColumnOrder> LookupColumnOrder(const TExprNode& node) const; IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx); diff --git a/yql/essentials/tests/sql/minirun/part5/canondata/result.json b/yql/essentials/tests/sql/minirun/part5/canondata/result.json index 579677b9314..a98d74ebde0 100644 --- a/yql/essentials/tests/sql/minirun/part5/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part5/canondata/result.json @@ -1044,6 +1044,20 @@ "uri": "https://{canondata_backend}/1936273/19f08c34eba9366d29ee0ffb8eb99e637c34fd97/resource.tar.gz#test.test_join-convert_check_key_mem2-default.txt-Results_/results.txt" } ], + "test.test[join-prune_keys-default.txt-Debug]": [ + { + "checksum": "a706e6bd4285c96ad5120d701109fc82", + "size": 3669, + "uri": "https://{canondata_backend}/1871102/392832e505c55eb371c9d3241b89c96b5a837c8f/resource.tar.gz#test.test_join-prune_keys-default.txt-Debug_/opt.yql" + } + ], + "test.test[join-prune_keys-default.txt-Results]": [ + { + "checksum": "af076a3334031b8cdaa6969f064a4616", + "size": 18160, + "uri": "https://{canondata_backend}/1130705/620da5a4f19baef17c32a4b3c699ec3c3091ada5/resource.tar.gz#test.test_join-prune_keys-default.txt-Results_/results.txt" + } + ], "test.test[json-json_exists/common_syntax-default.txt-Debug]": [ { "checksum": "1559e7b19e1d1827f8f1ea62929effb1", diff --git a/yql/essentials/tests/sql/minirun/part7/canondata/result.json b/yql/essentials/tests/sql/minirun/part7/canondata/result.json index e8c9926ae75..8083b15dac8 100644 --- a/yql/essentials/tests/sql/minirun/part7/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part7/canondata/result.json @@ -1232,16 +1232,16 @@ ], "test.test[select-prune_keys-default.txt-Debug]": [ { - "checksum": "95e58e469ce10fce0d0d5e55c0cf3baf", - "size": 3292, - "uri": "https://{canondata_backend}/1931696/04008bc01ad4f562f8e03ad2bc296f7a64a78489/resource.tar.gz#test.test_select-prune_keys-default.txt-Debug_/opt.yql" + "checksum": "1dc9319da10b4c1343b6ceb5d0abb5b7", + "size": 3788, + "uri": "https://{canondata_backend}/212715/392992a262a39acb9e6e49f104e5800a3e731eb6/resource.tar.gz#test.test_select-prune_keys-default.txt-Debug_/opt.yql" } ], "test.test[select-prune_keys-default.txt-Results]": [ { - "checksum": "f53b6976b6a5e1ee5e1d331c25963930", - "size": 27670, - "uri": "https://{canondata_backend}/1931696/04008bc01ad4f562f8e03ad2bc296f7a64a78489/resource.tar.gz#test.test_select-prune_keys-default.txt-Results_/results.txt" + "checksum": "46c226ab1c00ed4a9a5b57c6639f15b2", + "size": 30306, + "uri": "https://{canondata_backend}/212715/392992a262a39acb9e6e49f104e5800a3e731eb6/resource.tar.gz#test.test_select-prune_keys-default.txt-Results_/results.txt" } ], "test.test[union-union_positional_mix-default.txt-Debug]": [ diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json index 4cb7e6cc7af..5027c679045 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/result.json +++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json @@ -3933,6 +3933,13 @@ "uri": "https://{canondata_backend}/1942173/99e88108149e222741552e7e6cddef041d6a2846/resource.tar.gz#test_sql2yql.test_join-left_join_with_self_aggr_/sql.yql" } ], + "test_sql2yql.test[join-prune_keys]": [ + { + "checksum": "a04490e5ef3a567ca13cb1ed88272bd8", + "size": 22535, + "uri": "https://{canondata_backend}/1599023/2a161150407124ac83c2566a6542b19a53abfccb/resource.tar.gz#test_sql2yql.test_join-prune_keys_/sql.yql" + } + ], "test_sql2yql.test[join-yql-19192]": [ { "checksum": "fffdf1cbb40643da9daf9bdf3edec121", @@ -7022,9 +7029,9 @@ ], "test_sql2yql.test[select-prune_keys]": [ { - "checksum": "55346f77548ef19f9a09d2f1d3f6f466", - "size": 17765, - "uri": "https://{canondata_backend}/1871182/906a4c4e540bb8746f8d7595500d4d1c9f664846/resource.tar.gz#test_sql2yql.test_select-prune_keys_/sql.yql" + "checksum": "f7da5706622461ab177712e6c348c61b", + "size": 19536, + "uri": "https://{canondata_backend}/1814674/c8d78993e8e9976f1e3fae2197140afe33195365/resource.tar.gz#test_sql2yql.test_select-prune_keys_/sql.yql" } ], "test_sql2yql.test[select-result_label]": [ @@ -10243,6 +10250,11 @@ "uri": "file://test_sql_format.test_join-left_join_with_self_aggr_/formatted.sql" } ], + "test_sql_format.test[join-prune_keys]": [ + { + "uri": "file://test_sql_format.test_join-prune_keys_/formatted.sql" + } + ], "test_sql_format.test[join-yql-19192]": [ { "uri": "file://test_sql_format.test_join-yql-19192_/formatted.sql" diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_join-prune_keys_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_join-prune_keys_/formatted.sql new file mode 100644 index 00000000000..0f53bcbe441 --- /dev/null +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_join-prune_keys_/formatted.sql @@ -0,0 +1,238 @@ +PRAGMA config.flags('OptimizerFlags', 'EmitPruneKeys'); + +$a = ( + SELECT + * + FROM + as_table([ + <|x: 1, t: 1|>, + <|x: 1, t: 1|>, + <|x: 1, t: 2|>, + <|x: 3, t: 1|>, + <|x: 3, t: 4|>, + <|x: 3, t: 2|>, + ]) +); + +$b = ( + SELECT + * + FROM + as_table([ + <|x: 1, y: 1|>, + <|x: 1, y: 2|>, + <|x: 1, y: 3|>, + <|x: 1, y: 3|>, + <|x: 2, y: 3|>, + <|x: 2, y: 4|>, + ]) +); + +$c = ( + SELECT + * + FROM + as_table([ + <|x: 1|>, + <|x: 1|>, + <|x: 1|>, + <|x: 1|>, + <|x: 2|>, + <|x: 2|>, + ]) +); + +-- PruneKeys +SELECT + a.* +FROM + $a AS a +WHERE + a.x IN ( + SELECT + x + FROM + $b + ) +; -- PruneKeys + +SELECT + a.* +FROM + $a AS a +WHERE + a.x IN ( + SELECT + /*+ distinct(x) */ x + FROM + $b + ) +; -- nothing + +SELECT + a.* +FROM + $a AS a +WHERE + a.x IN ( + SELECT + x + FROM + $c + ) +; -- PruneKeys + +SELECT + a.* +FROM + $a AS a +LEFT SEMI JOIN + $b AS b +ON + a.x == b.x +; -- PruneKeys(b) + +SELECT + a.* +FROM + $b AS b +RIGHT SEMI JOIN + $a AS a +ON + b.x == a.x +; -- PruneKeys(b) + +SELECT + a.x, + a.t, + b.x +FROM ANY + $a AS a +JOIN + $b AS b +ON + a.x == b.x +; -- PruneKeys(a) + +SELECT + a.x, + a.t, + b.x +FROM + $a AS a +JOIN ANY + $b AS b +ON + a.x == b.x +; -- PruneKeys(b) + +$a_sorted = ( + SELECT + * + FROM + $a + ASSUME ORDER BY + x +); + +$b_sorted = ( + SELECT + * + FROM + $b + ASSUME ORDER BY + x +); + +$c_sorted = ( + SELECT + * + FROM + $c + ASSUME ORDER BY + x +); + +-- PruneAdjacentKeys +SELECT + a.* +FROM + $a AS a +WHERE + a.x IN ( + SELECT + x + FROM + $b_sorted + ) +; -- PruneAdjacentKeys + +SELECT + a.* +FROM + $a AS a +WHERE + a.x IN ( + SELECT + /*+ distinct(x) */ x + FROM + $b_sorted + ) +; -- nothing + +SELECT + a.* +FROM + $a AS a +WHERE + a.x IN ( + SELECT + x + FROM + $c_sorted + ) +; -- PruneAdjacentKeys + +SELECT + a.* +FROM + $a AS a +LEFT SEMI JOIN + $b_sorted AS b +ON + a.x == b.x +; -- PruneAdjacentKeys(b_sorted) + +SELECT + a.* +FROM + $b_sorted AS b +RIGHT SEMI JOIN + $a AS a +ON + b.x == a.x +; -- PruneAdjacentKeys(b_sorted) + +SELECT + a.x, + a.t, + b.x +FROM ANY + $a_sorted AS a +JOIN + $b AS b +ON + a.x == b.x +; -- PruneAdjacentKeys(a_sorted) + +SELECT + a.x, + a.t, + b.x +FROM + $a AS a +JOIN ANY + $b_sorted AS b +ON + a.x == b.x +; -- PruneAdjacentKeys(b_sorted) diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_select-prune_keys_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_select-prune_keys_/formatted.sql index 65a0e7c3a79..80c2fa06403 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_select-prune_keys_/formatted.sql +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_select-prune_keys_/formatted.sql @@ -29,6 +29,14 @@ SELECT ListLength(Yql::PruneKeys(AsList(1, 1, 1, 3, 3, 3, 3), $mod2)) ; +SELECT + Yql::PruneAdjacentKeys(AsList(NULL, NULL, NULL, 1, 1, 2, 3, 3, 4, 5), $id) +; + +SELECT + Yql::PruneKeys(AsList(1, NULL, 1, NULL, 1, NULL, 1), $id) +; + -- optimize tests $get_a = ($x) -> { RETURN <|a: $x.a|>; diff --git a/yql/essentials/tests/sql/suites/join/prune_keys.sql b/yql/essentials/tests/sql/suites/join/prune_keys.sql new file mode 100644 index 00000000000..9f10de9ace7 --- /dev/null +++ b/yql/essentials/tests/sql/suites/join/prune_keys.sql @@ -0,0 +1,50 @@ +pragma config.flags('OptimizerFlags', 'EmitPruneKeys'); + +$a = select * from as_table([ + <|x:1, t:1|>, + <|x:1, t:1|>, + <|x:1, t:2|>, + <|x:3, t:1|>, + <|x:3, t:4|>, + <|x:3, t:2|>, + ]); + +$b = select * from as_table([ + <|x:1, y:1|>, + <|x:1, y:2|>, + <|x:1, y:3|>, + <|x:1, y:3|>, + <|x:2, y:3|>, + <|x:2, y:4|>, + ]); + +$c = select * from as_table([ + <|x:1|>, + <|x:1|>, + <|x:1|>, + <|x:1|>, + <|x:2|>, + <|x:2|>, + ]); + +-- PruneKeys +select a.* from $a as a where a.x in (select x from $b); -- PruneKeys +select a.* from $a as a where a.x in (select /*+ distinct(x) */ x from $b); -- nothing +select a.* from $a as a where a.x in (select x from $c); -- PruneKeys +select a.* from $a as a left semi join $b as b on a.x = b.x; -- PruneKeys(b) +select a.* from $b as b right semi join $a as a on b.x = a.x; -- PruneKeys(b) +select a.x, a.t, b.x from any $a as a join $b as b on a.x == b.x; -- PruneKeys(a) +select a.x, a.t, b.x from $a as a join any $b as b on a.x == b.x; -- PruneKeys(b) + +$a_sorted = select * from $a assume order by x; +$b_sorted = select * from $b assume order by x; +$c_sorted = select * from $c assume order by x; + +-- PruneAdjacentKeys +select a.* from $a as a where a.x in (select x from $b_sorted); -- PruneAdjacentKeys +select a.* from $a as a where a.x in (select /*+ distinct(x) */ x from $b_sorted); -- nothing +select a.* from $a as a where a.x in (select x from $c_sorted); -- PruneAdjacentKeys +select a.* from $a as a left semi join $b_sorted as b on a.x = b.x; -- PruneAdjacentKeys(b_sorted) +select a.* from $b_sorted as b right semi join $a as a on b.x = a.x; -- PruneAdjacentKeys(b_sorted) +select a.x, a.t, b.x from any $a_sorted as a join $b as b on a.x == b.x; -- PruneAdjacentKeys(a_sorted) +select a.x, a.t, b.x from $a as a join any $b_sorted as b on a.x == b.x; -- PruneAdjacentKeys(b_sorted) diff --git a/yql/essentials/tests/sql/suites/select/prune_keys.sql b/yql/essentials/tests/sql/suites/select/prune_keys.sql index 9cf5cb3ced7..f541bcfc61d 100644 --- a/yql/essentials/tests/sql/suites/select/prune_keys.sql +++ b/yql/essentials/tests/sql/suites/select/prune_keys.sql @@ -11,6 +11,9 @@ SELECT Yql::PruneKeys([], $id); $mod2 = ($x) -> { RETURN $x % 2; }; SELECT ListLength(Yql::PruneKeys(AsList(1,1,1,3,3,3,3), $mod2)); +SELECT Yql::PruneAdjacentKeys(AsList(null,null,null,1,1,2,3,3,4,5), $id); +SELECT Yql::PruneKeys(AsList(1,null,1,null,1,null,1), $id); + -- optimize tests $get_a = ($x) -> { RETURN <|a:$x.a|>; }; diff --git a/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp b/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp index 8d6b31a1571..c949b461a7b 100644 --- a/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp +++ b/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp @@ -49,8 +49,11 @@ namespace NYql { (*memoryUsage)["CommonJoinCore"] += FromString<ui64>(memLimitSetting->Child(1)->Content()); } } else if (node.IsCallable("WideCombiner")) { - (*memoryUsage)["WideCombiner"] += FromString<ui64>(node.Child(1U)->Content()); - } else if (NNodes::TCoCombineCore::Match(&node)) { + i64 memLimit = 0LL; + if (TryFromString<i64>(node.Child(1U)->Content(), memLimit)) { + (*memoryUsage)["WideCombiner"] += memLimit; + } + } else if (NNodes::TCoCombineCore::Match(&node) && NNodes::TCoCombineCore::idx_MemLimit < node.ChildrenSize()) { (*memoryUsage)["CombineCore"] += FromString<ui64>(node.Child(NNodes::TCoCombineCore::idx_MemLimit)->Content()); } } diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp index bc785946bd0..bb5c2f4e36a 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp @@ -31,6 +31,8 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T AddHandler(0, &TCoTopSort::Match, HNDL(Sort<true>)); AddHandler(0, &TCoTop::Match, HNDL(Sort<true>)); AddHandler(0, &TYtSort::Match, HNDL(YtSortOverAlreadySorted)); + AddHandler(0, &TCoPruneKeys::Match, HNDL(PushPruneKeysIntoYtOperation)); + AddHandler(0, &TCoPruneAdjacentKeys::Match, HNDL(PushPruneKeysIntoYtOperation)); AddHandler(0, &TCoPartitionByKeyBase::Match, HNDL(PartitionByKey)); AddHandler(0, &TCoFlatMapBase::Match, HNDL(FlatMap)); AddHandler(0, &TCoCombineByKey::Match, HNDL(CombineByKey)); diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h index ef783c31c94..cb940e59963 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h @@ -149,6 +149,8 @@ private: NNodes::TMaybeNode<NNodes::TExprBase> UpdateDataSourceCluster(NNodes::TExprBase node, TExprContext& ctx) const; + NNodes::TMaybeNode<NNodes::TExprBase> PushPruneKeysIntoYtOperation(NNodes::TExprBase node, TExprContext& ctx) const; + template <typename TLMapType> NNodes::TMaybeNode<NNodes::TExprBase> LMap(NNodes::TExprBase node, TExprContext& ctx) const; diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp index 3b65cedd9f9..87b2f66e546 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp @@ -964,4 +964,74 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::UpdateDataSourceCluster return ctx.ChangeChild(node.Ref(), TYtReadTable::idx_DataSource, MakeDataSource(op.DataSource().Pos(), cluster, ctx).Ptr()); } +TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::PushPruneKeysIntoYtOperation(TExprBase node, TExprContext& ctx) const { + auto op = node.Cast<TCoPruneKeysBase>(); + auto extractorLambda = op.Extractor(); + + if (!IsYtProviderInput(op.Input())) { + return node; + } + + TSyncMap syncList; + const ERuntimeClusterSelectionMode selectionMode = + State_->Configuration->RuntimeClusterSelection.Get().GetOrElse(DEFAULT_RUNTIME_CLUSTER_SELECTION); + auto cluster = DeriveClusterFromInput(op.Input(), selectionMode); + if (!cluster || !IsYtCompleteIsolatedLambda(extractorLambda.Ref(), syncList, *cluster, false, selectionMode)) { + return {}; + } + + auto mapper = ctx.Builder(node.Pos()) + .Lambda() + .Param("stream") + .Callable(node.Ref().Content()) + .Arg(0, "stream") + .Add(1, extractorLambda.Ptr()) + .Seal() + .Seal() + .Build(); + + auto outItemType = SilentGetSequenceItemType(op.Input().Ref(), true); + if (!outItemType || !outItemType->IsPersistable()) { + return node; + } + if (!EnsurePersistableYsonTypes(node.Pos(), *outItemType, ctx, State_)) { + return {}; + } + + bool sortedOutput = TCoPruneAdjacentKeys::Match(node.Raw()); + TVector<TYtOutTable> outTables = ConvertOutTablesWithSortAware(mapper, sortedOutput, node.Pos(), + outItemType, ctx, State_, node.Ref().GetConstraintSet()); + + auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos()); + if (sortedOutput) { + settingsBuilder + .Add() + .Name() + .Value(ToString(EYtSettingType::Ordered)) + .Build() + .Build(); + } + if (State_->Configuration->UseFlow.Get().GetOrElse(DEFAULT_USE_FLOW)) { + settingsBuilder + .Add() + .Name() + .Value(ToString(EYtSettingType::Flow)) + .Build() + .Build(); + } + + auto map = Build<TYtMap>(ctx, node.Pos()) + .World(GetWorld(op.Input(), {}, ctx)) + .DataSink(MakeDataSink(node.Pos(), *cluster, ctx)) + .Input(ConvertInputTable(op.Input(), ctx)) + .Output() + .Add(outTables) + .Build() + .Settings(settingsBuilder.Done()) + .Mapper(std::move(mapper)) + .Done(); + + return WrapOp(map, ctx); +} + } // namespace NYql diff --git a/yt/yql/tests/sql/suites/join/prune_keys.cfg b/yt/yql/tests/sql/suites/join/prune_keys.cfg new file mode 100644 index 00000000000..9cd81aaa85a --- /dev/null +++ b/yt/yql/tests/sql/suites/join/prune_keys.cfg @@ -0,0 +1,2 @@ +in a_sorted sorted_by_k1.txt +in b_sorted sorted_by_k2.txt diff --git a/yt/yql/tests/sql/suites/join/prune_keys.sql b/yt/yql/tests/sql/suites/join/prune_keys.sql new file mode 100644 index 00000000000..bce76fcb3b4 --- /dev/null +++ b/yt/yql/tests/sql/suites/join/prune_keys.sql @@ -0,0 +1,15 @@ +/* postgres can not */ +use plato; + +pragma yt.JoinMergeTablesLimit = "10"; +pragma config.flags('OptimizerFlags', 'EmitPruneKeys'); + +-- PruneKeys +select * +from a_sorted +where v1 in (select v2 from b_sorted); + +-- PruneAdjacentKeys +select * +from a_sorted +where k1 in (select k2 from b_sorted); |