diff options
author | vvvv <[email protected]> | 2022-10-12 12:02:03 +0300 |
---|---|---|
committer | vvvv <[email protected]> | 2022-10-12 12:02:03 +0300 |
commit | eaddec45ab37af96ce24b3d296dd0aababe0893e (patch) | |
tree | ae4453282c51296b68b18028f2071567f1685b5c | |
parent | 96d04f634b2f08e77a8e450789769d109fbb3c6f (diff) |
an option to split Aggregate to lower ones without AggApply expansion (part2 - with distincts)
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.cpp | 244 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.h | 5 |
2 files changed, 204 insertions, 45 deletions
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 5e769791e7e..a9289372513 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -5,10 +5,13 @@ #include <ydb/library/yql/core/yql_expr_type_annotation.h> #include <ydb/library/yql/core/yql_opt_window.h> +#include <ydb/library/yql/utils/log/log.h> + namespace NYql { TExprNode::TPtr TAggregateExpander::ExpandAggregate() { + YQL_CLOG(DEBUG, Core) << "Expand " << Node->Content(); Suffix = Node->Content(); YQL_ENSURE(Suffix.SkipPrefix("Aggregate")); AggList = Node->HeadPtr(); @@ -659,6 +662,43 @@ TExprNode::TPtr TAggregateExpander::GeneratePartialAggregateForNonDistinct(const .Build(); } +void TAggregateExpander::GenerateInitForDistinct(TExprNodeBuilder& parent, ui32& ndx, const TIdxSet& indicies, const TExprNode::TPtr& distinctField) { + for (ui32 i: indicies) { + auto trait = Traits[i]; + auto initLambda = trait->Child(1); + if (initLambda->Head().ChildrenSize() == 1) { + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *initLambda) + .With(0) + .Callable("Member") + .Arg(0, "item") + .Add(1, distinctField) + .Seal() + .Done() + .Seal() + .Seal(); + } else { + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *initLambda) + .With(0) + .Callable("Member") + .Arg(0, "item") + .Add(1, distinctField) + .Seal() + .Done() + .With(1) + .Callable("Uint32") + .Atom(0, ToString(i), TNodeFlags::Default) + .Seal() + .Done() + .Seal() + .Seal(); + } + } +} + TExprNode::TPtr TAggregateExpander::GenerateDistinctGrouper(const TExprNode::TPtr distinctField, const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle) { @@ -805,40 +845,7 @@ TExprNode::TPtr TAggregateExpander::GenerateDistinctGrouper(const TExprNode::TPt .Callable("AsStruct") .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { ui32 ndx = 0; - for (ui32 i: indicies) { - auto trait = Traits[i]; - auto initLambda = trait->Child(1); - if (initLambda->Head().ChildrenSize() == 1) { - parent.List(ndx++) - .Add(0, InitialColumnNames[i]) - .Apply(1, *initLambda) - .With(0) - .Callable("Member") - .Arg(0, "item") - .Add(1, distinctField) - .Seal() - .Done() - .Seal() - .Seal(); - } else { - parent.List(ndx++) - .Add(0, InitialColumnNames[i]) - .Apply(1, *initLambda) - .With(0) - .Callable("Member") - .Arg(0, "item") - .Add(1, distinctField) - .Seal() - .Done() - .With(1) - .Callable("Uint32") - .Atom(0, ToString(i), TNodeFlags::Default) - .Seal() - .Done() - .Seal() - .Seal(); - } - } + GenerateInitForDistinct(parent, ndx, indicies, distinctField); return parent; }) .Seal() @@ -1742,7 +1749,50 @@ TExprNode::TPtr TAggregateExpander::GeneratePostAggregateMergePhase() .Build(); } +TExprNode::TPtr TAggregateExpander::GenerateJustOverStates(const TExprNode::TPtr& input, const TIdxSet& indicies) { + return Ctx.Builder(Node->Pos()) + .Callable("Map") + .Add(0, input) + .Lambda(1) + .Param("row") + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 pos = 0; + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { + parent + .List(pos++) + .Add(0, KeyColumns->ChildPtr(i)) + .Callable(1, "Member") + .Arg(0, "row") + .Add(1, KeyColumns->ChildPtr(i)) + .Seal() + .Seal(); + } + + for (ui32 i : indicies) { + parent + .List(pos++) + .Add(0, InitialColumnNames[i]) + .Callable(1, "Just") + .Callable(0, "Member") + .Arg(0, "row") + .Add(1, InitialColumnNames[i]) + .Seal() + .Seal() + .Seal(); + } + + return parent; + }) + .Seal() + .Seal() + .Seal() + .Build(); +} + TExprNode::TPtr TAggregateExpander::GeneratePhases() { + const bool many = HaveDistinct; + YQL_CLOG(DEBUG, Core) << "Aggregate: generate " << (many ? "phases with distinct" : "simple phases"); TExprNode::TListType mergeTraits; for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) { auto originalTrait = AggregatedColumns->Child(index)->ChildPtr(1); @@ -1756,8 +1806,25 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() { .Seal() .Build(); + if (many) { + extractor = Ctx.Builder(Node->Pos()) + .Lambda() + .Param("row") + .Callable("Unwrap") + .Apply(0, extractor) + .With(0, "row") + .Seal() + .Seal() + .Seal() + .Build(); + } + bool isAggApply = originalTrait->IsCallable("AggApply"); auto serializedStateType = isAggApply ? AggApplySerializedStateType(originalTrait, Ctx) : originalTrait->Child(3)->GetTypeAnn(); + if (many) { + serializedStateType = Ctx.MakeType<TOptionalExprType>(serializedStateType); + } + auto extractorTypeNode = Ctx.Builder(Node->Pos()) .Callable("StructType") .List(0) @@ -1797,9 +1864,19 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() { } } - if (!HaveDistinct) { + TExprNode::TListType finalizeColumns; + for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) { + finalizeColumns.push_back(Ctx.Builder(Node->Pos()) + .List() + .Add(0, AggregatedColumns->Child(index)->ChildPtr(0)) + .Add(1, mergeTraits[index]) + .Seal() + .Build()); + } + + if (!many) { // simple Combine + MergeFinalize - TExprNode::TListType combineColumns, finalizeColumns; + TExprNode::TListType combineColumns; for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) { combineColumns.push_back(Ctx.Builder(Node->Pos()) .List() @@ -1807,13 +1884,6 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() { .Add(1, AggregatedColumns->Child(index)->ChildPtr(1)) .Seal() .Build()); - - finalizeColumns.push_back(Ctx.Builder(Node->Pos()) - .List() - .Add(0, AggregatedColumns->Child(index)->ChildPtr(0)) - .Add(1, mergeTraits[index]) - .Seal() - .Build()); } auto combine = Ctx.Builder(Node->Pos()) @@ -1838,7 +1908,93 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() { } // process with distincts - YQL_ENSURE(false, "TODO"); + // Combine + Map with Just over states + // for each distinct field: + // Combine by keys + field + // MergeFinalize by keys + field + // Map with Just over init func + // UnionAll + // MergeManyFinalize + TExprNode::TListType unionAllInputs; + if (!NonDistinctColumns.empty()) { + TExprNode::TListType combineColumns; + for (ui32 i : NonDistinctColumns) { + combineColumns.push_back(Ctx.Builder(Node->Pos()) + .List() + .Add(0, InitialColumnNames[i]) + .Add(1, AggregatedColumns->Child(i)->ChildPtr(1)) + .Seal() + .Build()); + } + + auto combine = Ctx.Builder(Node->Pos()) + .Callable("AggregateCombine") + .Add(0, AggList) + .Add(1, KeyColumns) + .Add(2, Ctx.NewList(Node->Pos(), std::move(combineColumns))) + .Add(3, Node->ChildPtr(3)) + .Seal() + .Build(); + + unionAllInputs.push_back(GenerateJustOverStates(combine, NonDistinctColumns)); + } + + for (ui32 index = 0; index < DistinctFields.size(); ++index) { + auto distinctField = DistinctFields[index]; + auto& indicies = Distinct2Columns[distinctField->Content()]; + TExprNode::TListType allKeyColumns = KeyColumns->ChildrenList(); + allKeyColumns.push_back(distinctField); + auto distinct = Ctx.Builder(Node->Pos()) + .Callable("Aggregate") + .Add(0, AggList) + .Add(1, Ctx.NewList(Node->Pos(), std::move(allKeyColumns))) + .List(2) + .Seal() + .Add(3, Node->ChildPtr(3)) + .Seal() + .Build(); + + auto mapInit = Ctx.Builder(Node->Pos()) + .Callable("Map") + .Add(0, distinct) + .Lambda(1) + .Param("item") + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 pos = 0; + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { + parent + .List(pos++) + .Add(0, KeyColumns->ChildPtr(i)) + .Callable(1, "Member") + .Arg(0, "item") + .Add(1, KeyColumns->ChildPtr(i)) + .Seal() + .Seal(); + } + + GenerateInitForDistinct(parent, pos, indicies, distinctField); + return parent; + }) + .Seal() + .Seal() + .Seal() + .Build(); + + unionAllInputs.push_back(GenerateJustOverStates(mapInit, indicies)); + } + + auto unionAll = Ctx.NewCallable(Node->Pos(), "UnionAll", std::move(unionAllInputs)); + auto mergeManyFinalize = Ctx.Builder(Node->Pos()) + .Callable("AggregateMergeManyFinalize") + .Add(0, unionAll) + .Add(1, KeyColumns) + .Add(2, Ctx.NewList(Node->Pos(), std::move(finalizeColumns))) + .Add(3, Node->ChildPtr(3)) + .Seal() + .Build(); + + return mergeManyFinalize; } } // namespace NYql diff --git a/ydb/library/yql/core/yql_aggregate_expander.h b/ydb/library/yql/core/yql_aggregate_expander.h index a46d4e58498..4c2cba4236c 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.h +++ b/ydb/library/yql/core/yql_aggregate_expander.h @@ -39,6 +39,8 @@ public: TExprNode::TPtr ExpandAggregate(); private: + using TIdxSet = std::set<ui32>; + TExprNode::TPtr ExpandAggApply(const TExprNode::TPtr& node); bool CollectTraits(); TExprNode::TPtr RebuildAggregate(); @@ -71,6 +73,8 @@ private: TExprNode::TPtr GetFinalAggStateExtractor(ui32 i); TExprNode::TPtr GeneratePhases(); + void GenerateInitForDistinct(TExprNodeBuilder& parent, ui32& ndx, const TIdxSet& indicies, const TExprNode::TPtr& distinctField); + TExprNode::TPtr GenerateJustOverStates(const TExprNode::TPtr& input, const TIdxSet& indicies); private: static constexpr TStringBuf SessionStartMemberName = "_yql_group_session_start"; @@ -105,7 +109,6 @@ private: TExprNode::TListType DistinctFields; TExprNode::TListType NothingStates; - using TIdxSet = std::set<ui32>; std::unordered_map<std::string_view, TIdxSet> Distinct2Columns; TIdxSet NonDistinctColumns; |