diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2023-05-03 11:44:46 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2023-05-03 11:44:46 +0300 |
commit | 1033b838cd60505f8e2f24d182915d2330f358cb (patch) | |
tree | 90201410cf7ca9a9f8d98a341b7f586dbbfaff3e | |
parent | 2366526b03d6e6f274a112c5276300f09d101ea8 (diff) | |
download | ydb-1033b838cd60505f8e2f24d182915d2330f358cb.tar.gz |
YQL-15867 Fix CombineByKey premap optimizers.
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_flow1.cpp | 134 |
1 files changed, 57 insertions, 77 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow1.cpp b/ydb/library/yql/core/common_opt/yql_co_flow1.cpp index 5cf964f8cc..4e27e05694 100644 --- a/ydb/library/yql/core/common_opt/yql_co_flow1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_flow1.cpp @@ -408,44 +408,6 @@ TExprNode::TPtr FuseFlatmaps(TCoFlatMapBase outerMap, TExprContext& ctx, TTypeAn return outerMap.Ptr(); } -TExprNode::TPtr FuseCombineByKeyOverFlatmap(TCoCombineByKey combine, TExprContext& ctx) { - auto flatmap = combine.Input().Cast<TCoFlatMapBase>(); - - auto inputType = flatmap.Input().Ref().GetTypeAnn(); - if (inputType->GetKind() == ETypeAnnotationKind::Optional) { - // Optional not supported as CombineByKey input. - return combine.Ptr(); - } - - auto flatmapBody = flatmap.Lambda().Body(); - if (!IsJustOrSingleAsList(flatmapBody.Ref())) { - // Only push maps to PreMapLambda. - return combine.Ptr(); - } - - auto value = flatmapBody.Maybe<TCoJust>() - ? flatmapBody.Cast<TCoJust>().Input() - : flatmapBody.Cast<TCoAsList>().Arg(0); - - auto ret = Build<TCoCombineByKey>(ctx, combine.Pos()) - .Input(flatmap.Input()) - .PreMapLambda() - .Args({"item"}) - .Body<TExprApplier>() - .Apply(combine.PreMapLambda().Body()) - .With(combine.PreMapLambda().Args().Arg(0), value) - .With(flatmap.Lambda().Args().Arg(0), "item") - .Build() - .Build() - .KeySelectorLambda(combine.KeySelectorLambda()) - .InitHandlerLambda(combine.InitHandlerLambda()) - .UpdateHandlerLambda(combine.UpdateHandlerLambda()) - .FinishHandlerLambda(combine.FinishHandlerLambda()) - .Done(); - - return ret.Ptr(); -} - template <bool TakeOrSkip> TExprNode::TPtr FusePart(const TExprNode& node, TExprContext& ctx) { auto children = node.Head().ChildrenList(); @@ -465,7 +427,7 @@ TExprNode::TPtr SumLengthOverExtend(const TExprNode& node, TExprContext& ctx) { .Add(std::move(children)) .Seal() .Callable(1, "Uint64") - .Atom(0, "0", TNodeFlags::Default) + .Atom(0, 0U) .Seal() .Lambda(2) .Param("item") @@ -580,7 +542,7 @@ TExprNode::TPtr ExtractOneItemTupleFromFold(const TExprNode& node, TExprContext& .Add(0, node.HeadPtr()) .Callable(1, "Nth") .Add(0, node.ChildPtr(1)) - .Atom(1, "0", TNodeFlags::Default) + .Atom(1, 0U) .Seal() .Lambda(2) .Param("item") @@ -594,7 +556,7 @@ TExprNode::TPtr ExtractOneItemTupleFromFold(const TExprNode& node, TExprContext& .Seal() .Done() .Seal() - .Atom(1, "0", TNodeFlags::Default) + .Atom(1, 0U) .Seal() .Seal() .Seal() @@ -663,7 +625,7 @@ TExprNode::TPtr ExtractOneItemTupleFromFold1(const TExprNode& node, TExprContext .Apply(0, *node.Child(1)) .With(0, "item") .Seal() - .Atom(1, "0", TNodeFlags::Default) + .Atom(1, 0U) .Seal() .Seal() .Lambda(2) @@ -678,7 +640,7 @@ TExprNode::TPtr ExtractOneItemTupleFromFold1(const TExprNode& node, TExprContext .Seal() .Done() .Seal() - .Atom(1, "0", TNodeFlags::Default) + .Atom(1, 0U) .Seal() .Seal() .Seal() @@ -759,7 +721,7 @@ TExprNode::TPtr ExtractOneItemTupleFromCondense(const TExprNode& node, TExprCont .Add(0, node.HeadPtr()) .Callable(1, "Nth") .Add(0, node.ChildPtr(1)) - .Atom(1, "0", TNodeFlags::Default) + .Atom(1, 0U) .Seal() .Lambda(2) .Param("item") @@ -785,7 +747,7 @@ TExprNode::TPtr ExtractOneItemTupleFromCondense(const TExprNode& node, TExprCont .Seal() .Done() .Seal() - .Atom(1, "0", TNodeFlags::Default) + .Atom(1, 0U) .Seal() .Seal() .Seal() @@ -875,7 +837,7 @@ TExprNode::TPtr ExtractOneItemTupleFromCondense1(const TExprNode& node, TExprCon .Apply(0, *node.Child(1)) .With(0, "item") .Seal() - .Atom(1, "0", TNodeFlags::Default) + .Atom(1, 0U) .Seal() .Seal() .Lambda(2) @@ -902,7 +864,7 @@ TExprNode::TPtr ExtractOneItemTupleFromCondense1(const TExprNode& node, TExprCon .Seal() .Done() .Seal() - .Atom(1, "0", TNodeFlags::Default) + .Atom(1, 0U) .Seal() .Seal() .Seal() @@ -1089,12 +1051,12 @@ TExprNode::TPtr ConvertFold1ByConstMinMax(const TExprNode::TPtr& node, TExprCont .Add(0, node->HeadPtr()) .Seal() .Callable(1, "If") - .Callable(0, "==") + .Callable(0, "AggrEquals") .Callable(0, "Length") .Add(0, node->HeadPtr()) .Seal() .Callable(1, "Uint64") - .Atom(0, "1", TNodeFlags::Default) + .Atom(0, 1U) .Seal() .Seal() .Add(1, initLambda.TailPtr()) @@ -1238,12 +1200,21 @@ TExprNode::TPtr PropagateConstPremapIntoCombineByKey(const TExprNode& node, TExp const auto constItem = node.Child(1)->Tail().HeadPtr(); auto children = node.ChildrenList(); - // keyExtractorLambda - children[2] = ctx.Builder(children[2]->Pos()) + + // update lambda + children[4] = ctx.Builder(children[4]->Pos()) .Lambda() + .Param("key") .Param("item") - .Apply(*children[2]) - .With(0, constItem) + .Param("state") + .Apply(*children[4]) + .With(0) + .Apply(*children[2]) + .With(0, constItem) + .Seal() + .Done() + .With(1, constItem) + .With(2, "state") .Seal() .Seal() .Build(); @@ -1251,21 +1222,25 @@ TExprNode::TPtr PropagateConstPremapIntoCombineByKey(const TExprNode& node, TExp // init lambda children[3] = ctx.Builder(children[3]->Pos()) .Lambda() + .Param("key") .Param("item") .Apply(*children[3]) - .With(0, constItem) + .With(0) + .Apply(*children[2]) + .With(0, constItem) + .Seal() + .Done() + .With(1, constItem) .Seal() .Seal() .Build(); - // update lambda - children[4] = ctx.Builder(children[4]->Pos()) + // keyExtractorLambda + children[2] = ctx.Builder(children[2]->Pos()) .Lambda() .Param("item") - .Param("state") - .Apply(*children[4]) - .With(0, constItem) - .With(1, "state") + .Apply(*children[2]) + .With(0, std::move(constItem)) .Seal() .Seal() .Build(); @@ -1770,18 +1745,30 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) { return node; }; - map["CombineByKey"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { + map["FinalizeByKey"] = map["CombineByKey"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { if (!optCtx.IsSingleUsage(node->Head())) { return node; } - TCoCombineByKey self(node); - if (self.Input().Maybe<TCoFlatMapBase>()) { - YQL_CLOG(DEBUG, Core) << "FuseCombineByKeyOverFlatmap"; - return FuseCombineByKeyOverFlatmap(self, ctx); + const TCoLambda preMap(node->ChildPtr(1)); + if (const TMaybeNode<TCoFlatMapBase> maybeFlatmap = node->HeadPtr()) { + if (const auto flatMap = maybeFlatmap.Cast(); IsJustOrSingleAsList(flatMap.Lambda().Body().Ref()) && flatMap.Input().Ref().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional) { + YQL_CLOG(DEBUG, Core) << "Fuse " << node->Content() << " over " << node->Head().Content(); + auto children = node->ChildrenList(); + children.front() = flatMap.Input().Ptr(); + children[1] = Build<TCoLambda>(ctx, preMap.Pos()) + .Args({"item"}) + .Body<TExprApplier>() + .Apply(preMap.Body()) + .With(preMap.Args().Arg(0), TExprBase(flatMap.Lambda().Body().Ref().HeadPtr())) + .With(flatMap.Lambda().Args().Arg(0), "item") + .Build() + .Done().Ptr(); + + return ctx.ChangeChildren(*node,std::move(children)); + } } - const auto preMap = self.PreMapLambda(); if (IsConstMapLambda(preMap) && ( IsDepended(node->Child(2)->Tail(), *node->Child(2)->Head().Child(0)) || @@ -1793,17 +1780,10 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) { } if (preMap.Body().Ref().IsCallable("ToList")) { - YQL_CLOG(DEBUG, Core) << node->Content() << " premap ToList elimination"; - TExprNode::TPtr newPreMapLambda = ctx.DeepCopyLambda(preMap.Ref()); - newPreMapLambda = ctx.ChangeChild(*newPreMapLambda, 1, newPreMapLambda->Child(1)->Child(0)); - return Build<TCoCombineByKey>(ctx, node->Pos()) - .Input(self.Input()) - .PreMapLambda(TCoLambda(newPreMapLambda)) - .KeySelectorLambda(self.KeySelectorLambda()) - .InitHandlerLambda(self.InitHandlerLambda()) - .UpdateHandlerLambda(self.UpdateHandlerLambda()) - .FinishHandlerLambda(self.FinishHandlerLambda()) - .Done().Ptr(); + YQL_CLOG(DEBUG, Core) << node->Content() << " premap ToList elimination."; + auto newPreMapLambda = ctx.DeepCopyLambda(preMap.Ref()); + newPreMapLambda = ctx.ChangeChild(*newPreMapLambda, 1, newPreMapLambda->Tail().HeadPtr()); + return ctx.ChangeChild(*node, 1U, std::move(newPreMapLambda)); } return node; |