diff options
author | ziganshinmr <[email protected]> | 2025-08-26 18:38:20 +0300 |
---|---|---|
committer | ziganshinmr <[email protected]> | 2025-08-26 19:17:02 +0300 |
commit | 89c21070a7c6ac2ecfd9db66a40671bf4718a266 (patch) | |
tree | 796ab03dedf44621cabc7884ef549377d1e411be | |
parent | 086240d0ad585634f1a6bb8df82f0e5a1b25ea4d (diff) |
Forbid fuse maps with DependsOn loss in peephole - second attempt
commit_hash:d37fa5a6e62a39900f222fff86bae847202554ce
7 files changed, 231 insertions, 48 deletions
diff --git a/yql/essentials/ast/yql_expr.cpp b/yql/essentials/ast/yql_expr.cpp index 4c596895077..25a888be53f 100644 --- a/yql/essentials/ast/yql_expr.cpp +++ b/yql/essentials/ast/yql_expr.cpp @@ -2300,22 +2300,6 @@ const TTypeAnnotationNode* CompileTypeAnnotation(const TAstNode& node, TExprCont return compileCtx.CompileTypeAnnotationNode(node); } -template<class Set> -bool IsDependedImpl(const TExprNode& node, const Set& dependences, TNodeSet& visited) { - if (!visited.emplace(&node).second) - return false; - - if (dependences.cend() != dependences.find(&node)) - return true; - - for (const auto& child : node.Children()) { - if (IsDependedImpl(*child, dependences, visited)) - return true; - } - - return false; -} - namespace { enum EChangeState : ui8 { @@ -2651,11 +2635,6 @@ TExprNode::TListType TExprContext::ReplaceNodes(TExprNode::TListType&& starts, c template TExprNode::TListType TExprContext::ReplaceNodes<true>(TExprNode::TListType&& starts, const TNodeOnNodeOwnedMap& replaces); template TExprNode::TListType TExprContext::ReplaceNodes<false>(TExprNode::TListType&& starts, const TNodeOnNodeOwnedMap& replaces); -bool IsDepended(const TExprNode& node, const TNodeSet& dependences) { - TNodeSet visited; - return !dependences.empty() && IsDependedImpl(node, dependences, visited); -} - void CheckArguments(const TExprNode& root) { try { TNodeMap<TNodeSetPtr> unresolvedArgsMap; @@ -2866,7 +2845,7 @@ TExprNode::TPtr TExprContext::FuseLambdas(const TExprNode& outer, const TExprNod }); newBody = ReplaceNodes(std::move(outerBody), outerReplaces); } else if (1U == outerArgs.ChildrenSize()) { - newBody.reserve(newBody.size() * body.size()); + newBody.reserve(outerBody.size() * body.size()); for (auto item : body) { for (auto root : outerBody) { newBody.emplace_back(ReplaceNode(TExprNode::TPtr(root), outerArgs.Head(), TExprNode::TPtr(item))); diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index b38682e1993..5c51f426625 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -2496,7 +2496,7 @@ TExprNode::TPtr TryExpandFlatMapOverTableSource(const TExprNode::TPtr& node, TEx } template <bool Ordered> -TExprNode::TPtr ExpandFlatMap(const TExprNode::TPtr& node, TExprContext& ctx) { +TExprNode::TPtr ExpandFlatMap(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { if (auto res = TryExpandFlatMapOverTableSource(node, ctx)) { return res; } @@ -2592,7 +2592,7 @@ TExprNode::TPtr ExpandFlatMap(const TExprNode::TPtr& node, TExprContext& ctx) { .Build(); } - if (node->Head().IsCallable("NarrowMap")) { + if (node->Head().IsCallable("NarrowMap") && CanFuseLambdas(*node->Child(1), *node->Head().Child(1), types)) { return FuseNarrowMap<true>(*node, ctx); } @@ -2605,10 +2605,16 @@ TExprNode::TPtr ExpandFlatMap(const TExprNode::TPtr& node, TExprContext& ctx) { } template<bool Ordered> -TExprNode::TPtr OptimizeMultiMap(const TExprNode::TPtr& node, TExprContext& ctx) { +TExprNode::TPtr OptimizeMultiMap(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { if (const auto& input = node->Head(); input.IsCallable("NarrowMap")) { + if (!CanFuseLambdas(*node->Child(1), *input.Child(1), types)) { + return node; + } return FuseNarrowMap<false>(*node, ctx); } else if (input.IsCallable({"MultiMap", "OrderedMultiMap", "NarrowMultiMap"})) { + if (!CanFuseLambdas(*node->Child(1), *input.Child(1), types)) { + return node; + } YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << input.Content(); return ctx.NewCallable(node->Pos(), Ordered || input.IsCallable("NarrowMultiMap") ? input.Content() : node->Content(), {input.HeadPtr(), ctx.FuseLambdas(node->Tail(), input.Tail())}); } @@ -3407,7 +3413,7 @@ TExprNode::TPtr ExpandMinMax(const TExprNode::TPtr& node, TExprContext& ctx) { } template <bool Ordered> -TExprNode::TPtr OptimizeMap(const TExprNode::TPtr& node, TExprContext& ctx) { +TExprNode::TPtr OptimizeMap(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { const auto& arg = node->Tail().Head().Head(); if (!arg.IsUsedInDependsOn() && ETypeAnnotationKind::Optional == node->GetTypeAnn()->GetKind()) { YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over Optional"; @@ -3441,7 +3447,7 @@ TExprNode::TPtr OptimizeMap(const TExprNode::TPtr& node, TExprContext& ctx) { } if (node->Head().IsCallable("NarrowMap")) { - if (!arg.IsUsedInDependsOn()) { + if (!arg.IsUsedInDependsOn() && CanFuseLambdas(*node->Child(1), *node->Head().Child(1), types)) { YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << node->Head().Content(); const auto width = node->Head().Tail().Head().ChildrenSize(); auto lambda = ctx.Builder(node->Pos()) @@ -3459,8 +3465,8 @@ TExprNode::TPtr OptimizeMap(const TExprNode::TPtr& node, TExprContext& ctx) { } } - if (1U == node->Head().UseCount() && !arg.IsUsedInDependsOn()) { - if (node->Head().IsCallable({"Map", "OrderedMap"})) { + if (node->Head().IsCallable({"Map", "OrderedMap"})) { + if (1U == node->Head().UseCount() && !arg.IsUsedInDependsOn() && CanFuseLambdas(*node->Child(1), *node->Head().Child(1), types)) { YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " over " << node->Head().Content(); auto lambda = ctx.Builder(node->Pos()) .Lambda() @@ -3955,8 +3961,11 @@ bool IsSimpleExpand(const TExprNode& out, const TExprNode& arg) { return &out == &arg; } -TExprNode::TPtr OptimizeExpandMap(const TExprNode::TPtr& node, TExprContext& ctx) { +TExprNode::TPtr OptimizeExpandMap(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { if (const auto& input = node->Head(); input.IsCallable({"Map", "OrderedMap"})) { + if (!CanFuseLambdas(*node->Child(1), *input.Child(1), types)) { + return node; + } YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << input.Content(); auto lambda = ctx.Builder(node->Pos()) .Lambda() @@ -4030,8 +4039,10 @@ TExprNode::TPtr OptimizeExpandMap(const TExprNode::TPtr& node, TExprContext& ctx } if (const auto& input = node->Head(); input.IsCallable("NarrowMap")) { + if (!CanFuseLambdas(*node->Child(1), *input.Child(1), types)) { + return node; + } YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << input.Content(); - if (input.Tail().Tail().IsCallable("AsStruct")) { auto apply = ApplyNarrowMap(input.Tail(), ctx); TLieralStructsCacheMap membersMap; // TODO: move to context. @@ -7092,14 +7103,20 @@ TExprNode::TPtr SwapReplicateScalarsWithWideMap(const TExprNode::TPtr& wideMap, .Build(); } -TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx) { +TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { if (const auto& input = node->Head(); input.IsCallable("ExpandMap")) { + if (!CanFuseLambdas(*node->Child(1), *input.Child(1), types)) { + return node; + } YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << input.Content(); auto lambda = ctx.FuseLambdas(node->Tail(), input.Tail()); return ctx.NewCallable(node->Pos(), node->Content().starts_with("Narrow") ? TString("Ordered") += node->Content().substr(6U, 16U) : input.Content(), {input.HeadPtr(), std::move(lambda)}); } else if (input.IsCallable("WideMap") && !node->IsCallable("NarrowFlatMap")) { + if (!CanFuseLambdas(*node->Child(1), *input.Child(1), types)) { + return node; + } YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << input.Content(); auto lambda = ctx.FuseLambdas(node->Tail(), input.Tail()); return ctx.ChangeChildren(*node, {input.HeadPtr(), std::move(lambda)}); @@ -7308,7 +7325,7 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx) return node; } -TExprNode::TPtr OptimizeNarrowFlatMap(const TExprNode::TPtr& node, TExprContext& ctx) { +TExprNode::TPtr OptimizeNarrowFlatMap(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { const auto& lambda = node->Tail(); const auto& body = lambda.Tail(); @@ -7374,11 +7391,18 @@ TExprNode::TPtr OptimizeNarrowFlatMap(const TExprNode::TPtr& node, TExprContext& } } - return OptimizeWideMaps(node, ctx); + return OptimizeWideMaps(node, ctx, types); } -TExprNode::TPtr OptimizeSqueezeToDict(const TExprNode::TPtr& node, TExprContext& ctx) { +TExprNode::TPtr OptimizeSqueezeToDict(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { if (const auto& input = node->Head(); input.IsCallable("NarrowMap")) { + if (!CanFuseLambdas(*node->Child(1), input.Tail(), types)) { + return node; + } + if (!CanFuseLambdas(*node->Child(2), input.Tail(), types)) { + return node; + } + YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << input.Content(); return ctx.NewCallable(node->Pos(), "NarrowSqueezeToDict", { input.HeadPtr(), @@ -9265,10 +9289,6 @@ struct TPeepHoleRules { }; const TPeepHoleOptimizerMap SimplifyStageRules = { - {"Map", &OptimizeMap<false>}, - {"OrderedMap", &OptimizeMap<true>}, - {"FlatMap", &ExpandFlatMap<false>}, - {"OrderedFlatMap", &ExpandFlatMap<true>}, {"ListIf", &ExpandContainerIf<false, true>}, {"OptionalIf", &ExpandContainerIf<false, false>}, {"FlatListIf", &ExpandContainerIf<true, true>}, @@ -9276,6 +9296,13 @@ struct TPeepHoleRules { {"IfPresent", &OptimizeIfPresent<false>} }; + const TExtPeepHoleOptimizerMap SimplifyStageExtRules = { + {"Map", &OptimizeMap<false>}, + {"OrderedMap", &OptimizeMap<true>}, + {"FlatMap", &ExpandFlatMap<false>}, + {"OrderedFlatMap", &ExpandFlatMap<true>}, + }; + const TPeepHoleOptimizerMap FinalStageRules = { {"Extend", &OptimizeExtend}, {"OrderedExtend", &OptimizeExtend}, @@ -9305,9 +9332,6 @@ struct TPeepHoleRules { {"LMap", &ExpandLMapOrShuffleByKeys}, {"OrderedLMap", &ExpandLMapOrShuffleByKeys}, {"ShuffleByKeys", &ExpandLMapOrShuffleByKeys}, - {"ExpandMap", &OptimizeExpandMap}, - {"MultiMap", &OptimizeMultiMap<false>}, - {"OrderedMultiMap", &OptimizeMultiMap<true>}, {"Nth", &OptimizeNth}, {"Member", &OptimizeMember}, {"Condense1", &OptimizeCondense1}, @@ -9321,11 +9345,6 @@ struct TPeepHoleRules { {"GraceSelfJoinCore", &OptimizeGraceSelfJoinCore}, {"CommonJoinCore", &OptimizeCommonJoinCore}, {"BuildTablePath", &DoBuildTablePath}, - {"SqueezeToDict", &OptimizeSqueezeToDict}, - {"NarrowFlatMap", &OptimizeNarrowFlatMap}, - {"NarrowMultiMap", &OptimizeWideMaps}, - {"WideMap", &OptimizeWideMaps}, - {"NarrowMap", &OptimizeWideMaps}, {"Unordered", &DropAssume}, {"AssumeUnique", &DropAssume}, {"AssumeDistinct", &DropAssume}, @@ -9343,6 +9362,14 @@ struct TPeepHoleRules { const TExtPeepHoleOptimizerMap FinalStageExtRules = { {"Exists", &OptimizeExists}, + {"ExpandMap", &OptimizeExpandMap}, + {"NarrowFlatMap", &OptimizeNarrowFlatMap}, + {"NarrowMultiMap", &OptimizeWideMaps}, + {"WideMap", &OptimizeWideMaps}, + {"NarrowMap", &OptimizeWideMaps}, + {"MultiMap", &OptimizeMultiMap<false>}, + {"OrderedMultiMap", &OptimizeMultiMap<true>}, + {"SqueezeToDict", &OptimizeSqueezeToDict}, }; const TExtPeepHoleOptimizerMap FinalStageNonDetRules = { @@ -9450,7 +9477,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo [&types, hasNonDeterministicFunctions, withFinalRules = peepholeSettings.WithFinalStageRules, withNonDeterministicRules = peepholeSettings.WithNonDeterministicRules](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { auto stageRules = TPeepHoleRules::Instance().SimplifyStageRules; - auto extStageRules = TExtPeepHoleOptimizerMap{}; + auto extStageRules = TPeepHoleRules::Instance().SimplifyStageExtRules; if (withFinalRules) { const auto& finalRules = TPeepHoleRules::Instance().FinalStageRules; stageRules.insert(finalRules.begin(), finalRules.end()); diff --git a/yql/essentials/core/yql_opt_utils.cpp b/yql/essentials/core/yql_opt_utils.cpp index 040b329b08f..ac14fa72727 100644 --- a/yql/essentials/core/yql_opt_utils.cpp +++ b/yql/essentials/core/yql_opt_utils.cpp @@ -92,6 +92,8 @@ const TExprNode& GetLiteralStructMember(const TExprNode& literal, const TExprNod ythrow yexception() << "Member '" << member.Content() << "' not found in literal struct."; } +const char ForbidConstantDependsOnFuseOptName[] = "ForbidConstantDependsOnFuse"; + } TExprNode::TPtr MakeBoolNothing(TPositionHandle position, TExprContext& ctx) { @@ -575,11 +577,45 @@ bool IsDependedImpl(const TExprNode* from, const TExprNode* to, TNodeMap<bool>& return false; } +bool IsDependedOnAnyImpl(const TExprNode* from, const TNodeSet& to, TNodeMap<bool>& deps) { + if (to.cend() != to.find(from)) { + return true; + } + + auto [it, inserted] = deps.emplace(from, false); + if (!inserted) { + return it->second; + } + + for (const auto& child : from->Children()) { + if (IsDependedOnAnyImpl(child.Get(), to, deps)) { + return it->second = true; + } + } + + return false; +} + bool IsDepended(const TExprNode& from, const TExprNode& to) { TNodeMap<bool> deps; return IsDependedImpl(&from, &to, deps); } +bool AreAllDependedOnAny(const TExprNode::TChildrenType& from, const TNodeSet& to) { + if (to.empty()) { + return false; + } + + TNodeMap<bool> deps; + for (const auto& node : from) { + if (!IsDependedOnAnyImpl(node.Get(), to, deps)) { + return false; + } + } + + return true; +} + bool MarkDepended(const TExprNode& from, const TExprNode& to, TNodeMap<bool>& deps) { return IsDependedImpl(&from, &to, deps); } @@ -2750,4 +2786,46 @@ bool IsNormalizedDependsOn(const TExprNode& node) { return false; } +bool CanFuseLambdas(const TExprNode& outer, const TExprNode& inner, const TTypeAnnotationContext& types) { + if (!IsOptimizerEnabled<ForbidConstantDependsOnFuseOptName>(types) || IsOptimizerDisabled<ForbidConstantDependsOnFuseOptName>(types)) { + return true; + } + + if (outer.ChildrenSize() == 1) { + return true; + } + + auto innerLambdaBody = GetLambdaBody(inner); + auto outerLambdaArgs = outer.Head().Children(); + + TNodeSet innerLambdaArgs; + inner.Head().ForEachChild([&](const TExprNode& arg) { + innerLambdaArgs.insert(&arg); + }); + + if (outerLambdaArgs.size() == innerLambdaBody.size()) { + // inner lambda bodies which used in DependsOn after fuse + TExprNode::TListType toCheck; + for (size_t i = 0; i < outerLambdaArgs.size(); i++) { + if (outerLambdaArgs[i]->IsUsedInDependsOn()) { + toCheck.push_back(innerLambdaBody[i]); + } + } + if (toCheck.empty()) { + return true; + } + + return AreAllDependedOnAny(toCheck, innerLambdaArgs); + } else if (outerLambdaArgs.size() == 1) { + if (!outerLambdaArgs.front()->IsUsedInDependsOn()) { + return true; + } + + // multimap - all inner lambda bodies are used in DependsOn after fuse + return AreAllDependedOnAny(innerLambdaBody, innerLambdaArgs); + } else { + YQL_ENSURE(false, "Incompatible lambdas for fuse"); + } +} + } diff --git a/yql/essentials/core/yql_opt_utils.h b/yql/essentials/core/yql_opt_utils.h index ed7e07bde12..63a194b9e18 100644 --- a/yql/essentials/core/yql_opt_utils.h +++ b/yql/essentials/core/yql_opt_utils.h @@ -49,6 +49,7 @@ TExprNode::TPtr FilterByFields(TPositionHandle position, const TExprNode::TPtr& TExprNode::TPtr AddMembersUsedInside(const TExprNode::TPtr& start, const TExprNode& arg, TExprNode::TPtr&& members, const TParentsMap& parentsMap, TExprContext& ctx); bool IsDepended(const TExprNode& from, const TExprNode& to); +bool AreAllDependedOnAny(const TExprNode::TChildrenType& from, const TNodeSet& to); bool MarkDepended(const TExprNode& from, const TExprNode& to, TNodeMap<bool>& deps); bool IsEmpty(const TExprNode& node, const TTypeAnnotationContext& typeCtx); bool IsEmptyContainer(const TExprNode& node); @@ -222,4 +223,6 @@ TExprNode::TPtr ReplaceUnessentials(TExprNode::TPtr predicate, TExprNode::TPtr r bool IsDependsOnUsage(const TExprNode& node, const TParentsMap& parentsMap); bool IsNormalizedDependsOn(const TExprNode& node); +bool CanFuseLambdas(const TExprNode& outer, const TExprNode& inner, const TTypeAnnotationContext& types); + } diff --git a/yql/essentials/minikql/comp_nodes/mkql_multimap.cpp b/yql/essentials/minikql/comp_nodes/mkql_multimap.cpp index 7050f33f4a5..336ded92707 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_multimap.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_multimap.cpp @@ -94,6 +94,7 @@ private: void RegisterDependencies() const final { if (const auto flow = FlowDependsOn(Flow)) { Own(flow, Item); + std::for_each(NewItems.cbegin(), NewItems.cend(), std::bind(&TFlowMultiMapWrapper::DependsOn, flow, std::placeholders::_1)); } } diff --git a/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json b/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json index f9029941a62..843f5a3f44f 100644 --- a/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json +++ b/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json @@ -475,6 +475,20 @@ "uri": "https://{canondata_backend}/1942415/9fad3ce3e91e30529937a7e90c1c9c5e144ec28e/resource.tar.gz#test.test_Optimizers-FoldConstMax-default.txt-Results_/results.txt" } ], + "test.test[Optimizers-FuseLambdasWithDependsOn-default.txt-Debug]": [ + { + "checksum": "357650f111c3df45d8a092269f8b0954", + "size": 2441, + "uri": "https://{canondata_backend}/1775319/d62ebd42aa2b3a234ff874daa1b60e8c20266ea5/resource.tar.gz#test.test_Optimizers-FuseLambdasWithDependsOn-default.txt-Debug_/opt.yql" + } + ], + "test.test[Optimizers-FuseLambdasWithDependsOn-default.txt-Results]": [ + { + "checksum": "b9da6970a0a71b11c36081f5a1247330", + "size": 10013, + "uri": "https://{canondata_backend}/1775319/d62ebd42aa2b3a234ff874daa1b60e8c20266ea5/resource.tar.gz#test.test_Optimizers-FuseLambdasWithDependsOn-default.txt-Results_/results.txt" + } + ], "test.test[Optimizers-GroupBySingleKeyListUnused-default.txt-Debug]": [ { "checksum": "381934f15093c807298b51ab2083fe99", diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLambdasWithDependsOn.yqls b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLambdasWithDependsOn.yqls new file mode 100644 index 00000000000..4cf4f742361 --- /dev/null +++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLambdasWithDependsOn.yqls @@ -0,0 +1,81 @@ +( +(let world (Configure! world (DataSource '"config") '"OptimizerFlags" '"ForbidConstantDependsOnFuse")) +(let res_sink (DataSink 'result)) + +(let $row1 (AsStruct '('"key" (Uint64 '"1")) '('"value" (Uint64 '"101")))) +(let $row2 (AsStruct '('"key" (Uint64 '"2")) '('"value" (Uint64 '"102")))) +(let $row3 (AsStruct '('"key" (Uint64 '"3")) '('"value" (Uint64 '"103")))) +(let $row4 (AsStruct '('"key" (Uint64 '"4")) '('"value" (Uint64 '"104")))) +(let $row5 (AsStruct '('"key" (Uint64 '"5")) '('"value" (Uint64 '"105")))) +(let $input (AsList $row1 $row2 $row3 $row4 $row5)) + +# ---------------------- + +(let $wideFlow1 (ExpandMap (ToFlow $input (DependsOn (Uint64 '"1"))) (lambda '($row) (Member $row '"key") (Member $row '"value")))) + +# can be fused +(let $processed1 (ExpandMap (NarrowMap $wideFlow1 (lambda '($key $value) $key)) (lambda '($key) (RandomNumber (DependsOn $key))))) + +(let $result1 (NarrowMap $processed1 (lambda '($rand) (AsStruct '('"rand" $rand))))) +(let world (Write! world res_sink (Key) (Collect $result1) '('('type) '('label '"Result 1")))) +(let world (Commit! world res_sink)) + +# ---------------------- + +(let $wideFlow2 (ExpandMap (ToFlow $input (DependsOn (Uint64 '"2"))) (lambda '($row) (Member $row '"key") (Member $row '"value")))) + +# cannot be fused +(let $processed2 (ExpandMap (NarrowMap $wideFlow2 (lambda '($key $value) (Uint64 '"42"))) (lambda '($key) (RandomNumber (DependsOn $key))))) + +(let $result2 (NarrowMap $processed2 (lambda '($rand) (AsStruct '('"rand" $rand))))) +(let world (Write! world res_sink (Key) (Collect $result2) '('('type) '('label '"Result 2")))) +(let world (Commit! world res_sink)) + +# ---------------------- + +(let $wideFlow3 (ExpandMap (ToFlow $input (DependsOn (Uint64 '"3"))) (lambda '($row) (Member $row '"key") (Member $row '"value")))) + +# can be fused +(let $processed3 (WideMap (WideMap $wideFlow3 (lambda '($key $value) $key $value)) (lambda '($key $value) (RandomNumber (DependsOn $key)) (RandomNumber (DependsOn $value))))) + +(let $result3 (NarrowMap $processed3 (lambda '($rand1 $rand2) (AsStruct '('"rand1" $rand1) '('"rand2" $rand2))))) +(let world (Write! world res_sink (Key) (Collect $result3) '('('type) '('label '"Result 3")))) +(let world (Commit! world res_sink)) + +# ---------------------- + +(let $wideFlow4 (ExpandMap (ToFlow $input (DependsOn (Uint64 '"4"))) (lambda '($row) (Member $row '"key") (Member $row '"value")))) + +# cannot be fused +(let $processed4 (WideMap (WideMap $wideFlow4 (lambda '($key $value) $key (Uint64 '"42"))) (lambda '($key $value) (RandomNumber (DependsOn $key)) (RandomNumber (DependsOn $value))))) + +(let $result4 (NarrowMap $processed4 (lambda '($rand1 $rand2) (AsStruct '('"rand1" $rand1) '('"rand2" $rand2))))) +(let world (Write! world res_sink (Key) (Collect $result4) '('('type) '('label '"Result 4")))) +(let world (Commit! world res_sink)) + +# ---------------------- + +(let $wideFlow5 (ExpandMap (ToFlow $input (DependsOn (Uint64 '"5"))) (lambda '($row) (Member $row '"key") (Member $row '"value")))) + +# can be fused +(let $processed5 (MultiMap (NarrowMultiMap $wideFlow5 (lambda '($key $value) $key $value)) (lambda '($key) (RandomNumber (DependsOn $key)) $key))) + +(let $result5 (Map $processed5 (lambda '($rand) (AsStruct '('"rand" $rand))))) +(let world (Write! world res_sink (Key) (Collect $result5) '('('type) '('label '"Result 5")))) +(let world (Commit! world res_sink)) + +# ---------------------- + +(let $wideFlow6 (ExpandMap (ToFlow $input (DependsOn (Uint64 '"5"))) (lambda '($row) (Member $row '"key") (Member $row '"value")))) + +# cannot be fused +(let $processed6 (MultiMap (NarrowMultiMap $wideFlow6 (lambda '($key $value) (Uint64 '"42") (Uint64 '"43"))) (lambda '($key) (RandomNumber (DependsOn $key) (DependsOn (Uint64 '"1"))) (RandomNumber (DependsOn $key) (DependsOn (Uint64 '"2")))))) + +(let $result6 (Map $processed6 (lambda '($rand) (AsStruct '('"rand" $rand))))) +(let world (Write! world res_sink (Key) (Collect $result6) '('('type) '('label '"Result 6")))) +(let world (Commit! world res_sink)) + +# ---------------------- + +(return world) +) |