summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author vvvv <[email protected]> 2022-10-12 12:02:03 +0300
committer vvvv <[email protected]> 2022-10-12 12:02:03 +0300
commit eaddec45ab37af96ce24b3d296dd0aababe0893e (patch)
tree ae4453282c51296b68b18028f2071567f1685b5c
parent 96d04f634b2f08e77a8e450789769d109fbb3c6f (diff)
Add an option to split Aggregate into lower-level ones without AggApply expansion (part 2 - with distincts)
-rw-r--r-- ydb/library/yql/core/yql_aggregate_expander.cpp 244
-rw-r--r-- ydb/library/yql/core/yql_aggregate_expander.h 5
2 files changed, 204 insertions, 45 deletions
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 5e769791e7e..a9289372513 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -5,10 +5,13 @@
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_opt_window.h>
+#include <ydb/library/yql/utils/log/log.h>
+
namespace NYql {
TExprNode::TPtr TAggregateExpander::ExpandAggregate()
{
+ YQL_CLOG(DEBUG, Core) << "Expand " << Node->Content();
Suffix = Node->Content();
YQL_ENSURE(Suffix.SkipPrefix("Aggregate"));
AggList = Node->HeadPtr();
@@ -659,6 +662,43 @@ TExprNode::TPtr TAggregateExpander::GeneratePartialAggregateForNonDistinct(const
.Build();
}
+void TAggregateExpander::GenerateInitForDistinct(TExprNodeBuilder& parent, ui32& ndx, const TIdxSet& indicies, const TExprNode::TPtr& distinctField) { // Appends to `parent` one (column name, init-lambda application) list per trait index in `indicies`; `ndx` is the next child slot in `parent` and is advanced once per appended list.
+ for (ui32 i: indicies) {
+ auto trait = Traits[i];
+ auto initLambda = trait->Child(1); // child #1 of the trait is used as the init lambda
+ if (initLambda->Head().ChildrenSize() == 1) { // the lambda's first child has one entry - presumably a single-argument init lambda; TODO confirm
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i]) // element 0: the output column name for trait i
+ .Apply(1, *initLambda) // element 1: the init lambda applied to the distinct field value
+ .With(0)
+ .Callable("Member")
+ .Arg(0, "item") // refers to an argument named "item"; the builder context supplied by the caller must declare it
+ .Add(1, distinctField)
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ } else { // any other argument count: the same application, plus a second argument carrying the trait index
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *initLambda)
+ .With(0)
+ .Callable("Member")
+ .Arg(0, "item")
+ .Add(1, distinctField)
+ .Seal()
+ .Done()
+ .With(1) // second lambda argument: a Uint32 literal holding the index i
+ .Callable("Uint32")
+ .Atom(0, ToString(i), TNodeFlags::Default)
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ }
+ }
+}
+
TExprNode::TPtr TAggregateExpander::GenerateDistinctGrouper(const TExprNode::TPtr distinctField,
const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle)
{
@@ -805,40 +845,7 @@ TExprNode::TPtr TAggregateExpander::GenerateDistinctGrouper(const TExprNode::TPt
.Callable("AsStruct")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
ui32 ndx = 0;
- for (ui32 i: indicies) {
- auto trait = Traits[i];
- auto initLambda = trait->Child(1);
- if (initLambda->Head().ChildrenSize() == 1) {
- parent.List(ndx++)
- .Add(0, InitialColumnNames[i])
- .Apply(1, *initLambda)
- .With(0)
- .Callable("Member")
- .Arg(0, "item")
- .Add(1, distinctField)
- .Seal()
- .Done()
- .Seal()
- .Seal();
- } else {
- parent.List(ndx++)
- .Add(0, InitialColumnNames[i])
- .Apply(1, *initLambda)
- .With(0)
- .Callable("Member")
- .Arg(0, "item")
- .Add(1, distinctField)
- .Seal()
- .Done()
- .With(1)
- .Callable("Uint32")
- .Atom(0, ToString(i), TNodeFlags::Default)
- .Seal()
- .Done()
- .Seal()
- .Seal();
- }
- }
+ GenerateInitForDistinct(parent, ndx, indicies, distinctField);
return parent;
})
.Seal()
@@ -1742,7 +1749,50 @@ TExprNode::TPtr TAggregateExpander::GeneratePostAggregateMergePhase()
.Build();
}
+TExprNode::TPtr TAggregateExpander::GenerateJustOverStates(const TExprNode::TPtr& input, const TIdxSet& indicies) { // Builds Map(input, row -> AsStruct(...)) that copies every key column unchanged and wraps the column of each index in `indicies` into Just; returns the built expression node.
+ return Ctx.Builder(Node->Pos())
+ .Callable("Map")
+ .Add(0, input)
+ .Lambda(1)
+ .Param("row")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 pos = 0; // next child slot inside AsStruct, shared by both loops below
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { // key columns are passed through as Member(row, key)
+ parent
+ .List(pos++)
+ .Add(0, KeyColumns->ChildPtr(i))
+ .Callable(1, "Member")
+ .Arg(0, "row")
+ .Add(1, KeyColumns->ChildPtr(i))
+ .Seal()
+ .Seal();
+ }
+
+ for (ui32 i : indicies) { // selected columns become Just(Member(row, InitialColumnNames[i]))
+ parent
+ .List(pos++)
+ .Add(0, InitialColumnNames[i])
+ .Callable(1, "Just")
+ .Callable(0, "Member")
+ .Arg(0, "row")
+ .Add(1, InitialColumnNames[i])
+ .Seal()
+ .Seal()
+ .Seal();
+ }
+
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+}
+
TExprNode::TPtr TAggregateExpander::GeneratePhases() {
+ const bool many = HaveDistinct;
+ YQL_CLOG(DEBUG, Core) << "Aggregate: generate " << (many ? "phases with distinct" : "simple phases");
TExprNode::TListType mergeTraits;
for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) {
auto originalTrait = AggregatedColumns->Child(index)->ChildPtr(1);
@@ -1756,8 +1806,25 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
.Seal()
.Build();
+ if (many) {
+ extractor = Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("row")
+ .Callable("Unwrap")
+ .Apply(0, extractor)
+ .With(0, "row")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ }
+
bool isAggApply = originalTrait->IsCallable("AggApply");
auto serializedStateType = isAggApply ? AggApplySerializedStateType(originalTrait, Ctx) : originalTrait->Child(3)->GetTypeAnn();
+ if (many) {
+ serializedStateType = Ctx.MakeType<TOptionalExprType>(serializedStateType);
+ }
+
auto extractorTypeNode = Ctx.Builder(Node->Pos())
.Callable("StructType")
.List(0)
@@ -1797,9 +1864,19 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
}
}
- if (!HaveDistinct) {
+ TExprNode::TListType finalizeColumns;
+ for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) {
+ finalizeColumns.push_back(Ctx.Builder(Node->Pos())
+ .List()
+ .Add(0, AggregatedColumns->Child(index)->ChildPtr(0))
+ .Add(1, mergeTraits[index])
+ .Seal()
+ .Build());
+ }
+
+ if (!many) {
// simple Combine + MergeFinalize
- TExprNode::TListType combineColumns, finalizeColumns;
+ TExprNode::TListType combineColumns;
for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) {
combineColumns.push_back(Ctx.Builder(Node->Pos())
.List()
@@ -1807,13 +1884,6 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
.Add(1, AggregatedColumns->Child(index)->ChildPtr(1))
.Seal()
.Build());
-
- finalizeColumns.push_back(Ctx.Builder(Node->Pos())
- .List()
- .Add(0, AggregatedColumns->Child(index)->ChildPtr(0))
- .Add(1, mergeTraits[index])
- .Seal()
- .Build());
}
auto combine = Ctx.Builder(Node->Pos())
@@ -1838,7 +1908,93 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
}
// process with distincts
- YQL_ENSURE(false, "TODO");
+ // Combine + Map with Just over states
+ // for each distinct field:
+ // Combine by keys + field
+ // MergeFinalize by keys + field
+ // Map with Just over init func
+ // UnionAll
+ // MergeManyFinalize
+ TExprNode::TListType unionAllInputs;
+ if (!NonDistinctColumns.empty()) {
+ TExprNode::TListType combineColumns;
+ for (ui32 i : NonDistinctColumns) {
+ combineColumns.push_back(Ctx.Builder(Node->Pos())
+ .List()
+ .Add(0, InitialColumnNames[i])
+ .Add(1, AggregatedColumns->Child(i)->ChildPtr(1))
+ .Seal()
+ .Build());
+ }
+
+ auto combine = Ctx.Builder(Node->Pos())
+ .Callable("AggregateCombine")
+ .Add(0, AggList)
+ .Add(1, KeyColumns)
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(combineColumns)))
+ .Add(3, Node->ChildPtr(3))
+ .Seal()
+ .Build();
+
+ unionAllInputs.push_back(GenerateJustOverStates(combine, NonDistinctColumns));
+ }
+
+ for (ui32 index = 0; index < DistinctFields.size(); ++index) {
+ auto distinctField = DistinctFields[index];
+ auto& indicies = Distinct2Columns[distinctField->Content()];
+ TExprNode::TListType allKeyColumns = KeyColumns->ChildrenList();
+ allKeyColumns.push_back(distinctField);
+ auto distinct = Ctx.Builder(Node->Pos())
+ .Callable("Aggregate")
+ .Add(0, AggList)
+ .Add(1, Ctx.NewList(Node->Pos(), std::move(allKeyColumns)))
+ .List(2)
+ .Seal()
+ .Add(3, Node->ChildPtr(3))
+ .Seal()
+ .Build();
+
+ auto mapInit = Ctx.Builder(Node->Pos())
+ .Callable("Map")
+ .Add(0, distinct)
+ .Lambda(1)
+ .Param("item")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 pos = 0;
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
+ parent
+ .List(pos++)
+ .Add(0, KeyColumns->ChildPtr(i))
+ .Callable(1, "Member")
+ .Arg(0, "item")
+ .Add(1, KeyColumns->ChildPtr(i))
+ .Seal()
+ .Seal();
+ }
+
+ GenerateInitForDistinct(parent, pos, indicies, distinctField);
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ unionAllInputs.push_back(GenerateJustOverStates(mapInit, indicies));
+ }
+
+ auto unionAll = Ctx.NewCallable(Node->Pos(), "UnionAll", std::move(unionAllInputs));
+ auto mergeManyFinalize = Ctx.Builder(Node->Pos())
+ .Callable("AggregateMergeManyFinalize")
+ .Add(0, unionAll)
+ .Add(1, KeyColumns)
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(finalizeColumns)))
+ .Add(3, Node->ChildPtr(3))
+ .Seal()
+ .Build();
+
+ return mergeManyFinalize;
}
} // namespace NYql
diff --git a/ydb/library/yql/core/yql_aggregate_expander.h b/ydb/library/yql/core/yql_aggregate_expander.h
index a46d4e58498..4c2cba4236c 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.h
+++ b/ydb/library/yql/core/yql_aggregate_expander.h
@@ -39,6 +39,8 @@ public:
TExprNode::TPtr ExpandAggregate();
private:
+ using TIdxSet = std::set<ui32>;
+
TExprNode::TPtr ExpandAggApply(const TExprNode::TPtr& node);
bool CollectTraits();
TExprNode::TPtr RebuildAggregate();
@@ -71,6 +73,8 @@ private:
TExprNode::TPtr GetFinalAggStateExtractor(ui32 i);
TExprNode::TPtr GeneratePhases();
+ void GenerateInitForDistinct(TExprNodeBuilder& parent, ui32& ndx, const TIdxSet& indicies, const TExprNode::TPtr& distinctField);
+ TExprNode::TPtr GenerateJustOverStates(const TExprNode::TPtr& input, const TIdxSet& indicies);
private:
static constexpr TStringBuf SessionStartMemberName = "_yql_group_session_start";
@@ -105,7 +109,6 @@ private:
TExprNode::TListType DistinctFields;
TExprNode::TListType NothingStates;
- using TIdxSet = std::set<ui32>;
std::unordered_map<std::string_view, TIdxSet> Distinct2Columns;
TIdxSet NonDistinctColumns;