aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2023-05-03 11:44:46 +0300
committera-romanov <Anton.Romanov@ydb.tech>2023-05-03 11:44:46 +0300
commit1033b838cd60505f8e2f24d182915d2330f358cb (patch)
tree90201410cf7ca9a9f8d98a341b7f586dbbfaff3e
parent2366526b03d6e6f274a112c5276300f09d101ea8 (diff)
downloadydb-1033b838cd60505f8e2f24d182915d2330f358cb.tar.gz
YQL-15867 Fix CombineByKey premap optimizers.
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_flow1.cpp134
1 files changed, 57 insertions, 77 deletions
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow1.cpp b/ydb/library/yql/core/common_opt/yql_co_flow1.cpp
index 5cf964f8cc..4e27e05694 100644
--- a/ydb/library/yql/core/common_opt/yql_co_flow1.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_flow1.cpp
@@ -408,44 +408,6 @@ TExprNode::TPtr FuseFlatmaps(TCoFlatMapBase outerMap, TExprContext& ctx, TTypeAn
return outerMap.Ptr();
}
-TExprNode::TPtr FuseCombineByKeyOverFlatmap(TCoCombineByKey combine, TExprContext& ctx) {
- auto flatmap = combine.Input().Cast<TCoFlatMapBase>();
-
- auto inputType = flatmap.Input().Ref().GetTypeAnn();
- if (inputType->GetKind() == ETypeAnnotationKind::Optional) {
- // Optional not supported as CombineByKey input.
- return combine.Ptr();
- }
-
- auto flatmapBody = flatmap.Lambda().Body();
- if (!IsJustOrSingleAsList(flatmapBody.Ref())) {
- // Only push maps to PreMapLambda.
- return combine.Ptr();
- }
-
- auto value = flatmapBody.Maybe<TCoJust>()
- ? flatmapBody.Cast<TCoJust>().Input()
- : flatmapBody.Cast<TCoAsList>().Arg(0);
-
- auto ret = Build<TCoCombineByKey>(ctx, combine.Pos())
- .Input(flatmap.Input())
- .PreMapLambda()
- .Args({"item"})
- .Body<TExprApplier>()
- .Apply(combine.PreMapLambda().Body())
- .With(combine.PreMapLambda().Args().Arg(0), value)
- .With(flatmap.Lambda().Args().Arg(0), "item")
- .Build()
- .Build()
- .KeySelectorLambda(combine.KeySelectorLambda())
- .InitHandlerLambda(combine.InitHandlerLambda())
- .UpdateHandlerLambda(combine.UpdateHandlerLambda())
- .FinishHandlerLambda(combine.FinishHandlerLambda())
- .Done();
-
- return ret.Ptr();
-}
-
template <bool TakeOrSkip>
TExprNode::TPtr FusePart(const TExprNode& node, TExprContext& ctx) {
auto children = node.Head().ChildrenList();
@@ -465,7 +427,7 @@ TExprNode::TPtr SumLengthOverExtend(const TExprNode& node, TExprContext& ctx) {
.Add(std::move(children))
.Seal()
.Callable(1, "Uint64")
- .Atom(0, "0", TNodeFlags::Default)
+ .Atom(0, 0U)
.Seal()
.Lambda(2)
.Param("item")
@@ -580,7 +542,7 @@ TExprNode::TPtr ExtractOneItemTupleFromFold(const TExprNode& node, TExprContext&
.Add(0, node.HeadPtr())
.Callable(1, "Nth")
.Add(0, node.ChildPtr(1))
- .Atom(1, "0", TNodeFlags::Default)
+ .Atom(1, 0U)
.Seal()
.Lambda(2)
.Param("item")
@@ -594,7 +556,7 @@ TExprNode::TPtr ExtractOneItemTupleFromFold(const TExprNode& node, TExprContext&
.Seal()
.Done()
.Seal()
- .Atom(1, "0", TNodeFlags::Default)
+ .Atom(1, 0U)
.Seal()
.Seal()
.Seal()
@@ -663,7 +625,7 @@ TExprNode::TPtr ExtractOneItemTupleFromFold1(const TExprNode& node, TExprContext
.Apply(0, *node.Child(1))
.With(0, "item")
.Seal()
- .Atom(1, "0", TNodeFlags::Default)
+ .Atom(1, 0U)
.Seal()
.Seal()
.Lambda(2)
@@ -678,7 +640,7 @@ TExprNode::TPtr ExtractOneItemTupleFromFold1(const TExprNode& node, TExprContext
.Seal()
.Done()
.Seal()
- .Atom(1, "0", TNodeFlags::Default)
+ .Atom(1, 0U)
.Seal()
.Seal()
.Seal()
@@ -759,7 +721,7 @@ TExprNode::TPtr ExtractOneItemTupleFromCondense(const TExprNode& node, TExprCont
.Add(0, node.HeadPtr())
.Callable(1, "Nth")
.Add(0, node.ChildPtr(1))
- .Atom(1, "0", TNodeFlags::Default)
+ .Atom(1, 0U)
.Seal()
.Lambda(2)
.Param("item")
@@ -785,7 +747,7 @@ TExprNode::TPtr ExtractOneItemTupleFromCondense(const TExprNode& node, TExprCont
.Seal()
.Done()
.Seal()
- .Atom(1, "0", TNodeFlags::Default)
+ .Atom(1, 0U)
.Seal()
.Seal()
.Seal()
@@ -875,7 +837,7 @@ TExprNode::TPtr ExtractOneItemTupleFromCondense1(const TExprNode& node, TExprCon
.Apply(0, *node.Child(1))
.With(0, "item")
.Seal()
- .Atom(1, "0", TNodeFlags::Default)
+ .Atom(1, 0U)
.Seal()
.Seal()
.Lambda(2)
@@ -902,7 +864,7 @@ TExprNode::TPtr ExtractOneItemTupleFromCondense1(const TExprNode& node, TExprCon
.Seal()
.Done()
.Seal()
- .Atom(1, "0", TNodeFlags::Default)
+ .Atom(1, 0U)
.Seal()
.Seal()
.Seal()
@@ -1089,12 +1051,12 @@ TExprNode::TPtr ConvertFold1ByConstMinMax(const TExprNode::TPtr& node, TExprCont
.Add(0, node->HeadPtr())
.Seal()
.Callable(1, "If")
- .Callable(0, "==")
+ .Callable(0, "AggrEquals")
.Callable(0, "Length")
.Add(0, node->HeadPtr())
.Seal()
.Callable(1, "Uint64")
- .Atom(0, "1", TNodeFlags::Default)
+ .Atom(0, 1U)
.Seal()
.Seal()
.Add(1, initLambda.TailPtr())
@@ -1238,12 +1200,21 @@ TExprNode::TPtr PropagateConstPremapIntoCombineByKey(const TExprNode& node, TExp
const auto constItem = node.Child(1)->Tail().HeadPtr();
auto children = node.ChildrenList();
- // keyExtractorLambda
- children[2] = ctx.Builder(children[2]->Pos())
+
+ // update lambda
+ children[4] = ctx.Builder(children[4]->Pos())
.Lambda()
+ .Param("key")
.Param("item")
- .Apply(*children[2])
- .With(0, constItem)
+ .Param("state")
+ .Apply(*children[4])
+ .With(0)
+ .Apply(*children[2])
+ .With(0, constItem)
+ .Seal()
+ .Done()
+ .With(1, constItem)
+ .With(2, "state")
.Seal()
.Seal()
.Build();
@@ -1251,21 +1222,25 @@ TExprNode::TPtr PropagateConstPremapIntoCombineByKey(const TExprNode& node, TExp
// init lambda
children[3] = ctx.Builder(children[3]->Pos())
.Lambda()
+ .Param("key")
.Param("item")
.Apply(*children[3])
- .With(0, constItem)
+ .With(0)
+ .Apply(*children[2])
+ .With(0, constItem)
+ .Seal()
+ .Done()
+ .With(1, constItem)
.Seal()
.Seal()
.Build();
- // update lambda
- children[4] = ctx.Builder(children[4]->Pos())
+ // keyExtractorLambda
+ children[2] = ctx.Builder(children[2]->Pos())
.Lambda()
.Param("item")
- .Param("state")
- .Apply(*children[4])
- .With(0, constItem)
- .With(1, "state")
+ .Apply(*children[2])
+ .With(0, std::move(constItem))
.Seal()
.Seal()
.Build();
@@ -1770,18 +1745,30 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) {
return node;
};
- map["CombineByKey"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
+ map["FinalizeByKey"] = map["CombineByKey"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
if (!optCtx.IsSingleUsage(node->Head())) {
return node;
}
- TCoCombineByKey self(node);
- if (self.Input().Maybe<TCoFlatMapBase>()) {
- YQL_CLOG(DEBUG, Core) << "FuseCombineByKeyOverFlatmap";
- return FuseCombineByKeyOverFlatmap(self, ctx);
+ const TCoLambda preMap(node->ChildPtr(1));
+ if (const TMaybeNode<TCoFlatMapBase> maybeFlatmap = node->HeadPtr()) {
+ if (const auto flatMap = maybeFlatmap.Cast(); IsJustOrSingleAsList(flatMap.Lambda().Body().Ref()) && flatMap.Input().Ref().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional) {
+ YQL_CLOG(DEBUG, Core) << "Fuse " << node->Content() << " over " << node->Head().Content();
+ auto children = node->ChildrenList();
+ children.front() = flatMap.Input().Ptr();
+ children[1] = Build<TCoLambda>(ctx, preMap.Pos())
+ .Args({"item"})
+ .Body<TExprApplier>()
+ .Apply(preMap.Body())
+ .With(preMap.Args().Arg(0), TExprBase(flatMap.Lambda().Body().Ref().HeadPtr()))
+ .With(flatMap.Lambda().Args().Arg(0), "item")
+ .Build()
+ .Done().Ptr();
+
+ return ctx.ChangeChildren(*node,std::move(children));
+ }
}
- const auto preMap = self.PreMapLambda();
if (IsConstMapLambda(preMap) &&
(
IsDepended(node->Child(2)->Tail(), *node->Child(2)->Head().Child(0)) ||
@@ -1793,17 +1780,10 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) {
}
if (preMap.Body().Ref().IsCallable("ToList")) {
- YQL_CLOG(DEBUG, Core) << node->Content() << " premap ToList elimination";
- TExprNode::TPtr newPreMapLambda = ctx.DeepCopyLambda(preMap.Ref());
- newPreMapLambda = ctx.ChangeChild(*newPreMapLambda, 1, newPreMapLambda->Child(1)->Child(0));
- return Build<TCoCombineByKey>(ctx, node->Pos())
- .Input(self.Input())
- .PreMapLambda(TCoLambda(newPreMapLambda))
- .KeySelectorLambda(self.KeySelectorLambda())
- .InitHandlerLambda(self.InitHandlerLambda())
- .UpdateHandlerLambda(self.UpdateHandlerLambda())
- .FinishHandlerLambda(self.FinishHandlerLambda())
- .Done().Ptr();
+ YQL_CLOG(DEBUG, Core) << node->Content() << " premap ToList elimination.";
+ auto newPreMapLambda = ctx.DeepCopyLambda(preMap.Ref());
+ newPreMapLambda = ctx.ChangeChild(*newPreMapLambda, 1, newPreMapLambda->Tail().HeadPtr());
+ return ctx.ChangeChild(*node, 1U, std::move(newPreMapLambda));
}
return node;