diff options
author | aneporada <aneporada@yandex-team.com> | 2024-12-10 21:05:07 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.com> | 2024-12-10 22:01:55 +0300 |
commit | dbbdb649150e00878fcc21ad1ea99052aca804c7 (patch) | |
tree | 7a040eff12fb115324799008e5468283cb5f4481 | |
parent | ebceebb7b22e971e5eb52b5323bfc1a4de855145 (diff) | |
download | ydb-dbbdb649150e00878fcc21ad1ea99052aca804c7.tar.gz |
Fuse reduce with trivial map
commit_hash:38c7edfde8f64d80a47f7309bdc03c613b1746be
10 files changed, 430 insertions, 5 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_simple1.cpp b/yql/essentials/core/common_opt/yql_co_simple1.cpp index 06107c3bfd..4f0ddab741 100644 --- a/yql/essentials/core/common_opt/yql_co_simple1.cpp +++ b/yql/essentials/core/common_opt/yql_co_simple1.cpp @@ -5465,12 +5465,18 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { if (node->ChildrenSize() % 2 == 1) { // No default value bool allJust = true; + bool allSingleAsList = true; TNodeSet uniqLambdas; for (ui32 index = 1; index < node->ChildrenSize(); index += 2) { - uniqLambdas.insert(node->Child(index + 1)); - if (!TCoJust::Match(node->Child(index + 1)->Child(1))) { + const TExprNode* visitLambda = node->Child(index + 1); + const TExprNode* body = visitLambda->Child(1); + uniqLambdas.insert(visitLambda); + if (!TCoJust::Match(body)) { allJust = false; } + if (!TCoAsList::Match(body) || body->ChildrenSize() != 1) { + allSingleAsList = false; + } } if (uniqLambdas.size() == 1 && node->ChildrenSize() > 3) { @@ -5486,10 +5492,10 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { .Build(); } - if (allJust) { - YQL_CLOG(DEBUG, Core) << node->Content() << " - extract Just"; + if (allJust || allSingleAsList) { + YQL_CLOG(DEBUG, Core) << node->Content() << " - extract " << (allJust ? "Just" : "AsList"); return ctx.Builder(node->Pos()) - .Callable("Just") + .Callable(allJust ? "Just" : "AsList") .Callable(0, "Visit") .Add(0, node->HeadPtr()) .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json index ebec884c2a..9233984d80 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/result.json +++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json @@ -16253,6 +16253,13 @@ "uri": "https://{canondata_backend}/1936273/4a1b39013e1bae40e722cff8ccef8829784964e2/resource.tar.gz#test_sql2yql.test_produce-reduce_with_python_row_repack_/sql.yql" } ], + "test_sql2yql.test[produce-reduce_with_trivial_remaps]": [ + { + "checksum": "b094793320668f635c5e837a71e84b50", + "size": 2276, + "uri": "https://{canondata_backend}/1942100/fc877ab79aa3673db82f29099d27c26f000fde66/resource.tar.gz#test_sql2yql.test_produce-reduce_with_trivial_remaps_/sql.yql" + } + ], "test_sql2yql.test[produce-yql-10297]": [ { "checksum": "0efb35a2c6333d72f2edddd180c70a00", @@ -29302,6 +29309,11 @@ "uri": "file://test_sql_format.test_produce-reduce_with_python_row_repack_/formatted.sql" } ], + "test_sql_format.test[produce-reduce_with_trivial_remaps]": [ + { + "uri": "file://test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql" + } + ], "test_sql_format.test[produce-yql-10297]": [ { "uri": "file://test_sql_format.test_produce-yql-10297_/formatted.sql" diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql new file mode 100644 index 0000000000..a29b85cedb --- /dev/null +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql @@ -0,0 +1,23 @@ +USE plato; + +PRAGMA warning("disable", "4510"); + +$udf = ($_key, $stream) -> { + $init = ($item) -> (AsStruct(1u AS cnt, $item AS row)); + $switch = ($_item, $_state) -> (FALSE); + $update = ($item, $state) -> ( + AsStruct( + $state.cnt + 1u AS cnt, + if(($item.value > $state.row.value) ?? FALSE, $item, $state.row) AS row + ) + ); + $state = YQL::Collect(YQL::Condense1($stream, $init, $switch, $update)); + RETURN $state; +}; + +REDUCE CONCAT(Input1, Input2) +PRESORT + subkey +ON + key +USING $udf(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg new file mode 100644 index 0000000000..1516335c81 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg @@ -0,0 +1,3 @@ +in Input1 sorted1.txt +in Input2 sorted2.txt +providers yt diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql new file mode 100644 index 0000000000..65e70e8715 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql @@ -0,0 +1,15 @@ +use plato; +pragma warning("disable", "4510"); + +$udf = ($_key, $stream) -> { + $init = ($item) -> (AsStruct(1u as cnt, $item as row)); + $switch = ($_item, $_state) -> (false); + $update = ($item, $state) -> (AsStruct($state.cnt + 1u as cnt, + if(($item.value > $state.row.value) ?? false, $item, $state.row) as row)); + $state = YQL::Collect(YQL::Condense1($stream, $init, $switch, $update)); + return $state; +}; + +REDUCE CONCAT(Input1,Input2) +presort subkey +ON key USING $udf(TableRow()); diff --git a/yql/essentials/tests/sql/suites/produce/sorted2.txt b/yql/essentials/tests/sql/suites/produce/sorted2.txt new file mode 100644 index 0000000000..b214aab0d9 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/sorted2.txt @@ -0,0 +1,10 @@ +{"key"="023";"subkey"="3";"value"="aaa"}; +{"key"="037";"subkey"="5";"value"="ddd"}; +{"key"="075";"subkey"="1";"value"="abc"}; +{"key"="150";"subkey"="1";"value"="aaa"}; +{"key"="150";"subkey"="3";"value"="iii"}; +{"key"="150";"subkey"="8";"value"="zzz"}; +{"key"="200";"subkey"="7";"value"="qqq"}; +{"key"="527";"subkey"="4";"value"="bbb"}; +{"key"="761";"subkey"="6";"value"="ccc"}; +{"key"="911";"subkey"="2";"value"="kkk"}; diff --git a/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr b/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr new file mode 100644 index 0000000000..5d3821a576 --- /dev/null +++ b/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr @@ -0,0 +1,11 @@ +{"_yql_row_spec"={ + "Type"=["StructType";[ + ["key";["DataType";"String"]]; + ["subkey";["DataType";"String"]]; + ["value";["DataType";"Utf8"]] + ]]; + "SortDirections"=[1;1;1;]; + "SortedBy"=["key";"subkey";"value";]; + "SortedByTypes"=[["DataType";"String";];["DataType";"String";];["DataType";"Utf8";];]; + "SortMembers"=["key";"subkey";"value";]; +}} diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp index 51c9d75136..1107e0210d 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp @@ -78,6 +78,7 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T AddHandler(1, &TYtMerge::Match, HNDL(PushMergeLimitToInput)); if (!State_->Configuration->DisableFuseOperations.Get().GetOrElse(DEFAULT_DISABLE_FUSE_OPERATIONS)) { AddHandler(1, &TYtReduce::Match, HNDL(FuseReduce)); + AddHandler(1, &TYtReduce::Match, HNDL(FuseReduceWithTrivialMap)); } AddHandler(2, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin)); diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h index 95f9435922..67e68ea979 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h @@ -76,6 +76,8 @@ private: NNodes::TMaybeNode<NNodes::TExprBase> FuseReduce(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const; + NNodes::TMaybeNode<NNodes::TExprBase> FuseReduceWithTrivialMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const; + NNodes::TMaybeNode<NNodes::TExprBase> FuseInnerMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const; NNodes::TMaybeNode<NNodes::TExprBase> FuseOuterMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const; diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp index 96fe6dab33..7753c648f2 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp @@ -264,6 +264,348 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseReduce(TExprBase no .Done(); } +TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseReduceWithTrivialMap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + const EYtSettingTypes acceptedReduceSettings = + EYtSettingType::ReduceBy + | EYtSettingType::Limit + | EYtSettingType::SortLimitBy + | EYtSettingType::SortBy + // | EYtSettingType::JoinReduce + // | EYtSettingType::FirstAsPrimary + | EYtSettingType::Flow + | EYtSettingType::KeepSorted + | EYtSettingType::KeySwitch + // | EYtSettingType::ReduceInputType + | EYtSettingType::NoDq; + + const EYtSettingTypes acceptedMapSettings = + EYtSettingType::Ordered + //| EYtSettingType::Limit + //| EYtSettingType::SortLimitBy + //| EYtSettingType::WeakFields + //| EYtSettingType::Sharded + //| EYtSettingType::JobCount + | EYtSettingType::Flow + | EYtSettingType::KeepSorted + | EYtSettingType::NoDq + //| EYtSettingType::BlockInputReady + //| EYtSettingType::BlockInputApplied + ; + + auto outerReduce = node.Cast<TYtReduce>(); + if (NYql::HasSettingsExcept(outerReduce.Settings().Ref(), acceptedReduceSettings)) { + return node; + } + + const bool hasKeySwitch = NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::KeySwitch); + const bool isFlow = NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::Flow); + + const auto sortBy = NYql::GetSettingAsColumnList(outerReduce.Settings().Ref(), EYtSettingType::SortBy); + const auto reduceBy = NYql::GetSettingAsColumnList(outerReduce.Settings().Ref(), EYtSettingType::ReduceBy); + + THashSet<TString> sortOrKeyColumns(sortBy.begin(), sortBy.end()); + sortOrKeyColumns.insert(reduceBy.begin(), reduceBy.end()); + + struct TFused { + TYtPath Path; + TCoLambda MapLambda; + TCoLambda ReduceLambda; + TExprBase ReducePlaceholder; + size_t InputIndex; + size_t OrigInputIndex; + TYtMap OrigMap; + }; + + TExprNode::TPtr origVariantType; + if (outerReduce.Input().Size() > 1) { + auto itemType = GetSequenceItemType(outerReduce.Reducer().Args().Arg(0), true); + YQL_ENSURE(itemType); + origVariantType = ExpandType(outerReduce.Pos(), *itemType->Cast<TVariantExprType>(), ctx); + } + + TMaybe<TFused> fusedMap; + TVector<TYtSection> newInput; + const size_t origReduceInputs = outerReduce.Input().Size(); + for (size_t i = 0; i < origReduceInputs; ++i) { + const auto& section = outerReduce.Input().Item(i); + if (fusedMap.Defined() || section.Settings().Size() != 0) { + newInput.push_back(section); + continue; + } + + TVector<TYtPath> newPaths; + newPaths.reserve(section.Paths().Size()); + for (const auto& path : section.Paths()) { + if (fusedMap.Defined() || !path.Ranges().Maybe<TCoVoid>()) { + newPaths.push_back(path); + continue; + } + + auto maybeInnerMap = path.Table().Maybe<TYtOutput>().Operation().Maybe<TYtMap>(); + if (!maybeInnerMap) { + newPaths.push_back(path); + continue; + } + + TYtMap innerMap = maybeInnerMap.Cast(); + if (innerMap.Ref().StartsExecution() || + innerMap.Ref().HasResult() || + outerReduce.DataSink().Cluster().Value() != innerMap.DataSink().Cluster().Value() || + innerMap.Output().Size() > 1 || + innerMap.Input().Size() > 1 || + innerMap.Input().Item(0).Paths().Size() > 1 || + !NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Ordered) || + isFlow != NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Flow) || + NYql::HasSettingsExcept(innerMap.Settings().Ref(), acceptedMapSettings)) + { + newPaths.push_back(path); + continue; + } + + const TParentsMap* parents = getParents(); + if (IsOutputUsedMultipleTimes(path.Table().Cast<TYtOutput>().Ref(), *parents)) { + // Inner reduce output is used more than once + newPaths.push_back(path); + continue; + } + + // Check world dependencies + auto parentsIt = parents->find(innerMap.Raw()); + YQL_ENSURE(parentsIt != parents->cend()); + if (!AllOf(parentsIt->second, [](const TExprNode* dep) { return TYtOutput::Match(dep); })) { + newPaths.push_back(path); + continue; + } + + const TCoLambda mapLambda = innerMap.Mapper(); + auto maybeFlatMap = GetFlatMapOverInputStream(mapLambda, *parents); + TMaybe<THashSet<TStringBuf>> passthrough; + if (!maybeFlatMap.Maybe<TCoOrderedFlatMap>() || + !IsJustOrSingleAsList(maybeFlatMap.Cast().Lambda().Body().Ref()) || + !IsPassthroughFlatMap(maybeFlatMap.Cast(), &passthrough) || + !passthrough || + !AllOf(sortOrKeyColumns, [&](const TString& col) { return passthrough->contains(col); })) + { + newPaths.push_back(path); + continue; + } + + auto fuseRes = CanFuseLambdas(mapLambda, outerReduce.Reducer(), ctx); + if (!fuseRes) { + // Some error + return {}; + } + if (!*fuseRes) { + // Cannot fuse + newPaths.push_back(path); + continue; + } + + auto [placeHolder, lambdaWithPlaceholder] = ReplaceDependsOn(outerReduce.Reducer().Ptr(), ctx, State_->Types); + if (!placeHolder) { + return {}; + } + + TYtPath newPath = innerMap.Input().Item(0).Paths().Item(0); + YQL_ENSURE(newInput.size() == i); + if (!newPaths.empty()) { + newInput.push_back( + Build<TYtSection>(ctx, section.Pos()) + .InitFrom(section) + .Paths() + .Add(newPaths) + .Build() + .Done()); + newPaths.clear(); + } + size_t inputIndex = newInput.size(); + newInput.push_back( + Build<TYtSection>(ctx, section.Pos()) + .InitFrom(section) + .Paths() + .Add(newPath) + .Build() + .Done()); + fusedMap = { + .Path = newPath, + .MapLambda = mapLambda, + .ReduceLambda = TCoLambda(lambdaWithPlaceholder), + .ReducePlaceholder = TExprBase(placeHolder), + .InputIndex = inputIndex, + .OrigInputIndex = i, + .OrigMap = innerMap, + }; + } + if (!newPaths.empty()) { + newInput.push_back( + Build<TYtSection>(ctx, section.Pos()) + .InitFrom(section) + .Paths() + .Add(newPaths) + .Build() + .Done()); + } + } + + if (!fusedMap) { + return node; + } + + YQL_ENSURE(newInput.size() >= origReduceInputs); + YQL_ENSURE(newInput.size() - origReduceInputs <= 1); + + TExprNode::TPtr remapLambda = ctx.Builder(fusedMap->MapLambda.Pos()) + .Lambda() + .Param("item") + .Apply(fusedMap->MapLambda.Ptr()) + .With(0) + .Callable("AsList") + .Arg(0, "item") + .Seal() + .Done() + .Seal() + .Seal() + .Build(); + if (hasKeySwitch) { + remapLambda = ctx.Builder(fusedMap->MapLambda.Pos()) + .Lambda() + .Param("item") + .Callable(0, "OrderedMap") + .Apply(0, remapLambda) + .With(0) + .Callable("RemoveMember") + .Arg(0, "item") + .Atom(1, "_yql_sys_tablekeyswitch") + .Seal() + .Done() + .Seal() + .Lambda(1) + .Param("remappedItem") + .Callable(0, "AddMember") + .Arg(0, "remappedItem") + .Atom(1, "_yql_sys_tablekeyswitch") + .Callable(2, "Member") + .Arg(0, "item") + .Atom(1, "_yql_sys_tablekeyswitch") + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Build(); + } + + TExprNode::TPtr flatMapLambda; + if (newInput.size() == 1) { + flatMapLambda = remapLambda; + } else { + flatMapLambda = ctx.Builder(outerReduce.Pos()) + .Lambda() + .Param("item") + .Callable("Visit") + .Arg(0, "item") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + const bool inserted = newInput.size() > origReduceInputs; + for (size_t i = 0; i < newInput.size(); ++i) { + TString paramName = TStringBuilder() << "alt" << i; + TString remappedName = TStringBuilder() << "remapped" << i; + if (i != fusedMap->InputIndex) { + parent + .Atom(2 * i + 1, i) + .Lambda(2 * i + 2) + .Param(paramName) + .Callable("AsList") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + if (origVariantType) { + parent + .Callable(0, "Variant") + .Arg(0, paramName) + .Atom(1, (i > fusedMap->InputIndex && inserted) ? i - 1 : i) + .Add(2, origVariantType) + .Seal(); + } else { + parent + .Arg(0, paramName); + } + return parent; + }) + .Seal() + .Seal(); + } else { + parent + .Atom(2 * i + 1, i) + .Lambda(2 * i + 2) + .Param(paramName) + .Callable("OrderedMap") + .Apply(0, remapLambda) + .With(0, paramName) + .Seal() + .Lambda(1) + .Param(remappedName) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + if (origVariantType) { + parent + .Callable("Variant") + .Arg(0, remappedName) + .Atom(1, fusedMap->OrigInputIndex) + .Add(2, origVariantType) + .Seal(); + } else { + parent + .Arg(remappedName); + } + return parent; + }) + .Seal() + .Seal() + .Seal(); + } + } + return parent; + }) + .Seal() + .Seal() + .Build(); + } + + TExprNode::TPtr newReduceLambda = ctx.Builder(outerReduce.Pos()) + .Lambda() + .Param("inputStream") + .Apply(0, fusedMap->ReduceLambda.Ptr()) + .With(0) + .Callable("OrderedFlatMap") + .Arg(0, "inputStream") + .Add(1, flatMapLambda) + .Seal() + .Done() + .WithNode(fusedMap->ReducePlaceholder.Ref(), "inputStream") + .Seal() + .Seal() + .Build(); + + auto newSettings = outerReduce.Settings().Ptr(); + if (!NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::NoDq) && + NYql::HasSetting(fusedMap->OrigMap.Settings().Ref(), EYtSettingType::NoDq)) + { + newSettings = NYql::AddSetting(*newSettings, EYtSettingType::NoDq, {}, ctx); + } + + return Build<TYtReduce>(ctx, node.Pos()) + .InitFrom(outerReduce) + .World<TCoSync>() + .Add(fusedMap->OrigMap.World()) + .Add(outerReduce.World()) + .Build() + .Input() + .Add(newInput) + .Build() + .Reducer(newReduceLambda) + .Settings(newSettings) + .Done(); + + return node; +} + TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseInnerMap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { auto outerMap = node.Cast<TYtMap>(); if (outerMap.Input().Size() != 1 || outerMap.Input().Item(0).Paths().Size() != 1) { |