diff options
| author | mpereskokova <[email protected]> | 2025-04-17 07:34:43 +0300 | 
|---|---|---|
| committer | mpereskokova <[email protected]> | 2025-04-17 07:49:22 +0300 | 
| commit | 9b44be1dd627cdc45103e07c3186eda2a400d8dd (patch) | |
| tree | ecb8b7f238380035708e34ee15b7df898ea55dc4 | |
| parent | bd352209a921603d0bb9876b7bf6f72cee7be91e (diff) | |
Add pruneKeys in EquiJoin
commit_hash:24b52143fbef864df48f3359b14f9e0294f367f5
19 files changed, 626 insertions, 33 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_flow2.cpp b/yql/essentials/core/common_opt/yql_co_flow2.cpp index 7d521b2ac47..8ebb5243ffa 100644 --- a/yql/essentials/core/common_opt/yql_co_flow2.cpp +++ b/yql/essentials/core/common_opt/yql_co_flow2.cpp @@ -2027,7 +2027,100 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {              return ret;          } -        return node; +        // Add PruneKeys to EquiJoin +        static const char optName[] = "EmitPruneKeys"; +        if (!IsOptimizerEnabled<optName>(*optCtx.Types) || IsOptimizerDisabled<optName>(*optCtx.Types)) { +            return node; +        } +        auto equiJoin = TCoEquiJoin(node); +        if (HasSetting(equiJoin.Arg(equiJoin.ArgCount() - 1).Ref(), "prune_keys_added")) { +            return node; +        } + +        THashMap<TStringBuf, THashSet<TStringBuf>> columnsForPruneKeysExtractor; +        GetPruneKeysColumnsForJoinLeaves(equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); + +        TExprNode::TListType children; +        bool hasChanges = false; +        for (size_t i = 0; i + 2 < equiJoin.ArgCount(); ++i) { +            auto child = equiJoin.Arg(i).Cast<TCoEquiJoinInput>(); +            auto list = child.List(); +            auto scope = child.Scope(); + +            if (!scope.Ref().IsAtom()) { +                children.push_back(equiJoin.Arg(i).Ptr()); +                continue; +            } + +            auto itemNames = columnsForPruneKeysExtractor.find(scope.Ref().Content()); +            if (itemNames == columnsForPruneKeysExtractor.end() || itemNames->second.empty()) { +                children.push_back(equiJoin.Arg(i).Ptr()); +                continue; +            } + +            if (auto distinct = list.Ref().GetConstraint<TDistinctConstraintNode>()) { +                if (distinct->ContainsCompleteSet(std::vector<std::string_view>(itemNames->second.cbegin(), itemNames->second.cend()))) { +                    children.push_back(equiJoin.Arg(i).Ptr()); +                    continue; +                } +            } + +            bool isOrdered = false; +            if (auto sorted = list.Ref().GetConstraint<TSortedConstraintNode>()) { +                for (const auto& item : sorted->GetContent()) { +                    size_t foundItemNamesCount = 0; +                    for (const auto& path : item.first) { +                        if (itemNames->second.contains(path.front())) { +                            foundItemNamesCount++; +                        } +                    } +                    if (foundItemNamesCount == itemNames->second.size()) { +                        isOrdered = true; +                        break; +                    } +                } +            } + +            auto pruneKeysCallable = isOrdered ? "PruneAdjacentKeys" : "PruneKeys"; +            YQL_CLOG(DEBUG, Core) << "Add " << pruneKeysCallable << " to EquiJoin input #" << i << ", label " << scope.Ref().Content(); +            children.push_back(ctx.Builder(child.Pos()) +                .List() +                    .Callable(0, pruneKeysCallable) +                        .Add(0, list.Ptr()) +                        .Lambda(1) +                            .Param("item") +                            .List(0) +                                .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder & { +                                    ui32 i = 0; +                                    for (const auto& column : itemNames->second) { +                                        parent.Callable(i++, "Member") +                                            .Arg(0, "item") +                                            .Atom(1, column) +                                        .Seal(); +                                    } +                                    return parent; +                                }) +                            .Seal() +                        .Seal() +                    .Seal() +                    .Add(1, scope.Ptr()) +                .Seal() +                .Build()); +            hasChanges = true; +        } + +        if (!hasChanges) { +            return node; +        } + +        children.push_back(equiJoin.Arg(equiJoin.ArgCount() - 2).Ptr()); +        children.push_back(AddSetting( +            equiJoin.Arg(equiJoin.ArgCount() - 1).Ref(), +            equiJoin.Arg(equiJoin.ArgCount() - 1).Pos(), +            "prune_keys_added", +            nullptr, +            ctx)); +        return ctx.ChangeChildren(*node, std::move(children));      };      map["ExtractMembers"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index 5755bf1dbf5..28ee1ead8f5 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -2760,19 +2760,11 @@ TExprNode::TPtr ExpandListHas(const TExprNode::TPtr& input, TExprContext& ctx) {      return RewriteSearchByKeyForTypesMismatch<true, true>(input, ctx);  } -TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprContext& ctx) { -    const auto type = input->Head().GetTypeAnn(); -    const auto& keyExtractorLambda = input->ChildRef(1); - -    YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::List || type->GetKind() == ETypeAnnotationKind::Stream); - -    const auto elemType = type->GetKind() == ETypeAnnotationKind::List -        ? type->Cast<TListExprType>()->GetItemType() -        : type->Cast<TStreamExprType>()->GetItemType(); -    const auto optionalElemType = *ctx.MakeType<TOptionalExprType>(elemType); +TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprContext& ctx, TTypeAnnotationContext& /*typesCtx*/) { +    const auto& keyExtractorLambda = input->ChildPtr(1);      YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << input->Content(); -    return ctx.Builder(input->Pos()) +    return KeepConstraints(ctx.Builder(input->Pos())          .Callable("OrderedFlatMap")              .Callable(0, "Fold1Map")                  .Add(0, input->HeadPtr()) @@ -2799,7 +2791,11 @@ TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprConte                                  .Seal()                              .Seal()                              .Callable(1, "Nothing") -                                .Add(0, ExpandType(input->Pos(), optionalElemType, ctx)) +                                .Callable(0, "OptionalType") +                                    .Callable(0, "TypeOf") +                                        .Arg(0, "item") +                                    .Seal() +                                .Seal()                              .Seal()                              .Callable(2, "Just")                                  .Arg(0, "item") @@ -2814,13 +2810,16 @@ TExprNode::TPtr ExpandPruneAdjacentKeys(const TExprNode::TPtr& input, TExprConte                  .Arg(0, "item")              .Seal()          .Seal() -        .Build(); +        .Build(), *input, ctx);  } -TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx) { +TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {      const auto type = input->Head().GetTypeAnn(); -    const auto& keyExtractorLambda = input->ChildRef(1); -    YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::List || type->GetKind() == ETypeAnnotationKind::Stream); +    auto keyExtractorLambda = input->ChildPtr(1); + +    YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::Flow +        || type->GetKind() == ETypeAnnotationKind::List +        || type->GetKind() == ETypeAnnotationKind::Stream);      auto initHandler = ctx.Builder(input->Pos())          .Lambda() @@ -2865,6 +2864,39 @@ TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx)              .Seal()              .Build();      } else { +        // Slight copy of GetDictionaryKeyTypes to check if keyExtractorLambda result type is complicated +        // mkql CombineCore supports only simple types; for others we should add pickling +        bool keyExtractorLambdaShouldBePickled = false; +        auto itemType = keyExtractorLambda->GetTypeAnn(); +        if (itemType->GetKind() == ETypeAnnotationKind::Optional) { +            itemType = itemType->Cast<TOptionalExprType>()->GetItemType(); +        } + +        if (itemType->GetKind() == ETypeAnnotationKind::Tuple) { +            auto tuple = itemType->Cast<TTupleExprType>(); +            for (const auto& item : tuple->GetItems()) { +                if (!IsDataOrOptionalOfData(item)) { +                    keyExtractorLambdaShouldBePickled = true; +                    break; +                } +            } +        } else if (itemType->GetKind() != ETypeAnnotationKind::Data) { +            keyExtractorLambdaShouldBePickled = true; +        } + +        if (keyExtractorLambdaShouldBePickled) { +            keyExtractorLambda = ctx.Builder(input->Pos()) +            .Lambda() +                .Param("item") +                .Callable(0, "StablePickle") +                    .Apply(0, keyExtractorLambda) +                        .With(0, "item") +                    .Seal() +                .Seal() +            .Seal() +            .Build(); +        } +          return ctx.Builder(input->Pos())              .Callable("CombineCore")                  .Add(0, input->HeadPtr()) @@ -2872,6 +2904,7 @@ TExprNode::TPtr ExpandPruneKeys(const TExprNode::TPtr& input, TExprContext& ctx)                  .Add(2, initHandler)                  .Add(3, updateHandler)                  .Add(4, finishHandler) +                .Atom(5, ToString(typesCtx.PruneKeysMemLimit))              .Seal()              .Build();      } @@ -8906,8 +8939,6 @@ struct TPeepHoleRules {          {"CheckedDiv", &ExpandCheckedDiv},          {"CheckedMod", &ExpandCheckedMod},          {"CheckedMinus", &ExpandCheckedMinus}, -        {"PruneAdjacentKeys", &ExpandPruneAdjacentKeys}, -        {"PruneKeys", &ExpandPruneKeys},          {"JsonValue", &ExpandJsonValue},          {"JsonExists", &ExpandJsonExists},          {"EmptyIterator", &DropDependsOnFromEmptyIterator}, @@ -8926,6 +8957,8 @@ struct TPeepHoleRules {          {"CostsOf", &ExpandCostsOf},          {"JsonQuery", &ExpandJsonQuery},          {"MatchRecognize", &ExpandMatchRecognize}, +        {"PruneAdjacentKeys", &ExpandPruneAdjacentKeys}, +        {"PruneKeys", &ExpandPruneKeys},          {"CalcOverWindow", &ExpandCalcOverWindow},          {"CalcOverSessionWindow", &ExpandCalcOverWindow},          {"CalcOverWindowGroup", &ExpandCalcOverWindow}, diff --git a/yql/essentials/core/yql_expr_constraint.cpp b/yql/essentials/core/yql_expr_constraint.cpp index e67557d9645..7fcf560a2cf 100644 --- a/yql/essentials/core/yql_expr_constraint.cpp +++ b/yql/essentials/core/yql_expr_constraint.cpp @@ -822,9 +822,23 @@ private:          }          if constexpr (Adjacent) { -            return CopyAllFrom<0>(input, output, ctx); +            if (const auto status = CopyAllFrom<0>(input, output, ctx); status != TStatus::Ok) { +                return status; +            } + +            TPartOfConstraintBase::TSetType keys = GetPathsToKeys<true>(input->Child(1)->Tail(), input->Child(1)->Head().Head()); +            TPartOfConstraintBase::TSetOfSetsType uniqueKeys; +            for (const auto& elem : keys) { +                uniqueKeys.insert(TPartOfConstraintBase::TSetType{elem}); +            } +            if (!keys.empty()) { +                input->AddConstraint(ctx.MakeConstraint<TUniqueConstraintNode>(TUniqueConstraintNode::TContentType{uniqueKeys})); +                input->AddConstraint(ctx.MakeConstraint<TDistinctConstraintNode>(TDistinctConstraintNode::TContentType{uniqueKeys})); +            } +            return TStatus::Ok;          } -        return FromFirst<TEmptyConstraintNode, TUniqueConstraintNode, TPartOfUniqueConstraintNode, TDistinctConstraintNode, TPartOfDistinctConstraintNode>(input, output, ctx); + +        return FromFirst<TEmptyConstraintNode>(input, output, ctx);      }      template<class TConstraint> diff --git a/yql/essentials/core/yql_join.cpp b/yql/essentials/core/yql_join.cpp index 7cca45604d1..5ce3fba268b 100644 --- a/yql/essentials/core/yql_join.cpp +++ b/yql/essentials/core/yql_join.cpp @@ -820,6 +820,10 @@ IGraphTransformer::TStatus ValidateEquiJoinOptions(TPositionHandle positionHandl                  // do nothing              } else if (optionName == "multiple_joins") {                  // do nothing +            } else if (optionName == "prune_keys_added") { +                if (!EnsureTupleSize(*child, 1, ctx)) { +                    return IGraphTransformer::TStatus::Error; +                }              } else {                  YQL_ENSURE(false, "Cached join option '" << optionName << "' not handled");              } @@ -2007,7 +2011,7 @@ void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row,  }  bool IsCachedJoinOption(TStringBuf name) { -    static THashSet<TStringBuf> CachedJoinOptions = {"preferred_sort", "cbo_passed", "multiple_joins"}; +    static THashSet<TStringBuf> CachedJoinOptions = {"preferred_sort", "cbo_passed", "multiple_joins", "prune_keys_added"};      return CachedJoinOptions.contains(name);  } @@ -2016,4 +2020,31 @@ bool IsCachedJoinLinkOption(TStringBuf name) {      return CachedJoinLinkOptions.contains(name);  } +void GetPruneKeysColumnsForJoinLeaves(const TCoEquiJoinTuple& joinTree, THashMap<TStringBuf, THashSet<TStringBuf>>& columnsForPruneKeysExtractor) { +    auto settings = GetEquiJoinLinkSettings(joinTree.Options().Ref()); +    TStringBuf joinKind = joinTree.Type().Value(); + +    auto left = joinTree.LeftScope(); +    if (!left.Maybe<TCoAtom>()) { +        GetPruneKeysColumnsForJoinLeaves(left.Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); +    } else { +        if (joinKind == "RightSemi" || joinKind == "RightOnly" || settings.LeftHints.contains("any")) { +            if (!settings.LeftHints.contains("unique")) { +                CollectEquiJoinKeyColumnsFromLeaf(joinTree.LeftKeys().Ref(), columnsForPruneKeysExtractor); +            } +        } +    } + +    auto right = joinTree.RightScope(); +    if (!right.Maybe<TCoAtom>()) { +        GetPruneKeysColumnsForJoinLeaves(right.Cast<TCoEquiJoinTuple>(), columnsForPruneKeysExtractor); +    } else { +        if (joinKind == "LeftSemi" || joinKind == "LeftOnly" || settings.RightHints.contains("any")) { +            if (!settings.RightHints.contains("unique")) { +                CollectEquiJoinKeyColumnsFromLeaf(joinTree.RightKeys().Ref(), columnsForPruneKeysExtractor); +            } +        } +    } +} +  } // namespace NYql diff --git a/yql/essentials/core/yql_join.h b/yql/essentials/core/yql_join.h index a313aceefa3..26b417cf542 100644 --- a/yql/essentials/core/yql_join.h +++ b/yql/essentials/core/yql_join.h @@ -180,4 +180,6 @@ void GatherJoinInputs(const TExprNode::TPtr& expr, const TExprNode& row,  bool IsCachedJoinOption(TStringBuf name);  bool IsCachedJoinLinkOption(TStringBuf name); +void GetPruneKeysColumnsForJoinLeaves(const NNodes::TCoEquiJoinTuple& joinTree, THashMap<TStringBuf, THashSet<TStringBuf>>& columnsForPruneKeysExtractor); +  } diff --git a/yql/essentials/core/yql_type_annotation.h b/yql/essentials/core/yql_type_annotation.h index bc09c963f4b..a43650a01f4 100644 --- a/yql/essentials/core/yql_type_annotation.h +++ b/yql/essentials/core/yql_type_annotation.h @@ -452,6 +452,7 @@ struct TTypeAnnotationContext: public TThrRefBase {      THashSet<TString> PeepholeFlags;      bool StreamLookupJoin = false;      ui32 MaxAggPushdownPredicates = 6; // algorithm complexity is O(2^N) +    ui32 PruneKeysMemLimit = 128 * 1024 * 1024;      TMaybe<TColumnOrder> LookupColumnOrder(const TExprNode& node) const;      IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx); diff --git a/yql/essentials/tests/sql/minirun/part5/canondata/result.json b/yql/essentials/tests/sql/minirun/part5/canondata/result.json index 579677b9314..a98d74ebde0 100644 --- a/yql/essentials/tests/sql/minirun/part5/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part5/canondata/result.json @@ -1044,6 +1044,20 @@              "uri": "https://{canondata_backend}/1936273/19f08c34eba9366d29ee0ffb8eb99e637c34fd97/resource.tar.gz#test.test_join-convert_check_key_mem2-default.txt-Results_/results.txt"          }      ], +    "test.test[join-prune_keys-default.txt-Debug]": [ +        { +            "checksum": "a706e6bd4285c96ad5120d701109fc82", +            "size": 3669, +            "uri": "https://{canondata_backend}/1871102/392832e505c55eb371c9d3241b89c96b5a837c8f/resource.tar.gz#test.test_join-prune_keys-default.txt-Debug_/opt.yql" +        } +    ], +    "test.test[join-prune_keys-default.txt-Results]": [ +        { +            "checksum": "af076a3334031b8cdaa6969f064a4616", +            "size": 18160, +            "uri": "https://{canondata_backend}/1130705/620da5a4f19baef17c32a4b3c699ec3c3091ada5/resource.tar.gz#test.test_join-prune_keys-default.txt-Results_/results.txt" +        } +    ],      "test.test[json-json_exists/common_syntax-default.txt-Debug]": [          {              "checksum": "1559e7b19e1d1827f8f1ea62929effb1", diff --git a/yql/essentials/tests/sql/minirun/part7/canondata/result.json b/yql/essentials/tests/sql/minirun/part7/canondata/result.json index e8c9926ae75..8083b15dac8 100644 --- a/yql/essentials/tests/sql/minirun/part7/canondata/result.json +++ b/yql/essentials/tests/sql/minirun/part7/canondata/result.json @@ -1232,16 +1232,16 @@      ],      "test.test[select-prune_keys-default.txt-Debug]": [          { -            "checksum": "95e58e469ce10fce0d0d5e55c0cf3baf", -            "size": 3292, -            "uri": "https://{canondata_backend}/1931696/04008bc01ad4f562f8e03ad2bc296f7a64a78489/resource.tar.gz#test.test_select-prune_keys-default.txt-Debug_/opt.yql" +            "checksum": "1dc9319da10b4c1343b6ceb5d0abb5b7", +            "size": 3788, +            "uri": "https://{canondata_backend}/212715/392992a262a39acb9e6e49f104e5800a3e731eb6/resource.tar.gz#test.test_select-prune_keys-default.txt-Debug_/opt.yql"          }      ],      "test.test[select-prune_keys-default.txt-Results]": [          { -            "checksum": "f53b6976b6a5e1ee5e1d331c25963930", -            "size": 27670, -            "uri": "https://{canondata_backend}/1931696/04008bc01ad4f562f8e03ad2bc296f7a64a78489/resource.tar.gz#test.test_select-prune_keys-default.txt-Results_/results.txt" +            "checksum": "46c226ab1c00ed4a9a5b57c6639f15b2", +            "size": 30306, +            "uri": "https://{canondata_backend}/212715/392992a262a39acb9e6e49f104e5800a3e731eb6/resource.tar.gz#test.test_select-prune_keys-default.txt-Results_/results.txt"          }      ],      "test.test[union-union_positional_mix-default.txt-Debug]": [ diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json index 4cb7e6cc7af..5027c679045 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/result.json +++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json @@ -3933,6 +3933,13 @@              "uri": "https://{canondata_backend}/1942173/99e88108149e222741552e7e6cddef041d6a2846/resource.tar.gz#test_sql2yql.test_join-left_join_with_self_aggr_/sql.yql"          }      ], +    "test_sql2yql.test[join-prune_keys]": [ +        { +            "checksum": "a04490e5ef3a567ca13cb1ed88272bd8", +            "size": 22535, +            "uri": "https://{canondata_backend}/1599023/2a161150407124ac83c2566a6542b19a53abfccb/resource.tar.gz#test_sql2yql.test_join-prune_keys_/sql.yql" +        } +    ],      "test_sql2yql.test[join-yql-19192]": [          {              "checksum": "fffdf1cbb40643da9daf9bdf3edec121", @@ -7022,9 +7029,9 @@      ],      "test_sql2yql.test[select-prune_keys]": [          { -            "checksum": "55346f77548ef19f9a09d2f1d3f6f466", -            "size": 17765, -            "uri": "https://{canondata_backend}/1871182/906a4c4e540bb8746f8d7595500d4d1c9f664846/resource.tar.gz#test_sql2yql.test_select-prune_keys_/sql.yql" +            "checksum": "f7da5706622461ab177712e6c348c61b", +            "size": 19536, +            "uri": "https://{canondata_backend}/1814674/c8d78993e8e9976f1e3fae2197140afe33195365/resource.tar.gz#test_sql2yql.test_select-prune_keys_/sql.yql"          }      ],      "test_sql2yql.test[select-result_label]": [ @@ -10243,6 +10250,11 @@              "uri": "file://test_sql_format.test_join-left_join_with_self_aggr_/formatted.sql"          }      ], +    "test_sql_format.test[join-prune_keys]": [ +        { +            "uri": "file://test_sql_format.test_join-prune_keys_/formatted.sql" +        } +    ],      "test_sql_format.test[join-yql-19192]": [          {              "uri": "file://test_sql_format.test_join-yql-19192_/formatted.sql" diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_join-prune_keys_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_join-prune_keys_/formatted.sql new file mode 100644 index 00000000000..0f53bcbe441 --- /dev/null +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_join-prune_keys_/formatted.sql @@ -0,0 +1,238 @@ +PRAGMA config.flags('OptimizerFlags', 'EmitPruneKeys'); + +$a = ( +    SELECT +        * +    FROM +        as_table([ +            <|x: 1, t: 1|>, +            <|x: 1, t: 1|>, +            <|x: 1, t: 2|>, +            <|x: 3, t: 1|>, +            <|x: 3, t: 4|>, +            <|x: 3, t: 2|>, +        ]) +); + +$b = ( +    SELECT +        * +    FROM +        as_table([ +            <|x: 1, y: 1|>, +            <|x: 1, y: 2|>, +            <|x: 1, y: 3|>, +            <|x: 1, y: 3|>, +            <|x: 2, y: 3|>, +            <|x: 2, y: 4|>, +        ]) +); + +$c = ( +    SELECT +        * +    FROM +        as_table([ +            <|x: 1|>, +            <|x: 1|>, +            <|x: 1|>, +            <|x: 1|>, +            <|x: 2|>, +            <|x: 2|>, +        ]) +); + +-- PruneKeys +SELECT +    a.* +FROM +    $a AS a +WHERE +    a.x IN ( +        SELECT +            x +        FROM +            $b +    ) +; -- PruneKeys + +SELECT +    a.* +FROM +    $a AS a +WHERE +    a.x IN ( +        SELECT +            /*+ distinct(x) */ x +        FROM +            $b +    ) +; -- nothing + +SELECT +    a.* +FROM +    $a AS a +WHERE +    a.x IN ( +        SELECT +            x +        FROM +            $c +    ) +; -- PruneKeys + +SELECT +    a.* +FROM +    $a AS a +LEFT SEMI JOIN +    $b AS b +ON +    a.x == b.x +; -- PruneKeys(b) + +SELECT +    a.* +FROM +    $b AS b +RIGHT SEMI JOIN +    $a AS a +ON +    b.x == a.x +; -- PruneKeys(b) + +SELECT +    a.x, +    a.t, +    b.x +FROM ANY +    $a AS a +JOIN +    $b AS b +ON +    a.x == b.x +; -- PruneKeys(a) + +SELECT +    a.x, +    a.t, +    b.x +FROM +    $a AS a +JOIN ANY +    $b AS b +ON +    a.x == b.x +; -- PruneKeys(b) + +$a_sorted = ( +    SELECT +        * +    FROM +        $a +    ASSUME ORDER BY +        x +); + +$b_sorted = ( +    SELECT +        * +    FROM +        $b +    ASSUME ORDER BY +        x +); + +$c_sorted = ( +    SELECT +        * +    FROM +        $c +    ASSUME ORDER BY +        x +); + +-- PruneAdjacentKeys +SELECT +    a.* +FROM +    $a AS a +WHERE +    a.x IN ( +        SELECT +            x +        FROM +            $b_sorted +    ) +; -- PruneAdjacentKeys + +SELECT +    a.* +FROM +    $a AS a +WHERE +    a.x IN ( +        SELECT +            /*+ distinct(x) */ x +        FROM +            $b_sorted +    ) +; -- nothing + +SELECT +    a.* +FROM +    $a AS a +WHERE +    a.x IN ( +        SELECT +            x +        FROM +            $c_sorted +    ) +; -- PruneAdjacentKeys + +SELECT +    a.* +FROM +    $a AS a +LEFT SEMI JOIN +    $b_sorted AS b +ON +    a.x == b.x +; -- PruneAdjacentKeys(b_sorted) + +SELECT +    a.* +FROM +    $b_sorted AS b +RIGHT SEMI JOIN +    $a AS a +ON +    b.x == a.x +; -- PruneAdjacentKeys(b_sorted) + +SELECT +    a.x, +    a.t, +    b.x +FROM ANY +    $a_sorted AS a +JOIN +    $b AS b +ON +    a.x == b.x +; -- PruneAdjacentKeys(a_sorted) + +SELECT +    a.x, +    a.t, +    b.x +FROM +    $a AS a +JOIN ANY +    $b_sorted AS b +ON +    a.x == b.x +; -- PruneAdjacentKeys(b_sorted) diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_select-prune_keys_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_select-prune_keys_/formatted.sql index 65a0e7c3a79..80c2fa06403 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_select-prune_keys_/formatted.sql +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_select-prune_keys_/formatted.sql @@ -29,6 +29,14 @@ SELECT      ListLength(Yql::PruneKeys(AsList(1, 1, 1, 3, 3, 3, 3), $mod2))  ; +SELECT +    Yql::PruneAdjacentKeys(AsList(NULL, NULL, NULL, 1, 1, 2, 3, 3, 4, 5), $id) +; + +SELECT +    Yql::PruneKeys(AsList(1, NULL, 1, NULL, 1, NULL, 1), $id) +; +  -- optimize tests  $get_a = ($x) -> {      RETURN <|a: $x.a|>; diff --git a/yql/essentials/tests/sql/suites/join/prune_keys.sql b/yql/essentials/tests/sql/suites/join/prune_keys.sql new file mode 100644 index 00000000000..9f10de9ace7 --- /dev/null +++ b/yql/essentials/tests/sql/suites/join/prune_keys.sql @@ -0,0 +1,50 @@ +pragma config.flags('OptimizerFlags', 'EmitPruneKeys'); + +$a = select * from as_table([ +	    <|x:1, t:1|>, +	    <|x:1, t:1|>, +	    <|x:1, t:2|>, +	    <|x:3, t:1|>, +	    <|x:3, t:4|>, +	    <|x:3, t:2|>, +	]); + +$b = select * from as_table([ +	    <|x:1, y:1|>, +	    <|x:1, y:2|>, +	    <|x:1, y:3|>, +	    <|x:1, y:3|>, +	    <|x:2, y:3|>, +	    <|x:2, y:4|>, +	]); + +$c = select * from as_table([ +	    <|x:1|>, +	    <|x:1|>, +	    <|x:1|>, +	    <|x:1|>, +	    <|x:2|>, +	    <|x:2|>, +	]); + +-- PruneKeys +select a.* from $a as a where a.x in (select x from $b); -- PruneKeys +select a.* from $a as a where a.x in (select /*+ distinct(x) */ x from $b); -- nothing +select a.* from $a as a where a.x in (select x from $c); -- PruneKeys +select a.* from $a as a left semi join $b as b on a.x = b.x; -- PruneKeys(b) +select a.* from $b as b right semi join $a as a on b.x = a.x; -- PruneKeys(b) +select a.x, a.t, b.x from any $a as a join $b as b on a.x == b.x; -- PruneKeys(a) +select a.x, a.t, b.x from $a as a join any $b as b on a.x == b.x; -- PruneKeys(b) + +$a_sorted = select * from $a assume order by x; +$b_sorted = select * from $b assume order by x; +$c_sorted = select * from $c assume order by x; + +-- PruneAdjacentKeys +select a.* from $a as a where a.x in (select x from $b_sorted); -- PruneAdjacentKeys +select a.* from $a as a where a.x in (select /*+ distinct(x) */ x from $b_sorted); -- nothing +select a.* from $a as a where a.x in (select x from $c_sorted); -- PruneAdjacentKeys +select a.* from $a as a left semi join $b_sorted as b on a.x = b.x; -- PruneAdjacentKeys(b_sorted) +select a.* from $b_sorted as b right semi join $a as a on b.x = a.x; -- PruneAdjacentKeys(b_sorted) +select a.x, a.t, b.x from any $a_sorted as a join $b as b on a.x == b.x; -- PruneAdjacentKeys(a_sorted) +select a.x, a.t, b.x from $a as a join any $b_sorted as b on a.x == b.x; -- PruneAdjacentKeys(b_sorted) diff --git a/yql/essentials/tests/sql/suites/select/prune_keys.sql b/yql/essentials/tests/sql/suites/select/prune_keys.sql index 9cf5cb3ced7..f541bcfc61d 100644 --- a/yql/essentials/tests/sql/suites/select/prune_keys.sql +++ b/yql/essentials/tests/sql/suites/select/prune_keys.sql @@ -11,6 +11,9 @@ SELECT Yql::PruneKeys([], $id);  $mod2 = ($x) -> { RETURN $x % 2; };  SELECT ListLength(Yql::PruneKeys(AsList(1,1,1,3,3,3,3), $mod2)); +SELECT Yql::PruneAdjacentKeys(AsList(null,null,null,1,1,2,3,3,4,5), $id); +SELECT Yql::PruneKeys(AsList(1,null,1,null,1,null,1), $id); +  -- optimize tests  $get_a = ($x) -> { RETURN <|a:$x.a|>; }; diff --git a/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp b/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp index 8d6b31a1571..c949b461a7b 100644 --- a/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp +++ b/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp @@ -49,8 +49,11 @@ namespace NYql {                          (*memoryUsage)["CommonJoinCore"] += FromString<ui64>(memLimitSetting->Child(1)->Content());                      }                  } else if (node.IsCallable("WideCombiner")) { -                    (*memoryUsage)["WideCombiner"] += FromString<ui64>(node.Child(1U)->Content()); -                } else if (NNodes::TCoCombineCore::Match(&node)) { +                    i64 memLimit = 0LL; +                    if (TryFromString<i64>(node.Child(1U)->Content(), memLimit)) { +                        (*memoryUsage)["WideCombiner"] += memLimit; +                    } +                } else if (NNodes::TCoCombineCore::Match(&node) && NNodes::TCoCombineCore::idx_MemLimit < node.ChildrenSize()) {                      (*memoryUsage)["CombineCore"] += FromString<ui64>(node.Child(NNodes::TCoCombineCore::idx_MemLimit)->Content());                  }              } diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp index bc785946bd0..bb5c2f4e36a 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp @@ -31,6 +31,8 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T      AddHandler(0, &TCoTopSort::Match, HNDL(Sort<true>));      AddHandler(0, &TCoTop::Match, HNDL(Sort<true>));      AddHandler(0, &TYtSort::Match, HNDL(YtSortOverAlreadySorted)); +    AddHandler(0, &TCoPruneKeys::Match, HNDL(PushPruneKeysIntoYtOperation)); +    AddHandler(0, &TCoPruneAdjacentKeys::Match, HNDL(PushPruneKeysIntoYtOperation));      AddHandler(0, &TCoPartitionByKeyBase::Match, HNDL(PartitionByKey));      AddHandler(0, &TCoFlatMapBase::Match, HNDL(FlatMap));      AddHandler(0, &TCoCombineByKey::Match, HNDL(CombineByKey)); diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h index ef783c31c94..cb940e59963 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h @@ -149,6 +149,8 @@ private:      NNodes::TMaybeNode<NNodes::TExprBase> UpdateDataSourceCluster(NNodes::TExprBase node, TExprContext& ctx) const; +    NNodes::TMaybeNode<NNodes::TExprBase> PushPruneKeysIntoYtOperation(NNodes::TExprBase node, TExprContext& ctx) const; +      template <typename TLMapType>      NNodes::TMaybeNode<NNodes::TExprBase> LMap(NNodes::TExprBase node, TExprContext& ctx) const; diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp index 3b65cedd9f9..87b2f66e546 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_misc.cpp @@ -964,4 +964,74 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::UpdateDataSourceCluster      return ctx.ChangeChild(node.Ref(), TYtReadTable::idx_DataSource, MakeDataSource(op.DataSource().Pos(), cluster, ctx).Ptr());  } +TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::PushPruneKeysIntoYtOperation(TExprBase node, TExprContext& ctx) const { +    auto op = node.Cast<TCoPruneKeysBase>(); +    auto extractorLambda = op.Extractor(); + +    if (!IsYtProviderInput(op.Input())) { +        return node; +    } + +    TSyncMap syncList; +    const ERuntimeClusterSelectionMode selectionMode = +        State_->Configuration->RuntimeClusterSelection.Get().GetOrElse(DEFAULT_RUNTIME_CLUSTER_SELECTION); +    auto cluster = DeriveClusterFromInput(op.Input(), selectionMode); +    if (!cluster || !IsYtCompleteIsolatedLambda(extractorLambda.Ref(), syncList, *cluster, false, selectionMode)) { +        return {}; +    } + +    auto mapper = ctx.Builder(node.Pos()) +        .Lambda() +            .Param("stream") +            .Callable(node.Ref().Content()) +                .Arg(0, "stream") +                .Add(1, extractorLambda.Ptr()) +            .Seal() +        .Seal() +        .Build(); + +    auto outItemType = SilentGetSequenceItemType(op.Input().Ref(), true); +    if (!outItemType || !outItemType->IsPersistable()) { +        return node; +    } +    if (!EnsurePersistableYsonTypes(node.Pos(), *outItemType, ctx, State_)) { +        return {}; +    } + +    bool sortedOutput = TCoPruneAdjacentKeys::Match(node.Raw()); +    TVector<TYtOutTable> outTables = ConvertOutTablesWithSortAware(mapper, sortedOutput, node.Pos(), +        outItemType, ctx, State_, node.Ref().GetConstraintSet()); + +    auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos()); +    if (sortedOutput) { +        settingsBuilder +            .Add() +                .Name() +                    .Value(ToString(EYtSettingType::Ordered)) +                .Build() +            .Build(); +    } +    if (State_->Configuration->UseFlow.Get().GetOrElse(DEFAULT_USE_FLOW)) { +        settingsBuilder +            .Add() +                .Name() +                    .Value(ToString(EYtSettingType::Flow)) +                .Build() +            .Build(); +    } + +    auto map = Build<TYtMap>(ctx, node.Pos()) +        .World(GetWorld(op.Input(), {}, ctx)) +        .DataSink(MakeDataSink(node.Pos(), *cluster, ctx)) +        .Input(ConvertInputTable(op.Input(), ctx)) +        .Output() +            .Add(outTables) +        .Build() +        .Settings(settingsBuilder.Done()) +        .Mapper(std::move(mapper)) +        .Done(); + +    return WrapOp(map, ctx); +} +  }  // namespace NYql diff --git a/yt/yql/tests/sql/suites/join/prune_keys.cfg b/yt/yql/tests/sql/suites/join/prune_keys.cfg new file mode 100644 index 00000000000..9cd81aaa85a --- /dev/null +++ b/yt/yql/tests/sql/suites/join/prune_keys.cfg @@ -0,0 +1,2 @@ +in a_sorted sorted_by_k1.txt +in b_sorted sorted_by_k2.txt diff --git a/yt/yql/tests/sql/suites/join/prune_keys.sql b/yt/yql/tests/sql/suites/join/prune_keys.sql new file mode 100644 index 00000000000..bce76fcb3b4 --- /dev/null +++ b/yt/yql/tests/sql/suites/join/prune_keys.sql @@ -0,0 +1,15 @@ +/* postgres can not */ +use plato; + +pragma yt.JoinMergeTablesLimit = "10"; +pragma config.flags('OptimizerFlags', 'EmitPruneKeys'); + +-- PruneKeys +select * +from a_sorted +where v1 in (select v2 from b_sorted); + +-- PruneAdjacentKeys +select * +from a_sorted +where k1 in (select k2 from b_sorted);  | 
