aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@yandex-team.com>2024-12-10 21:05:07 +0300
committeraneporada <aneporada@yandex-team.com>2024-12-10 22:01:55 +0300
commitdbbdb649150e00878fcc21ad1ea99052aca804c7 (patch)
tree7a040eff12fb115324799008e5468283cb5f4481
parentebceebb7b22e971e5eb52b5323bfc1a4de855145 (diff)
downloadydb-dbbdb649150e00878fcc21ad1ea99052aca804c7.tar.gz
Fuse reduce with trivial map
commit_hash:38c7edfde8f64d80a47f7309bdc03c613b1746be
-rw-r--r--yql/essentials/core/common_opt/yql_co_simple1.cpp16
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/result.json12
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql23
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg3
-rw-r--r--yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql15
-rw-r--r--yql/essentials/tests/sql/suites/produce/sorted2.txt10
-rw-r--r--yql/essentials/tests/sql/suites/produce/sorted2.txt.attr11
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp1
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h2
-rw-r--r--yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp342
10 files changed, 430 insertions, 5 deletions
diff --git a/yql/essentials/core/common_opt/yql_co_simple1.cpp b/yql/essentials/core/common_opt/yql_co_simple1.cpp
index 06107c3bfd..4f0ddab741 100644
--- a/yql/essentials/core/common_opt/yql_co_simple1.cpp
+++ b/yql/essentials/core/common_opt/yql_co_simple1.cpp
@@ -5465,12 +5465,18 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
if (node->ChildrenSize() % 2 == 1) { // No default value
bool allJust = true;
+ bool allSingleAsList = true;
TNodeSet uniqLambdas;
for (ui32 index = 1; index < node->ChildrenSize(); index += 2) {
- uniqLambdas.insert(node->Child(index + 1));
- if (!TCoJust::Match(node->Child(index + 1)->Child(1))) {
+ const TExprNode* visitLambda = node->Child(index + 1);
+ const TExprNode* body = visitLambda->Child(1);
+ uniqLambdas.insert(visitLambda);
+ if (!TCoJust::Match(body)) {
allJust = false;
}
+ if (!TCoAsList::Match(body) || body->ChildrenSize() != 1) {
+ allSingleAsList = false;
+ }
}
if (uniqLambdas.size() == 1 && node->ChildrenSize() > 3) {
@@ -5486,10 +5492,10 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
.Build();
}
- if (allJust) {
- YQL_CLOG(DEBUG, Core) << node->Content() << " - extract Just";
+ if (allJust || allSingleAsList) {
+ YQL_CLOG(DEBUG, Core) << node->Content() << " - extract " << (allJust ? "Just" : "AsList");
return ctx.Builder(node->Pos())
- .Callable("Just")
+ .Callable(allJust ? "Just" : "AsList")
.Callable(0, "Visit")
.Add(0, node->HeadPtr())
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json
index ebec884c2a..9233984d80 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/result.json
+++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json
@@ -16253,6 +16253,13 @@
"uri": "https://{canondata_backend}/1936273/4a1b39013e1bae40e722cff8ccef8829784964e2/resource.tar.gz#test_sql2yql.test_produce-reduce_with_python_row_repack_/sql.yql"
}
],
+ "test_sql2yql.test[produce-reduce_with_trivial_remaps]": [
+ {
+ "checksum": "b094793320668f635c5e837a71e84b50",
+ "size": 2276,
+ "uri": "https://{canondata_backend}/1942100/fc877ab79aa3673db82f29099d27c26f000fde66/resource.tar.gz#test_sql2yql.test_produce-reduce_with_trivial_remaps_/sql.yql"
+ }
+ ],
"test_sql2yql.test[produce-yql-10297]": [
{
"checksum": "0efb35a2c6333d72f2edddd180c70a00",
@@ -29302,6 +29309,11 @@
"uri": "file://test_sql_format.test_produce-reduce_with_python_row_repack_/formatted.sql"
}
],
+ "test_sql_format.test[produce-reduce_with_trivial_remaps]": [
+ {
+ "uri": "file://test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql"
+ }
+ ],
"test_sql_format.test[produce-yql-10297]": [
{
"uri": "file://test_sql_format.test_produce-yql-10297_/formatted.sql"
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql
new file mode 100644
index 0000000000..a29b85cedb
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql
@@ -0,0 +1,23 @@
+USE plato;
+
+PRAGMA warning("disable", "4510");
+
+$udf = ($_key, $stream) -> {
+ $init = ($item) -> (AsStruct(1u AS cnt, $item AS row));
+ $switch = ($_item, $_state) -> (FALSE);
+ $update = ($item, $state) -> (
+ AsStruct(
+ $state.cnt + 1u AS cnt,
+ if(($item.value > $state.row.value) ?? FALSE, $item, $state.row) AS row
+ )
+ );
+ $state = YQL::Collect(YQL::Condense1($stream, $init, $switch, $update));
+ RETURN $state;
+};
+
+REDUCE CONCAT(Input1, Input2)
+PRESORT
+ subkey
+ON
+ key
+USING $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg
new file mode 100644
index 0000000000..1516335c81
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg
@@ -0,0 +1,3 @@
+in Input1 sorted1.txt
+in Input2 sorted2.txt
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql
new file mode 100644
index 0000000000..65e70e8715
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql
@@ -0,0 +1,15 @@
+use plato;
+pragma warning("disable", "4510");
+
+$udf = ($_key, $stream) -> {
+ $init = ($item) -> (AsStruct(1u as cnt, $item as row));
+ $switch = ($_item, $_state) -> (false);
+ $update = ($item, $state) -> (AsStruct($state.cnt + 1u as cnt,
+ if(($item.value > $state.row.value) ?? false, $item, $state.row) as row));
+ $state = YQL::Collect(YQL::Condense1($stream, $init, $switch, $update));
+ return $state;
+};
+
+REDUCE CONCAT(Input1,Input2)
+presort subkey
+ON key USING $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/sorted2.txt b/yql/essentials/tests/sql/suites/produce/sorted2.txt
new file mode 100644
index 0000000000..b214aab0d9
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/sorted2.txt
@@ -0,0 +1,10 @@
+{"key"="023";"subkey"="3";"value"="aaa"};
+{"key"="037";"subkey"="5";"value"="ddd"};
+{"key"="075";"subkey"="1";"value"="abc"};
+{"key"="150";"subkey"="1";"value"="aaa"};
+{"key"="150";"subkey"="3";"value"="iii"};
+{"key"="150";"subkey"="8";"value"="zzz"};
+{"key"="200";"subkey"="7";"value"="qqq"};
+{"key"="527";"subkey"="4";"value"="bbb"};
+{"key"="761";"subkey"="6";"value"="ccc"};
+{"key"="911";"subkey"="2";"value"="kkk"};
diff --git a/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr b/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr
new file mode 100644
index 0000000000..5d3821a576
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr
@@ -0,0 +1,11 @@
+{"_yql_row_spec"={
+ "Type"=["StructType";[
+ ["key";["DataType";"String"]];
+ ["subkey";["DataType";"String"]];
+ ["value";["DataType";"Utf8"]]
+ ]];
+ "SortDirections"=[1;1;1;];
+ "SortedBy"=["key";"subkey";"value";];
+ "SortedByTypes"=[["DataType";"String";];["DataType";"String";];["DataType";"Utf8";];];
+ "SortMembers"=["key";"subkey";"value";];
+}}
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
index 51c9d75136..1107e0210d 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp
@@ -78,6 +78,7 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
AddHandler(1, &TYtMerge::Match, HNDL(PushMergeLimitToInput));
if (!State_->Configuration->DisableFuseOperations.Get().GetOrElse(DEFAULT_DISABLE_FUSE_OPERATIONS)) {
AddHandler(1, &TYtReduce::Match, HNDL(FuseReduce));
+ AddHandler(1, &TYtReduce::Match, HNDL(FuseReduceWithTrivialMap));
}
AddHandler(2, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin));
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
index 95f9435922..67e68ea979 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h
@@ -76,6 +76,8 @@ private:
NNodes::TMaybeNode<NNodes::TExprBase> FuseReduce(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
+ NNodes::TMaybeNode<NNodes::TExprBase> FuseReduceWithTrivialMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
+
NNodes::TMaybeNode<NNodes::TExprBase> FuseInnerMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
NNodes::TMaybeNode<NNodes::TExprBase> FuseOuterMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
index 96fe6dab33..7753c648f2 100644
--- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
+++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp
@@ -264,6 +264,348 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseReduce(TExprBase no
.Done();
}
+TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseReduceWithTrivialMap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+ const EYtSettingTypes acceptedReduceSettings =
+ EYtSettingType::ReduceBy
+ | EYtSettingType::Limit
+ | EYtSettingType::SortLimitBy
+ | EYtSettingType::SortBy
+ // | EYtSettingType::JoinReduce
+ // | EYtSettingType::FirstAsPrimary
+ | EYtSettingType::Flow
+ | EYtSettingType::KeepSorted
+ | EYtSettingType::KeySwitch
+ // | EYtSettingType::ReduceInputType
+ | EYtSettingType::NoDq;
+
+ const EYtSettingTypes acceptedMapSettings =
+ EYtSettingType::Ordered
+ //| EYtSettingType::Limit
+ //| EYtSettingType::SortLimitBy
+ //| EYtSettingType::WeakFields
+ //| EYtSettingType::Sharded
+ //| EYtSettingType::JobCount
+ | EYtSettingType::Flow
+ | EYtSettingType::KeepSorted
+ | EYtSettingType::NoDq
+ //| EYtSettingType::BlockInputReady
+ //| EYtSettingType::BlockInputApplied
+ ;
+
+ auto outerReduce = node.Cast<TYtReduce>();
+ if (NYql::HasSettingsExcept(outerReduce.Settings().Ref(), acceptedReduceSettings)) {
+ return node;
+ }
+
+ const bool hasKeySwitch = NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::KeySwitch);
+ const bool isFlow = NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::Flow);
+
+ const auto sortBy = NYql::GetSettingAsColumnList(outerReduce.Settings().Ref(), EYtSettingType::SortBy);
+ const auto reduceBy = NYql::GetSettingAsColumnList(outerReduce.Settings().Ref(), EYtSettingType::ReduceBy);
+
+ THashSet<TString> sortOrKeyColumns(sortBy.begin(), sortBy.end());
+ sortOrKeyColumns.insert(reduceBy.begin(), reduceBy.end());
+
+ struct TFused {
+ TYtPath Path;
+ TCoLambda MapLambda;
+ TCoLambda ReduceLambda;
+ TExprBase ReducePlaceholder;
+ size_t InputIndex;
+ size_t OrigInputIndex;
+ TYtMap OrigMap;
+ };
+
+ TExprNode::TPtr origVariantType;
+ if (outerReduce.Input().Size() > 1) {
+ auto itemType = GetSequenceItemType(outerReduce.Reducer().Args().Arg(0), true);
+ YQL_ENSURE(itemType);
+ origVariantType = ExpandType(outerReduce.Pos(), *itemType->Cast<TVariantExprType>(), ctx);
+ }
+
+ TMaybe<TFused> fusedMap;
+ TVector<TYtSection> newInput;
+ const size_t origReduceInputs = outerReduce.Input().Size();
+ for (size_t i = 0; i < origReduceInputs; ++i) {
+ const auto& section = outerReduce.Input().Item(i);
+ if (fusedMap.Defined() || section.Settings().Size() != 0) {
+ newInput.push_back(section);
+ continue;
+ }
+
+ TVector<TYtPath> newPaths;
+ newPaths.reserve(section.Paths().Size());
+ for (const auto& path : section.Paths()) {
+ if (fusedMap.Defined() || !path.Ranges().Maybe<TCoVoid>()) {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ auto maybeInnerMap = path.Table().Maybe<TYtOutput>().Operation().Maybe<TYtMap>();
+ if (!maybeInnerMap) {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ TYtMap innerMap = maybeInnerMap.Cast();
+ if (innerMap.Ref().StartsExecution() ||
+ innerMap.Ref().HasResult() ||
+ outerReduce.DataSink().Cluster().Value() != innerMap.DataSink().Cluster().Value() ||
+ innerMap.Output().Size() > 1 ||
+ innerMap.Input().Size() > 1 ||
+ innerMap.Input().Item(0).Paths().Size() > 1 ||
+ !NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Ordered) ||
+ isFlow != NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Flow) ||
+ NYql::HasSettingsExcept(innerMap.Settings().Ref(), acceptedMapSettings))
+ {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ const TParentsMap* parents = getParents();
+ if (IsOutputUsedMultipleTimes(path.Table().Cast<TYtOutput>().Ref(), *parents)) {
+ // Inner reduce output is used more than once
+ newPaths.push_back(path);
+ continue;
+ }
+
+ // Check world dependencies
+ auto parentsIt = parents->find(innerMap.Raw());
+ YQL_ENSURE(parentsIt != parents->cend());
+ if (!AllOf(parentsIt->second, [](const TExprNode* dep) { return TYtOutput::Match(dep); })) {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ const TCoLambda mapLambda = innerMap.Mapper();
+ auto maybeFlatMap = GetFlatMapOverInputStream(mapLambda, *parents);
+ TMaybe<THashSet<TStringBuf>> passthrough;
+ if (!maybeFlatMap.Maybe<TCoOrderedFlatMap>() ||
+ !IsJustOrSingleAsList(maybeFlatMap.Cast().Lambda().Body().Ref()) ||
+ !IsPassthroughFlatMap(maybeFlatMap.Cast(), &passthrough) ||
+ !passthrough ||
+ !AllOf(sortOrKeyColumns, [&](const TString& col) { return passthrough->contains(col); }))
+ {
+ newPaths.push_back(path);
+ continue;
+ }
+
+ auto fuseRes = CanFuseLambdas(mapLambda, outerReduce.Reducer(), ctx);
+ if (!fuseRes) {
+ // Some error
+ return {};
+ }
+ if (!*fuseRes) {
+ // Cannot fuse
+ newPaths.push_back(path);
+ continue;
+ }
+
+ auto [placeHolder, lambdaWithPlaceholder] = ReplaceDependsOn(outerReduce.Reducer().Ptr(), ctx, State_->Types);
+ if (!placeHolder) {
+ return {};
+ }
+
+ TYtPath newPath = innerMap.Input().Item(0).Paths().Item(0);
+ YQL_ENSURE(newInput.size() == i);
+ if (!newPaths.empty()) {
+ newInput.push_back(
+ Build<TYtSection>(ctx, section.Pos())
+ .InitFrom(section)
+ .Paths()
+ .Add(newPaths)
+ .Build()
+ .Done());
+ newPaths.clear();
+ }
+ size_t inputIndex = newInput.size();
+ newInput.push_back(
+ Build<TYtSection>(ctx, section.Pos())
+ .InitFrom(section)
+ .Paths()
+ .Add(newPath)
+ .Build()
+ .Done());
+ fusedMap = {
+ .Path = newPath,
+ .MapLambda = mapLambda,
+ .ReduceLambda = TCoLambda(lambdaWithPlaceholder),
+ .ReducePlaceholder = TExprBase(placeHolder),
+ .InputIndex = inputIndex,
+ .OrigInputIndex = i,
+ .OrigMap = innerMap,
+ };
+ }
+ if (!newPaths.empty()) {
+ newInput.push_back(
+ Build<TYtSection>(ctx, section.Pos())
+ .InitFrom(section)
+ .Paths()
+ .Add(newPaths)
+ .Build()
+ .Done());
+ }
+ }
+
+ if (!fusedMap) {
+ return node;
+ }
+
+ YQL_ENSURE(newInput.size() >= origReduceInputs);
+ YQL_ENSURE(newInput.size() - origReduceInputs <= 1);
+
+ TExprNode::TPtr remapLambda = ctx.Builder(fusedMap->MapLambda.Pos())
+ .Lambda()
+ .Param("item")
+ .Apply(fusedMap->MapLambda.Ptr())
+ .With(0)
+ .Callable("AsList")
+ .Arg(0, "item")
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal()
+ .Build();
+ if (hasKeySwitch) {
+ remapLambda = ctx.Builder(fusedMap->MapLambda.Pos())
+ .Lambda()
+ .Param("item")
+ .Callable(0, "OrderedMap")
+ .Apply(0, remapLambda)
+ .With(0)
+ .Callable("RemoveMember")
+ .Arg(0, "item")
+ .Atom(1, "_yql_sys_tablekeyswitch")
+ .Seal()
+ .Done()
+ .Seal()
+ .Lambda(1)
+ .Param("remappedItem")
+ .Callable(0, "AddMember")
+ .Arg(0, "remappedItem")
+ .Atom(1, "_yql_sys_tablekeyswitch")
+ .Callable(2, "Member")
+ .Arg(0, "item")
+ .Atom(1, "_yql_sys_tablekeyswitch")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ TExprNode::TPtr flatMapLambda;
+ if (newInput.size() == 1) {
+ flatMapLambda = remapLambda;
+ } else {
+ flatMapLambda = ctx.Builder(outerReduce.Pos())
+ .Lambda()
+ .Param("item")
+ .Callable("Visit")
+ .Arg(0, "item")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ const bool inserted = newInput.size() > origReduceInputs;
+ for (size_t i = 0; i < newInput.size(); ++i) {
+ TString paramName = TStringBuilder() << "alt" << i;
+ TString remappedName = TStringBuilder() << "remapped" << i;
+ if (i != fusedMap->InputIndex) {
+ parent
+ .Atom(2 * i + 1, i)
+ .Lambda(2 * i + 2)
+ .Param(paramName)
+ .Callable("AsList")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (origVariantType) {
+ parent
+ .Callable(0, "Variant")
+ .Arg(0, paramName)
+ .Atom(1, (i > fusedMap->InputIndex && inserted) ? i - 1 : i)
+ .Add(2, origVariantType)
+ .Seal();
+ } else {
+ parent
+ .Arg(0, paramName);
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal();
+ } else {
+ parent
+ .Atom(2 * i + 1, i)
+ .Lambda(2 * i + 2)
+ .Param(paramName)
+ .Callable("OrderedMap")
+ .Apply(0, remapLambda)
+ .With(0, paramName)
+ .Seal()
+ .Lambda(1)
+ .Param(remappedName)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (origVariantType) {
+ parent
+ .Callable("Variant")
+ .Arg(0, remappedName)
+ .Atom(1, fusedMap->OrigInputIndex)
+ .Add(2, origVariantType)
+ .Seal();
+ } else {
+ parent
+ .Arg(remappedName);
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal();
+ }
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
+ TExprNode::TPtr newReduceLambda = ctx.Builder(outerReduce.Pos())
+ .Lambda()
+ .Param("inputStream")
+ .Apply(0, fusedMap->ReduceLambda.Ptr())
+ .With(0)
+ .Callable("OrderedFlatMap")
+ .Arg(0, "inputStream")
+ .Add(1, flatMapLambda)
+ .Seal()
+ .Done()
+ .WithNode(fusedMap->ReducePlaceholder.Ref(), "inputStream")
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto newSettings = outerReduce.Settings().Ptr();
+ if (!NYql::HasSetting(outerReduce.Settings().Ref(), EYtSettingType::NoDq) &&
+ NYql::HasSetting(fusedMap->OrigMap.Settings().Ref(), EYtSettingType::NoDq))
+ {
+ newSettings = NYql::AddSetting(*newSettings, EYtSettingType::NoDq, {}, ctx);
+ }
+
+ return Build<TYtReduce>(ctx, node.Pos())
+ .InitFrom(outerReduce)
+ .World<TCoSync>()
+ .Add(fusedMap->OrigMap.World())
+ .Add(outerReduce.World())
+ .Build()
+ .Input()
+ .Add(newInput)
+ .Build()
+ .Reducer(newReduceLambda)
+ .Settings(newSettings)
+ .Done();
+
+ return node;
+}
+
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::FuseInnerMap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
auto outerMap = node.Cast<TYtMap>();
if (outerMap.Input().Size() != 1 || outerMap.Input().Item(0).Paths().Size() != 1) {