about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author aidarsamer <aidarsamer@ydb.tech> 2022-09-19 18:10:58 +0300
committer aidarsamer <aidarsamer@ydb.tech> 2022-09-19 18:10:58 +0300
commit f7178ec1e428904ba55ebc0225d16fb00a928f73 (patch)
tree 9e6400287c0bd1f2377563139e6b86d4942f86e9
parent 9d86325bfc8fb4496df5bd300c50d2548ffe098a (diff)
download ydb-f7178ec1e428904ba55ebc0225d16fb00a928f73.tar.gz
Refactor ExpandAggregate function to class
Refactor ExpandAggregate function to class.
-rw-r--r-- ydb/core/kqp/provider/yql_kikimr_opt.cpp 5
-rw-r--r-- ydb/library/yql/core/CMakeLists.txt 2
-rw-r--r-- ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp 2
-rw-r--r-- ydb/library/yql/core/yql_aggregate_expander.cpp (renamed from ydb/library/yql/core/yql_opt_aggregate.cpp) 2010
-rw-r--r-- ydb/library/yql/core/yql_aggregate_expander.h 113
-rw-r--r-- ydb/library/yql/core/yql_opt_aggregate.h 14
-rw-r--r-- ydb/library/yql/core/yql_opt_utils.cpp 12
-rw-r--r-- ydb/library/yql/core/yql_opt_utils.h 4
-rw-r--r-- ydb/library/yql/core/yql_opt_window.cpp 19
-rw-r--r-- ydb/library/yql/core/yql_opt_window.h 33
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_log.cpp 5
11 files changed, 1220 insertions, 999 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt.cpp b/ydb/core/kqp/provider/yql_kikimr_opt.cpp
index a3b8a36848a..642f5f3a621 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt.cpp
@@ -2,8 +2,8 @@
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/core/common_opt/yql_co.h>
+#include<ydb/library/yql/core/yql_aggregate_expander.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
-#include <ydb/library/yql/core/yql_opt_aggregate.h>
namespace NYql {
namespace {
@@ -120,7 +120,8 @@ TExprNode::TPtr KiRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnot
}
YQL_CLOG(INFO, ProviderKikimr) << "KiRewriteAggregate";
- return ExpandAggregate(true, node.Ptr(), ctx, typesCtx);
+ TAggregateExpander aggExpander(true, node.Ptr(), ctx, typesCtx);
+ return aggExpander.ExpandAggregate();
}
TExprNode::TPtr KiRedundantSortByPk(TExprBase node, TExprContext& ctx,
diff --git a/ydb/library/yql/core/CMakeLists.txt b/ydb/library/yql/core/CMakeLists.txt
index a7a8c462e44..4e2113b6e39 100644
--- a/ydb/library/yql/core/CMakeLists.txt
+++ b/ydb/library/yql/core/CMakeLists.txt
@@ -47,6 +47,7 @@ target_link_libraries(library-yql-core PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(library-yql-core PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_aggregate_expander.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_callable_transform.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_csv.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_execution.cpp
@@ -59,7 +60,6 @@ target_sources(library-yql-core PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_holding_file_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_join.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_library_compiler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_aggregate.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_proposed_by_data.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_range.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_rewrite_io.cpp
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 11ed902b0a0..64f027f69ac 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -12,7 +12,7 @@
#include <ydb/library/yql/core/yql_gc_transformer.h>
#include <ydb/library/yql/core/yql_expr_csee.h>
#include <ydb/library/yql/core/type_ann/type_ann_expr.h>
-#include <ydb/library/yql/core/yql_opt_aggregate.h>
+#include <ydb/library/yql/core/yql_aggregate_expander.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
#include <ydb/library/yql/core/services/yql_out_transformers.h>
diff --git a/ydb/library/yql/core/yql_opt_aggregate.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 5ff03416653..5fc67082331 100644
--- a/ydb/library/yql/core/yql_opt_aggregate.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -1,92 +1,104 @@
-#include "yql_opt_aggregate.h"
-#include "yql_opt_utils.h"
-#include "yql_opt_window.h"
-#include "yql_expr_optimize.h"
-#include "yql_expr_type_annotation.h"
+#include "yql_aggregate_expander.h"
+
+#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/core/yql_opt_window.h>
namespace NYql {
-namespace {
+TExprNode::TPtr TAggregateExpander::ExpandAggregate()
+{
+ AggList = Node->HeadPtr();
+ KeyColumns = Node->ChildPtr(1);
+ AggregatedColumns = Node->Child(2);
+ auto settings = Node->Child(3);
+
+ if (!CollectTraits()) {
+ return RebuildAggregate();
+ }
+ YQL_ENSURE(!HasSetting(*settings, "hopping"), "Aggregate with hopping unsupported here.");
-TExprNode::TPtr ExpandAggApply(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
+ HaveDistinct = AnyOf(AggregatedColumns->ChildrenList(),
+ [](const auto& child) { return child->ChildrenSize() == 3; });
+ EffectiveCompact = (HaveDistinct && CompactForDistinct) || ForceCompact || HasSetting(*settings, "compact");
+ for (const auto& trait : Traits) {
+ auto mergeLambda = trait->Child(5);
+ if (mergeLambda->Tail().IsCallable("Void")) {
+ EffectiveCompact = true;
+ break;
+ }
+ }
+
+ OriginalRowType = GetSeqItemType(Node->Head().GetTypeAnn())->Cast<TStructExprType>();
+ RowItems = OriginalRowType->GetItems();
+
+ ProcessSessionSetting(GetSetting(*settings, "session"));
+ RowType = Ctx.MakeType<TStructExprType>(RowItems);
+
+ TVector<const TTypeAnnotationNode*> keyItemTypes = GetKeyItemTypes();
+ bool needPickle = IsNeedPickle(keyItemTypes);
+ auto keyExtractor = GetKeyExtractor(needPickle);
+ CollectColumnsSpecs();
+
+ TExprNode::TPtr preAgg = GeneratePartialAggregate(keyExtractor, keyItemTypes, needPickle);
+ if (EffectiveCompact || !preAgg) {
+ preAgg = std::move(AggList);
+ }
+ return GeneratePostAggregate(preAgg, keyExtractor);
+}
+
+TExprNode::TPtr TAggregateExpander::ExpandAggApply(const TExprNode::TPtr& node)
+{
auto name = node->Head().Content();
- auto exportsPtr = typesCtx.Modules->GetModule("/lib/yql/aggregate.yql");
+ auto exportsPtr = TypesCtx.Modules->GetModule("/lib/yql/aggregate.yql");
YQL_ENSURE(exportsPtr);
const auto& exports = exportsPtr->Symbols();
const auto ex = exports.find(TString(name) + "_traits_factory");
YQL_ENSURE(exports.cend() != ex);
TNodeOnNodeOwnedMap deepClones;
- auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
+ auto lambda = Ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false);
- auto listTypeNode = ctx.NewCallable(node->Pos(), "ListType", { node->ChildPtr(1) });
+ auto listTypeNode = Ctx.NewCallable(node->Pos(), "ListType", { node->ChildPtr(1) });
auto extractor = node->ChildPtr(2);
- auto traits = ctx.ReplaceNodes(lambda->TailPtr(), {
+ auto traits = Ctx.ReplaceNodes(lambda->TailPtr(), {
{lambda->Head().Child(0), listTypeNode},
{lambda->Head().Child(1), extractor}
});
- ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
- auto status = ExpandApply(traits, traits, ctx);
+ Ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas);
+ auto status = ExpandApply(traits, traits, Ctx);
YQL_ENSURE(status != IGraphTransformer::TStatus::Error);
return traits;
}
-}
-
-TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool forceCompact, bool compactForDistinct) {
- auto list = node->HeadPtr();
- auto keyColumns = node->ChildPtr(1);
- auto aggregatedColumns = node->Child(2);
- auto settings = node->Child(3);
- TExprNode::TListType traits;
- bool needRebuild = false;
- for (ui32 index = 0; index < aggregatedColumns->ChildrenSize(); ++index) {
- auto trait = aggregatedColumns->Child(index)->ChildPtr(1);
+bool TAggregateExpander::CollectTraits() {
+ bool allTraitsCollected = true;
+ for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) {
+ auto trait = AggregatedColumns->Child(index)->ChildPtr(1);
if (trait->IsCallable("AggApply")) {
- trait = ExpandAggApply(trait, ctx, typesCtx);
- needRebuild = true;
+ trait = ExpandAggApply(trait);
+ allTraitsCollected = false;
}
-
- traits.push_back(trait);
+ Traits.push_back(trait);
}
+ return allTraitsCollected;
+}
- if (needRebuild) {
- TExprNode::TListType newAggregatedColumnsItems = aggregatedColumns->ChildrenList();
- for (ui32 index = 0; index < aggregatedColumns->ChildrenSize(); ++index) {
- newAggregatedColumnsItems[index] = ctx.ChangeChild(*(newAggregatedColumnsItems[index]), 1, std::move(traits[index]));
- }
-
- return ctx.ChangeChild(*node, 2, ctx.NewList(node->Pos(), std::move(newAggregatedColumnsItems)));
+TExprNode::TPtr TAggregateExpander::RebuildAggregate()
+{
+ TExprNode::TListType newAggregatedColumnsItems = AggregatedColumns->ChildrenList();
+ for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) {
+ newAggregatedColumnsItems[index] = Ctx.ChangeChild(*(newAggregatedColumnsItems[index]), 1, std::move(Traits[index]));
}
- YQL_ENSURE(!HasSetting(*settings, "hopping"), "Aggregate with hopping unsupported here.");
-
- static const TStringBuf sessionStartMemberName = "_yql_group_session_start";
- const TExprNode::TPtr voidNode = ctx.NewCallable(node->Pos(), "Void", {});
-
- TExprNode::TPtr sessionKey;
- const TTypeAnnotationNode* sessionKeyType = nullptr;
- const TTypeAnnotationNode* sessionParamsType = nullptr;
- TExprNode::TPtr sessionInit;
- TExprNode::TPtr sessionUpdate;
-
- TExprNode::TPtr sortKey = voidNode;
- TExprNode::TPtr sortOrder = voidNode;
-
- const bool haveDistinct = AnyOf(aggregatedColumns->ChildrenList(),
- [](const auto& child) { return child->ChildrenSize() == 3; });
- bool effectiveCompact = (haveDistinct && compactForDistinct) || forceCompact || HasSetting(*settings, "compact");
- for (const auto& trait : traits) {
- auto mergeLambda = trait->Child(5);
- if (mergeLambda->Tail().IsCallable("Void")) {
- effectiveCompact = true;
- break;
- }
- }
+ return Ctx.ChangeChild(*Node, 2, Ctx.NewList(Node->Pos(), std::move(newAggregatedColumnsItems)));
+}
- auto contextLambda = HasContextFuncs(*aggregatedColumns) ?
- ctx.Builder(node->Pos())
+TExprNode::TPtr TAggregateExpander::GetContextLambda()
+{
+ return HasContextFuncs(*AggregatedColumns) ?
+ Ctx.Builder(Node->Pos())
.Lambda()
.Param("stream")
.Callable("WithContext")
@@ -95,113 +107,110 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Seal()
.Seal()
.Build() :
- ctx.Builder(node->Pos())
+ Ctx.Builder(Node->Pos())
.Lambda()
.Param("stream")
.Arg("stream")
.Seal()
.Build();
+}
- const TStructExprType* originalRowType = GetSeqItemType(node->Head().GetTypeAnn())->Cast<TStructExprType>();
- TVector<const TItemExprType*> rowItems = originalRowType->GetItems();
-
- const auto sessionSetting = GetSetting(*settings, "session");
- TMaybe<TStringBuf> sessionOutputColumn;
- if (sessionSetting) {
- YQL_ENSURE(sessionSetting->Child(1)->Child(0)->IsAtom());
- sessionOutputColumn = sessionSetting->Child(1)->Child(0)->Content();
-
- // remove session column from other keys
- TExprNodeList keyColumnsList = keyColumns->ChildrenList();
- EraseIf(keyColumnsList, [&](const auto& key) { return sessionOutputColumn == key->Content(); });
- keyColumns = ctx.NewList(keyColumns->Pos(), std::move(keyColumnsList));
-
- TExprNode::TPtr sessionSortTraits;
- ExtractSessionWindowParams(node->Pos(), sessionSetting->Child(1)->ChildPtr(1), sessionKey, sessionKeyType, sessionParamsType, sessionSortTraits,
- sessionInit, sessionUpdate, ctx);
- ExtractSortKeyAndOrder(node->Pos(), sessionSortTraits, sortKey, sortOrder, ctx);
-
- if (haveDistinct) {
- auto keySelector = BuildKeySelector(node->Pos(), *originalRowType, keyColumns, ctx);
- const auto sessionStartMemberLambda = AddSessionParamsMemberLambda(node->Pos(), sessionStartMemberName, "", keySelector,
- sessionKey, sessionInit, sessionUpdate, ctx);
-
- list = ctx.Builder(node->Pos())
- .Callable("PartitionsByKeys")
- .Add(0, list)
- .Add(1, keySelector)
- .Add(2, sortOrder)
- .Add(3, sortKey)
- .Lambda(4)
- .Param("partitionedStream")
- .Apply(sessionStartMemberLambda)
- .With(0, "partitionedStream")
- .Seal()
+void TAggregateExpander::ProcessSessionSetting(TExprNode::TPtr sessionSetting)
+{
+ if (!sessionSetting) {
+ return;
+ }
+ HaveSessionSetting = true;
+
+ YQL_ENSURE(sessionSetting->Child(1)->Child(0)->IsAtom());
+ SessionOutputColumn = sessionSetting->Child(1)->Child(0)->Content();
+
+ // remove session column from other keys
+ TExprNodeList keyColumnsList = KeyColumns->ChildrenList();
+ EraseIf(keyColumnsList, [&](const auto& key) { return SessionOutputColumn == key->Content(); });
+ KeyColumns = Ctx.NewList(KeyColumns->Pos(), std::move(keyColumnsList));
+
+ SessionWindowParams.Traits = sessionSetting->Child(1)->ChildPtr(1);
+ ExtractSessionWindowParams(Node->Pos(), SessionWindowParams, Ctx);
+ ExtractSortKeyAndOrder(Node->Pos(), SessionWindowParams.SortTraits, SortParams, Ctx);
+
+ if (HaveDistinct) {
+ auto keySelector = BuildKeySelector(Node->Pos(), *OriginalRowType, KeyColumns, Ctx);
+ const auto sessionStartMemberLambda = AddSessionParamsMemberLambda(Node->Pos(), SessionStartMemberName, keySelector,
+ SessionWindowParams, Ctx);
+
+ AggList = Ctx.Builder(Node->Pos())
+ .Callable("PartitionsByKeys")
+ .Add(0, AggList)
+ .Add(1, keySelector)
+ .Add(2, SortParams.Order)
+ .Add(3, SortParams.Key)
+ .Lambda(4)
+ .Param("partitionedStream")
+ .Apply(sessionStartMemberLambda)
+ .With(0, "partitionedStream")
.Seal()
.Seal()
- .Build();
+ .Seal()
+ .Build();
- auto keyColumnsList = keyColumns->ChildrenList();
- keyColumnsList.push_back(ctx.NewAtom(node->Pos(), sessionStartMemberName));
- keyColumns = ctx.NewList(node->Pos(), std::move(keyColumnsList));
+ auto keyColumnsList = KeyColumns->ChildrenList();
+ keyColumnsList.push_back(Ctx.NewAtom(Node->Pos(), SessionStartMemberName));
+ KeyColumns = Ctx.NewList(Node->Pos(), std::move(keyColumnsList));
- rowItems.push_back(ctx.MakeType<TItemExprType>(sessionStartMemberName, sessionKeyType));
+ RowItems.push_back(Ctx.MakeType<TItemExprType>(SessionStartMemberName, SessionWindowParams.KeyType));
- sortOrder = sortKey = voidNode;
- sessionKey = sessionInit = sessionUpdate = {};
- sessionKeyType = nullptr;
- } else {
- effectiveCompact = true;
- }
+ SessionWindowParams.Reset();
+ SortParams.Key = SortParams.Order = VoidNode;
+ } else {
+ EffectiveCompact = true;
}
+}
- const bool compact = effectiveCompact;
- const auto rowType = ctx.MakeType<TStructExprType>(rowItems);
-
- auto preMap = ctx.Builder(node->Pos())
- .Lambda()
- .Param("premap")
- .Callable("Just").Arg(0, "premap").Seal()
- .Seal().Build();
-
- bool needPickle = false;
+TVector<const TTypeAnnotationNode*> TAggregateExpander::GetKeyItemTypes()
+{
TVector<const TTypeAnnotationNode*> keyItemTypes;
- for (auto keyColumn : keyColumns->Children()) {
- auto index = rowType->FindItem(keyColumn->Content());
+ for (auto keyColumn : KeyColumns->Children()) {
+ auto index = RowType->FindItem(keyColumn->Content());
YQL_ENSURE(index, "Unknown column: " << keyColumn->Content());
- auto type = rowType->GetItems()[*index]->GetItemType();
+ auto type = RowType->GetItems()[*index]->GetItemType();
keyItemTypes.push_back(type);
- needPickle = needPickle || allowPickle && !IsDataOrOptionalOfData(type);
+
}
+ return keyItemTypes;
+}
- const TTypeAnnotationNode* pickleType = nullptr;
- TExprNode::TPtr pickleTypeNode;
- if (needPickle) {
- pickleType = keyColumns->ChildrenSize() > 1 ? ctx.MakeType<TTupleExprType>(keyItemTypes) : keyItemTypes[0];
- pickleTypeNode = ExpandType(node->Pos(), *pickleType, ctx);
+bool TAggregateExpander::IsNeedPickle(const TVector<const TTypeAnnotationNode*>& keyItemTypes)
+{
+ bool needPickle = false;
+ for (auto type : keyItemTypes) {
+ needPickle = needPickle || AllowPickle && !IsDataOrOptionalOfData(type);
}
+ return needPickle;
+}
- auto keyExtractor = ctx.Builder(node->Pos())
+TExprNode::TPtr TAggregateExpander::GetKeyExtractor(bool needPickle)
+{
+ TExprNode::TPtr keyExtractor = Ctx.Builder(Node->Pos())
.Lambda()
.Param("item")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- if (keyColumns->ChildrenSize() == 0) {
+ if (KeyColumns->ChildrenSize() == 0) {
return parent.Callable("Uint32").Atom(0, "0", TNodeFlags::Default).Seal();
}
- else if (keyColumns->ChildrenSize() == 1) {
- return parent.Callable("Member").Arg(0, "item").Add(1, keyColumns->HeadPtr()).Seal();
+ else if (KeyColumns->ChildrenSize() == 1) {
+ return parent.Callable("Member").Arg(0, "item").Add(1, KeyColumns->HeadPtr()).Seal();
}
else {
auto listBuilder = parent.List();
ui32 pos = 0;
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
listBuilder
.Callable(pos++, "Member")
.Arg(0, "item")
- .Add(1, keyColumns->ChildPtr(i))
+ .Add(1, KeyColumns->ChildPtr(i))
.Seal();
}
-
return listBuilder.Seal();
}
})
@@ -209,7 +218,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Build();
if (needPickle) {
- keyExtractor = ctx.Builder(node->Pos())
+ keyExtractor = Ctx.Builder(Node->Pos())
.Lambda()
.Param("item")
.Callable("StablePickle")
@@ -220,45 +229,36 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Seal()
.Build();
}
+ return keyExtractor;
+}
- TExprNode::TListType initialColumnNames;
- TExprNode::TListType finalColumnNames;
- TExprNode::TListType distinctFields;
- using TIdxSet = std::set<ui32>;
- std::unordered_map<std::string_view, TIdxSet> distinct2Columns;
- std::unordered_map<std::string_view, bool> distinctFieldNeedsPickle;
- std::unordered_map<std::string_view, TExprNode::TPtr> udfSetCreate;
- std::unordered_map<std::string_view, TExprNode::TPtr> udfAddValue;
- std::unordered_map<std::string_view, TExprNode::TPtr> udfWasChanged;
- TIdxSet nondistinctColumns;
- for (auto child : aggregatedColumns->Children()) {
+void TAggregateExpander::CollectColumnsSpecs()
+{
+ for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) {
+ auto child = AggregatedColumns->Child(index);
if (const auto distinctField = (child->ChildrenSize() == 3) ? child->Child(2) : nullptr) {
- const auto ins = distinct2Columns.emplace(distinctField->Content(), TIdxSet());
+ const auto ins = Distinct2Columns.emplace(distinctField->Content(), TIdxSet());
if (ins.second) {
- distinctFields.push_back(distinctField);
+ DistinctFields.push_back(distinctField);
}
- ins.first->second.insert(initialColumnNames.size());
+ ins.first->second.insert(InitialColumnNames.size());
} else {
- nondistinctColumns.insert(initialColumnNames.size());
+ NonDistinctColumns.insert(InitialColumnNames.size());
}
if (child->Head().IsAtom()) {
- finalColumnNames.push_back(child->HeadPtr());
+ FinalColumnNames.push_back(child->HeadPtr());
} else {
- finalColumnNames.push_back(child->Head().HeadPtr());
+ FinalColumnNames.push_back(child->Head().HeadPtr());
}
- initialColumnNames.push_back(ctx.NewAtom(finalColumnNames.back()->Pos(), "_yql_agg_" + ToString(initialColumnNames.size()), TNodeFlags::Default));
- }
-
- TExprNode::TListType nothingStates;
- for (ui32 index = 0; index < aggregatedColumns->ChildrenSize(); ++index) {
- auto trait = traits[index];
+ InitialColumnNames.push_back(Ctx.NewAtom(FinalColumnNames.back()->Pos(), "_yql_agg_" + ToString(InitialColumnNames.size()), TNodeFlags::Default));
+ auto trait = Traits[index];
auto saveLambda = trait->Child(3);
auto saveLambdaType = saveLambda->GetTypeAnn();
- auto typeNode = ExpandType(node->Pos(), *saveLambdaType, ctx);
- nothingStates.push_back(ctx.Builder(node->Pos())
+ auto typeNode = ExpandType(Node->Pos(), *saveLambdaType, Ctx);
+ NothingStates.push_back(Ctx.Builder(Node->Pos())
.Callable("Nothing")
.Callable(0, "OptionalType")
.Add(0, std::move(typeNode))
@@ -267,620 +267,681 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Build()
);
}
+}
- TExprNode::TPtr groupInput;
+TExprNode::TPtr TAggregateExpander::GeneratePartialAggregate(const TExprNode::TPtr keyExtractor,
+ const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needPickle)
+{
+ TExprNode::TPtr pickleTypeNode = nullptr;
+ if (needPickle) {
+ const TTypeAnnotationNode* pickleType = nullptr;
+ pickleType = KeyColumns->ChildrenSize() > 1 ? Ctx.MakeType<TTupleExprType>(keyItemTypes) : keyItemTypes[0];
+ pickleTypeNode = ExpandType(Node->Pos(), *pickleType, Ctx);
+ }
- if (!nondistinctColumns.empty()) {
- auto combineInit = ctx.Builder(node->Pos())
- .Lambda()
- .Param("key")
- .Param("item")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- ui32 ndx = 0;
- for (ui32 i: nondistinctColumns) {
- auto trait = traits[i];
- auto initLambda = trait->Child(1);
- if (initLambda->Head().ChildrenSize() == 1) {
- parent.List(ndx++)
- .Add(0, initialColumnNames[i])
- .Apply(1, *initLambda)
- .With(0)
- .Callable("CastStruct")
- .Arg(0, "item")
- .Add(1, ExpandType(node->Pos(), *initLambda->Head().Head().GetTypeAnn(), ctx))
- .Seal()
- .Done()
- .Seal()
- .Seal();
- } else {
- parent.List(ndx++)
- .Add(0, initialColumnNames[i])
- .Apply(1, *initLambda)
- .With(0)
- .Callable("CastStruct")
- .Arg(0, "item")
- .Add(1, ExpandType(node->Pos(), *initLambda->Head().Head().GetTypeAnn(), ctx))
- .Seal()
- .Done()
- .With(1)
- .Callable("Uint32")
- .Atom(0, ToString(i), TNodeFlags::Default)
- .Seal()
- .Done()
- .Seal()
- .Seal();
- }
- }
- return parent;
- })
+ TExprNode::TPtr partialAgg = nullptr;
+ if (!NonDistinctColumns.empty()) {
+ partialAgg = GeneratePartialAggregateForNonDistinct(keyExtractor, pickleTypeNode);
+ }
+ for (ui32 index = 0; index < DistinctFields.size(); ++index) {
+ auto distinctField = DistinctFields[index];
+
+ bool needDistinctPickle = EffectiveCompact ? false : needPickle;
+ auto distinctGrouper = GenerateDistinctGrouper(distinctField, keyItemTypes, needDistinctPickle);
+
+ if (!partialAgg) {
+ partialAgg = std::move(distinctGrouper);
+ } else {
+ partialAgg = Ctx.Builder(Node->Pos())
+ .Callable("Extend")
+ .Add(0, std::move(partialAgg))
+ .Add(1, std::move(distinctGrouper))
.Seal()
- .Seal()
- .Build();
+ .Build();
+ }
+ }
+ // If no aggregation functions then add additional combiner
+ if (AggregatedColumns->ChildrenSize() == 0 && KeyColumns->ChildrenSize() > 0 && !SessionWindowParams.Update) {
+ if (!partialAgg) {
+ partialAgg = AggList;
+ }
- auto combineUpdate = ctx.Builder(node->Pos())
+ auto uniqCombineInit = ReturnKeyAsIsForCombineInit(pickleTypeNode);
+ auto uniqCombineUpdate = Ctx.Builder(Node->Pos())
.Lambda()
.Param("key")
.Param("item")
.Param("state")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- ui32 ndx = 0;
- for (ui32 i: nondistinctColumns) {
- auto trait = traits[i];
- auto updateLambda = trait->Child(2);
- if (updateLambda->Head().ChildrenSize() == 2) {
- parent.List(ndx++)
- .Add(0, initialColumnNames[i])
- .Apply(1, *updateLambda)
- .With(0)
- .Callable("CastStruct")
- .Arg(0, "item")
- .Add(1, ExpandType(node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), ctx))
- .Seal()
- .Done()
- .With(1)
- .Callable("Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal()
- .Done()
- .Seal()
- .Seal();
- } else {
- parent.List(ndx++)
- .Add(0, initialColumnNames[i])
- .Apply(1, *updateLambda)
- .With(0)
- .Callable("CastStruct")
- .Arg(0, "item")
- .Add(1, ExpandType(node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), ctx))
- .Seal()
- .Done()
- .With(1)
- .Callable("Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal()
- .Done()
- .With(2)
- .Callable("Uint32")
- .Atom(0, ToString(i), TNodeFlags::Default)
- .Seal()
- .Done()
- .Seal()
- .Seal();
- }
- }
- return parent;
- })
- .Seal()
+ .Arg("state")
.Seal()
.Build();
-
- auto combineSave = ctx.Builder(node->Pos())
+
+ // Return state as-is
+ auto uniqCombineSave = Ctx.Builder(Node->Pos())
.Lambda()
.Param("key")
.Param("state")
.Callable("Just")
- .Callable(0, "AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < initialColumnNames.size(); ++i) {
- if (nondistinctColumns.find(i) == nondistinctColumns.end()) {
+ .Arg(0, "state")
+ .Seal()
+ .Seal()
+ .Build();
+
+ partialAgg = Ctx.Builder(Node->Pos())
+ .Callable("CombineByKey")
+ .Add(0, std::move(partialAgg))
+ .Add(1, PreMap)
+ .Add(2, keyExtractor)
+ .Add(3, std::move(uniqCombineInit))
+ .Add(4, std::move(uniqCombineUpdate))
+ .Add(5, std::move(uniqCombineSave))
+ .Seal()
+ .Build();
+ }
+ return partialAgg;
+}
+
+TExprNode::TPtr TAggregateExpander::GeneratePartialAggregateForNonDistinct(const TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode)
+{
+ auto combineInit = Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("key")
+ .Param("item")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 ndx = 0;
+ for (ui32 i: NonDistinctColumns) {
+ auto trait = Traits[i];
+ auto initLambda = trait->Child(1);
+ if (initLambda->Head().ChildrenSize() == 1) {
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *initLambda)
+ .With(0)
+ .Callable("CastStruct")
+ .Arg(0, "item")
+ .Add(1, ExpandType(Node->Pos(), *initLambda->Head().Head().GetTypeAnn(), Ctx))
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ } else {
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *initLambda)
+ .With(0)
+ .Callable("CastStruct")
+ .Arg(0, "item")
+ .Add(1, ExpandType(Node->Pos(), *initLambda->Head().Head().GetTypeAnn(), Ctx))
+ .Seal()
+ .Done()
+ .With(1)
+ .Callable("Uint32")
+ .Atom(0, ToString(i), TNodeFlags::Default)
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ }
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto combineUpdate = Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("key")
+ .Param("item")
+ .Param("state")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 ndx = 0;
+ for (ui32 i: NonDistinctColumns) {
+ auto trait = Traits[i];
+ auto updateLambda = trait->Child(2);
+ if (updateLambda->Head().ChildrenSize() == 2) {
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *updateLambda)
+ .With(0)
+ .Callable("CastStruct")
+ .Arg(0, "item")
+ .Add(1, ExpandType(Node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), Ctx))
+ .Seal()
+ .Done()
+ .With(1)
+ .Callable("Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ } else {
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *updateLambda)
+ .With(0)
+ .Callable("CastStruct")
+ .Arg(0, "item")
+ .Add(1, ExpandType(Node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), Ctx))
+ .Seal()
+ .Done()
+ .With(1)
+ .Callable("Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
+ .Seal()
+ .Done()
+ .With(2)
+ .Callable("Uint32")
+ .Atom(0, ToString(i), TNodeFlags::Default)
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ }
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto combineSave = Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("key")
+ .Param("state")
+ .Callable("Just")
+ .Callable(0, "AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0; i < InitialColumnNames.size(); ++i) {
+ if (NonDistinctColumns.find(i) == NonDistinctColumns.end()) {
+ parent.List(i)
+ .Add(0, InitialColumnNames[i])
+ .Add(1, NothingStates[i])
+ .Seal();
+ } else {
+ auto trait = Traits[i];
+ auto saveLambda = trait->Child(3);
+ if (!DistinctFields.empty()) {
parent.List(i)
- .Add(0, initialColumnNames[i])
- .Add(1, nothingStates[i])
- .Seal();
- } else {
- auto trait = traits[i];
- auto saveLambda = trait->Child(3);
- if (!distinctFields.empty()) {
- parent.List(i)
- .Add(0, initialColumnNames[i])
- .Callable(1, "Just")
- .Apply(0, *saveLambda)
- .With(0)
- .Callable("Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Seal();
- } else {
- parent.List(i)
- .Add(0, initialColumnNames[i])
- .Apply(1, *saveLambda)
+ .Add(0, InitialColumnNames[i])
+ .Callable(1, "Just")
+ .Apply(0, *saveLambda)
.With(0)
.Callable("Member")
.Arg(0, "state")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Done()
.Seal()
- .Seal();
- }
+ .Seal()
+ .Seal();
+ } else {
+ parent.List(i)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *saveLambda)
+ .With(0)
+ .Callable("Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
}
}
- return parent;
- })
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- ui32 pos = 0;
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
- auto listBuilder = parent.List(initialColumnNames.size() + i);
- listBuilder.Add(0, keyColumns->ChildPtr(i));
- if (keyColumns->ChildrenSize() > 1) {
- if (needPickle) {
- listBuilder
- .Callable(1, "Nth")
- .Callable(0, "Unpickle")
- .Add(0, pickleTypeNode)
- .Arg(1, "key")
- .Seal()
- .Atom(1, ToString(pos), TNodeFlags::Default)
- .Seal();
- } else {
- listBuilder
- .Callable(1, "Nth")
- .Arg(0, "key")
- .Atom(1, ToString(pos), TNodeFlags::Default)
- .Seal();
- }
- ++pos;
+ }
+ return parent;
+ })
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 pos = 0;
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
+ auto listBuilder = parent.List(InitialColumnNames.size() + i);
+ listBuilder.Add(0, KeyColumns->ChildPtr(i));
+ if (KeyColumns->ChildrenSize() > 1) {
+ if (pickleTypeNode) {
+ listBuilder
+ .Callable(1, "Nth")
+ .Callable(0, "Unpickle")
+ .Add(0, pickleTypeNode)
+ .Arg(1, "key")
+ .Seal()
+ .Atom(1, ToString(pos), TNodeFlags::Default)
+ .Seal();
} else {
- if (needPickle) {
- listBuilder.Callable(1, "Unpickle")
- .Add(0, pickleTypeNode)
- .Arg(1, "key")
- .Seal();
- } else {
- listBuilder.Arg(1, "key");
- }
+ listBuilder
+ .Callable(1, "Nth")
+ .Arg(0, "key")
+ .Atom(1, ToString(pos), TNodeFlags::Default)
+ .Seal();
+ }
+ ++pos;
+ } else {
+ if (pickleTypeNode) {
+ listBuilder.Callable(1, "Unpickle")
+ .Add(0, pickleTypeNode)
+ .Arg(1, "key")
+ .Seal();
+ } else {
+ listBuilder.Arg(1, "key");
}
- listBuilder.Seal();
}
- return parent;
- })
- .Seal()
+ listBuilder.Seal();
+ }
+ return parent;
+ })
.Seal()
.Seal()
- .Build();
+ .Seal()
+ .Build();
- groupInput = ctx.Builder(node->Pos())
- .Callable("CombineByKey")
- .Add(0, list)
- .Add(1, preMap)
- .Add(2, keyExtractor)
- .Add(3, std::move(combineInit))
- .Add(4, std::move(combineUpdate))
- .Add(5, std::move(combineSave))
- .Seal()
- .Build();
- }
+ return Ctx.Builder(Node->Pos())
+ .Callable("CombineByKey")
+ .Add(0, AggList)
+ .Add(1, PreMap)
+ .Add(2, keyExtractor)
+ .Add(3, std::move(combineInit))
+ .Add(4, std::move(combineUpdate))
+ .Add(5, std::move(combineSave))
+ .Seal()
+ .Build();
+}
- for (ui32 index = 0; index < distinctFields.size(); ++index) {
- auto distinctField = distinctFields[index];
- auto& indicies = distinct2Columns[distinctField->Content()];
- auto distinctIndex = rowType->FindItem(distinctField->Content());
- YQL_ENSURE(distinctIndex, "Unknown field: " << distinctField->Content());
- auto distinctType = rowType->GetItems()[*distinctIndex]->GetItemType();
- TVector<const TTypeAnnotationNode*> distinctKeyItemTypes = keyItemTypes;
- distinctKeyItemTypes.push_back(distinctType);
- bool needDistinctPickle = compact ? false : needPickle;
- auto valueType = distinctType;
- if (distinctType->GetKind() == ETypeAnnotationKind::Optional) {
- distinctType = distinctType->Cast<TOptionalExprType>()->GetItemType();
- }
+TExprNode::TPtr TAggregateExpander::GenerateDistinctGrouper(const TExprNode::TPtr distinctField,
+ const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle)
+{
+ auto& indicies = Distinct2Columns[distinctField->Content()];
+ auto distinctIndex = RowType->FindItem(distinctField->Content());
+ YQL_ENSURE(distinctIndex, "Unknown field: " << distinctField->Content());
+ auto distinctType = RowType->GetItems()[*distinctIndex]->GetItemType();
+ TVector<const TTypeAnnotationNode*> distinctKeyItemTypes = keyItemTypes;
+ distinctKeyItemTypes.push_back(distinctType);
+ auto valueType = distinctType;
+ if (distinctType->GetKind() == ETypeAnnotationKind::Optional) {
+ distinctType = distinctType->Cast<TOptionalExprType>()->GetItemType();
+ }
- if (distinctType->GetKind() != ETypeAnnotationKind::Data) {
- needDistinctPickle = true;
- valueType = ctx.MakeType<TDataExprType>(EDataSlot::String);
- }
+ if (distinctType->GetKind() != ETypeAnnotationKind::Data) {
+ needDistinctPickle = true;
+ valueType = Ctx.MakeType<TDataExprType>(EDataSlot::String);
+ }
- const auto expandedValueType = needDistinctPickle ?
- ctx.Builder(node->Pos())
- .Callable("DataType")
- .Atom(0, "String", TNodeFlags::Default)
- .Seal()
- .Build():
- ExpandType(node->Pos(), *valueType, ctx);
-
- distinctFieldNeedsPickle[distinctField->Content()] = needDistinctPickle;
- auto udfSetCreateValue = ctx.Builder(node->Pos())
- .Callable("Udf")
- .Atom(0, "Set.Create")
- .Callable(1, "Void").Seal()
- .Callable(2, "TupleType")
- .Callable(0, "TupleType")
- .Add(0, expandedValueType)
- .Callable(1, "DataType")
- .Atom(0, "Uint32", TNodeFlags::Default)
- .Seal()
+ const auto expandedValueType = needDistinctPickle ?
+ Ctx.Builder(Node->Pos())
+ .Callable("DataType")
+ .Atom(0, "String", TNodeFlags::Default)
+ .Seal()
+ .Build()
+ : ExpandType(Node->Pos(), *valueType, Ctx);
+
+ DistinctFieldNeedsPickle[distinctField->Content()] = needDistinctPickle;
+ auto udfSetCreateValue = Ctx.Builder(Node->Pos())
+ .Callable("Udf")
+ .Atom(0, "Set.Create")
+ .Callable(1, "Void").Seal()
+ .Callable(2, "TupleType")
+ .Callable(0, "TupleType")
+ .Add(0, expandedValueType)
+ .Callable(1, "DataType")
+ .Atom(0, "Uint32", TNodeFlags::Default)
.Seal()
- .Callable(1, "StructType").Seal()
- .Add(2, expandedValueType)
.Seal()
+ .Callable(1, "StructType").Seal()
+ .Add(2, expandedValueType)
.Seal()
- .Build();
+ .Seal()
+ .Build();
- udfSetCreate[distinctField->Content()] = udfSetCreateValue;
- auto resourceType = ctx.Builder(node->Pos())
- .Callable("TypeOf")
- .Callable(0, "Apply")
- .Add(0, udfSetCreateValue)
- .Callable(1, "InstanceOf")
- .Add(0, expandedValueType)
- .Seal()
- .Callable(2, "Uint32")
- .Atom(0, "0", TNodeFlags::Default)
- .Seal()
+ UdfSetCreate[distinctField->Content()] = udfSetCreateValue;
+ auto resourceType = Ctx.Builder(Node->Pos())
+ .Callable("TypeOf")
+ .Callable(0, "Apply")
+ .Add(0, udfSetCreateValue)
+ .Callable(1, "InstanceOf")
+ .Add(0, expandedValueType)
+ .Seal()
+ .Callable(2, "Uint32")
+ .Atom(0, "0", TNodeFlags::Default)
.Seal()
.Seal()
- .Build();
+ .Seal()
+ .Build();
- udfAddValue[distinctField->Content()] = ctx.Builder(node->Pos())
- .Callable("Udf")
- .Atom(0, "Set.AddValue")
- .Callable(1, "Void").Seal()
- .Callable(2, "TupleType")
- .Callable(0, "TupleType")
- .Add(0, resourceType)
- .Add(1, expandedValueType)
- .Seal()
- .Callable(1, "StructType").Seal()
- .Add(2, expandedValueType)
+ UdfAddValue[distinctField->Content()] = Ctx.Builder(Node->Pos())
+ .Callable("Udf")
+ .Atom(0, "Set.AddValue")
+ .Callable(1, "Void").Seal()
+ .Callable(2, "TupleType")
+ .Callable(0, "TupleType")
+ .Add(0, resourceType)
+ .Add(1, expandedValueType)
.Seal()
+ .Callable(1, "StructType").Seal()
+ .Add(2, expandedValueType)
.Seal()
- .Build();
+ .Seal()
+ .Build();
- udfWasChanged[distinctField->Content()] = ctx.Builder(node->Pos())
- .Callable("Udf")
- .Atom(0, "Set.WasChanged")
- .Callable(1, "Void").Seal()
- .Callable(2, "TupleType")
- .Callable(0, "TupleType")
- .Add(0, resourceType)
- .Seal()
- .Callable(1, "StructType").Seal()
- .Add(2, expandedValueType)
+ UdfWasChanged[distinctField->Content()] = Ctx.Builder(Node->Pos())
+ .Callable("Udf")
+ .Atom(0, "Set.WasChanged")
+ .Callable(1, "Void").Seal()
+ .Callable(2, "TupleType")
+ .Callable(0, "TupleType")
+ .Add(0, resourceType)
.Seal()
+ .Callable(1, "StructType").Seal()
+ .Add(2, expandedValueType)
.Seal()
- .Build();
+ .Seal()
+ .Build();
- auto distinctKeyExtractor = ctx.Builder(node->Pos())
- .Lambda()
- .Param("item")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- if (keyColumns->ChildrenSize() != 0) {
- auto listBuilder = parent.List();
- ui32 pos = 0;
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
- listBuilder
- .Callable(pos++, "Member")
- .Arg(0, "item")
- .Add(1, keyColumns->ChildPtr(i))
- .Seal();
- }
+ auto distinctKeyExtractor = Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("item")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (KeyColumns->ChildrenSize() != 0) {
+ auto listBuilder = parent.List();
+ ui32 pos = 0;
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
listBuilder
- .Callable(pos, "Member")
- .Arg(0, "item")
- .Add(1, distinctField)
- .Seal();
-
- return listBuilder.Seal();
- } else {
- return parent
- .Callable("Member")
+ .Callable(pos++, "Member")
.Arg(0, "item")
- .Add(1, distinctField)
+ .Add(1, KeyColumns->ChildPtr(i))
.Seal();
}
- })
- .Seal()
- .Build();
+ listBuilder
+ .Callable(pos, "Member")
+ .Arg(0, "item")
+ .Add(1, distinctField)
+ .Seal();
- const TTypeAnnotationNode* distinctPickleType = nullptr;
- TExprNode::TPtr distinctPickleTypeNode;
- if (needDistinctPickle) {
- distinctPickleType = keyColumns->ChildrenSize() > 0 ? ctx.MakeType<TTupleExprType>(distinctKeyItemTypes) : distinctKeyItemTypes.front();
- distinctPickleTypeNode = ExpandType(node->Pos(), *distinctPickleType, ctx);
- }
+ return listBuilder.Seal();
+ } else {
+ return parent
+ .Callable("Member")
+ .Arg(0, "item")
+ .Add(1, distinctField)
+ .Seal();
+ }
+ })
+ .Seal()
+ .Build();
- if (needDistinctPickle) {
- distinctKeyExtractor = ctx.Builder(node->Pos())
- .Lambda()
- .Param("item")
- .Callable("StablePickle")
- .Apply(0, *distinctKeyExtractor).With(0, "item").Seal()
- .Seal()
- .Seal()
- .Build();
- }
+ const TTypeAnnotationNode* distinctPickleType = nullptr;
+ TExprNode::TPtr distinctPickleTypeNode;
+ if (needDistinctPickle) {
+ distinctPickleType = KeyColumns->ChildrenSize() > 0 ? Ctx.MakeType<TTupleExprType>(distinctKeyItemTypes) : distinctKeyItemTypes.front();
+ distinctPickleTypeNode = ExpandType(Node->Pos(), *distinctPickleType, Ctx);
+ }
- auto distinctCombineInit = ctx.Builder(node->Pos())
+ if (needDistinctPickle) {
+ distinctKeyExtractor = Ctx.Builder(Node->Pos())
.Lambda()
- .Param("key")
.Param("item")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- ui32 ndx = 0;
- for (ui32 i: indicies) {
- auto trait = traits[i];
- auto initLambda = trait->Child(1);
- if (initLambda->Head().ChildrenSize() == 1) {
- parent.List(ndx++)
- .Add(0, initialColumnNames[i])
- .Apply(1, *initLambda)
- .With(0)
- .Callable("Member")
- .Arg(0, "item")
- .Add(1, distinctField)
- .Seal()
- .Done()
- .Seal()
- .Seal();
- } else {
- parent.List(ndx++)
- .Add(0, initialColumnNames[i])
- .Apply(1, *initLambda)
- .With(0)
- .Callable("Member")
- .Arg(0, "item")
- .Add(1, distinctField)
- .Seal()
- .Done()
- .With(1)
- .Callable("Uint32")
- .Atom(0, ToString(i), TNodeFlags::Default)
- .Seal()
- .Done()
- .Seal()
- .Seal();
- }
- }
- return parent;
- })
+ .Callable("StablePickle")
+ .Apply(0, *distinctKeyExtractor).With(0, "item").Seal()
.Seal()
.Seal()
.Build();
+ }
- auto distinctCombineUpdate = ctx.Builder(node->Pos())
- .Lambda()
- .Param("key")
- .Param("item")
- .Param("state")
- .Arg("state")
+ auto distinctCombineInit = Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("key")
+ .Param("item")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 ndx = 0;
+ for (ui32 i: indicies) {
+ auto trait = Traits[i];
+ auto initLambda = trait->Child(1);
+ if (initLambda->Head().ChildrenSize() == 1) {
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *initLambda)
+ .With(0)
+ .Callable("Member")
+ .Arg(0, "item")
+ .Add(1, distinctField)
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ } else {
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *initLambda)
+ .With(0)
+ .Callable("Member")
+ .Arg(0, "item")
+ .Add(1, distinctField)
+ .Seal()
+ .Done()
+ .With(1)
+ .Callable("Uint32")
+ .Atom(0, ToString(i), TNodeFlags::Default)
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ }
+ }
+ return parent;
+ })
.Seal()
- .Build();
+ .Seal()
+ .Build();
- ui32 ndx = 0;
- auto distinctCombineSave = ctx.Builder(node->Pos())
- .Lambda()
- .Param("key")
- .Param("state")
- .Callable("Just")
- .Callable(0, "AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i: indicies) {
- auto trait = traits[i];
- auto saveLambda = trait->Child(3);
- parent.List(ndx++)
- .Add(0, initialColumnNames[i])
- .Apply(1, *saveLambda)
- .With(0)
- .Callable("Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal()
- .Done()
- .Seal()
- .Seal();
- }
- return parent;
- })
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- if (keyColumns->ChildrenSize() > 0) {
- if (needDistinctPickle) {
- ui32 pos = 0;
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
- parent.List(ndx++)
- .Add(0, keyColumns->ChildPtr(i))
- .Callable(1, "Nth")
- .Callable(0, "Unpickle")
- .Add(0, distinctPickleTypeNode)
- .Arg(1, "key")
- .Seal()
- .Atom(1, ToString(pos++), TNodeFlags::Default)
- .Seal()
- .Seal();
- }
+ auto distinctCombineUpdate = Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("key")
+ .Param("item")
+ .Param("state")
+ .Arg("state")
+ .Seal()
+ .Build();
+
+ ui32 ndx = 0;
+ auto distinctCombineSave = Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("key")
+ .Param("state")
+ .Callable("Just")
+ .Callable(0, "AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i: indicies) {
+ auto trait = Traits[i];
+ auto saveLambda = trait->Child(3);
+ parent.List(ndx++)
+ .Add(0, InitialColumnNames[i])
+ .Apply(1, *saveLambda)
+ .With(0)
+ .Callable("Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal();
+ }
+ return parent;
+ })
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (KeyColumns->ChildrenSize() > 0) {
+ if (needDistinctPickle) {
+ ui32 pos = 0;
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
parent.List(ndx++)
- .Add(0, distinctField)
+ .Add(0, KeyColumns->ChildPtr(i))
.Callable(1, "Nth")
- .Callable(0, "Unpickle")
- .Add(0, distinctPickleTypeNode)
- .Arg(1, "key")
- .Seal()
+ .Callable(0, "Unpickle")
+ .Add(0, distinctPickleTypeNode)
+ .Arg(1, "key")
+ .Seal()
.Atom(1, ToString(pos++), TNodeFlags::Default)
.Seal()
- .Seal();
-
- } else {
- ui32 pos = 0;
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
- parent.List(ndx++)
- .Add(0, keyColumns->ChildPtr(i))
- .Callable(1, "Nth")
- .Arg(0, "key")
- .Atom(1, ToString(pos++), TNodeFlags::Default)
+ .Seal();
+ }
+ parent.List(ndx++)
+ .Add(0, distinctField)
+ .Callable(1, "Nth")
+ .Callable(0, "Unpickle")
+ .Add(0, distinctPickleTypeNode)
+ .Arg(1, "key")
.Seal()
- .Seal();
- }
+ .Atom(1, ToString(pos++), TNodeFlags::Default)
+ .Seal()
+ .Seal();
+
+ } else {
+ ui32 pos = 0;
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
parent.List(ndx++)
- .Add(0, distinctField)
+ .Add(0, KeyColumns->ChildPtr(i))
.Callable(1, "Nth")
.Arg(0, "key")
.Atom(1, ToString(pos++), TNodeFlags::Default)
.Seal()
- .Seal();
+ .Seal();
}
- } else {
- if (needDistinctPickle) {
- parent.List(ndx++)
- .Add(0, distinctField)
- .Callable(1, "Unpickle")
- .Add(0, distinctPickleTypeNode)
- .Arg(1, "key")
- .Seal()
- .Seal();
- } else {
- parent.List(ndx++)
- .Add(0, distinctField)
+ parent.List(ndx++)
+ .Add(0, distinctField)
+ .Callable(1, "Nth")
+ .Arg(0, "key")
+ .Atom(1, ToString(pos++), TNodeFlags::Default)
+ .Seal()
+ .Seal();
+ }
+ } else {
+ if (needDistinctPickle) {
+ parent.List(ndx++)
+ .Add(0, distinctField)
+ .Callable(1, "Unpickle")
+ .Add(0, distinctPickleTypeNode)
.Arg(1, "key")
- .Seal();
- }
+ .Seal()
+ .Seal();
+ } else {
+ parent.List(ndx++)
+ .Add(0, distinctField)
+ .Arg(1, "key")
+ .Seal();
}
- return parent;
- })
- .Seal()
+ }
+ return parent;
+ })
.Seal()
.Seal()
- .Build();
+ .Seal()
+ .Build();
- auto distinctCombiner = ctx.Builder(node->Pos())
- .Callable("CombineByKey")
- .Add(0, list)
- .Add(1, preMap)
- .Add(2, distinctKeyExtractor)
- .Add(3, std::move(distinctCombineInit))
- .Add(4, std::move(distinctCombineUpdate))
- .Add(5, std::move(distinctCombineSave))
- .Seal()
- .Build();
+ auto distinctCombiner = Ctx.Builder(Node->Pos())
+ .Callable("CombineByKey")
+ .Add(0, AggList)
+ .Add(1, PreMap)
+ .Add(2, distinctKeyExtractor)
+ .Add(3, std::move(distinctCombineInit))
+ .Add(4, std::move(distinctCombineUpdate))
+ .Add(5, std::move(distinctCombineSave))
+ .Seal()
+ .Build();
- auto distinctGrouper = ctx.Builder(node->Pos())
- .Callable("PartitionsByKeys")
- .Add(0, std::move(distinctCombiner))
- .Add(1, distinctKeyExtractor)
- .Callable(2, "Void").Seal()
- .Callable(3, "Void").Seal()
- .Lambda(4)
- .Param("groups")
- .Callable("Map")
- .Callable(0, "Condense1")
- .Arg(0, "groups")
- .Lambda(1)
- .Param("item")
- .Arg("item")
- .Seal()
- .Lambda(2)
- .Param("item")
- .Param("state")
- .Callable("IsKeySwitch")
- .Arg(0, "item")
- .Arg(1, "state")
- .Add(2, distinctKeyExtractor)
- .Add(3, distinctKeyExtractor)
- .Seal()
- .Seal()
- .Lambda(3)
- .Param("item")
- .Param("state")
- .Arg("item")
+ auto distinctGrouper = Ctx.Builder(Node->Pos())
+ .Callable("PartitionsByKeys")
+ .Add(0, std::move(distinctCombiner))
+ .Add(1, distinctKeyExtractor)
+ .Callable(2, "Void").Seal()
+ .Callable(3, "Void").Seal()
+ .Lambda(4)
+ .Param("groups")
+ .Callable("Map")
+ .Callable(0, "Condense1")
+ .Arg(0, "groups")
+ .Lambda(1)
+ .Param("item")
+ .Arg("item")
+ .Seal()
+ .Lambda(2)
+ .Param("item")
+ .Param("state")
+ .Callable("IsKeySwitch")
+ .Arg(0, "item")
+ .Arg(1, "state")
+ .Add(2, distinctKeyExtractor)
+ .Add(3, distinctKeyExtractor)
.Seal()
.Seal()
- .Lambda(1)
+ .Lambda(3)
+ .Param("item")
.Param("state")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < initialColumnNames.size(); ++i) {
- if (indicies.find(i) != indicies.end()) {
- parent.List(i)
- .Add(0, initialColumnNames[i])
- .Callable(1, "Just")
- .Callable(0, "Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal()
+ .Arg("item")
+ .Seal()
+ .Seal()
+ .Lambda(1)
+ .Param("state")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0; i < InitialColumnNames.size(); ++i) {
+ if (indicies.find(i) != indicies.end()) {
+ parent.List(i)
+ .Add(0, InitialColumnNames[i])
+ .Callable(1, "Just")
+ .Callable(0, "Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
.Seal()
- .Seal();
- } else {
- parent.List(i)
- .Add(0, initialColumnNames[i])
- .Add(1, nothingStates[i])
- .Seal();
- }
+ .Seal()
+ .Seal();
+ } else {
+ parent.List(i)
+ .Add(0, InitialColumnNames[i])
+ .Add(1, NothingStates[i])
+ .Seal();
}
- return parent;
- })
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- if (keyColumns->ChildrenSize() > 0) {
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
- parent.List(initialColumnNames.size() + i)
- .Add(0, keyColumns->ChildPtr(i))
- .Callable(1, "Member")
- .Arg(0, "state")
- .Add(1, keyColumns->ChildPtr(i))
- .Seal().Seal();
- }
+ }
+ return parent;
+ })
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (KeyColumns->ChildrenSize() > 0) {
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
+ parent.List(InitialColumnNames.size() + i)
+ .Add(0, KeyColumns->ChildPtr(i))
+ .Callable(1, "Member")
+ .Arg(0, "state")
+ .Add(1, KeyColumns->ChildPtr(i))
+ .Seal().Seal();
}
- return parent;
- })
- .Seal()
+ }
+ return parent;
+ })
.Seal()
.Seal()
.Seal()
.Seal()
- .Build();
-
- if (!groupInput) {
- groupInput = std::move(distinctGrouper);
- } else {
- groupInput = ctx.Builder(node->Pos())
- .Callable("Extend")
- .Add(0, std::move(groupInput))
- .Add(1, std::move(distinctGrouper))
- .Seal()
- .Build();
- }
- }
+ .Seal()
+ .Build();
+ return distinctGrouper;
+}
- // If no aggregation functions than add addional combiner
- if (aggregatedColumns->ChildrenSize() == 0 && keyColumns->ChildrenSize() > 0 && !sessionUpdate) {
- // Return key as-is
- auto uniqCombineInit = ctx.Builder(node->Pos())
+TExprNode::TPtr TAggregateExpander::ReturnKeyAsIsForCombineInit(const TExprNode::TPtr& pickleTypeNode)
+{
+ return Ctx.Builder(Node->Pos())
.Lambda()
.Param("key")
.Param("item")
.Callable("AsStruct")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
ui32 pos = 0;
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
auto listBuilder = parent.List(i);
- listBuilder.Add(0, keyColumns->Child(i));
- if (keyColumns->ChildrenSize() > 1) {
- if (needPickle) {
+ listBuilder.Add(0, KeyColumns->Child(i));
+ if (KeyColumns->ChildrenSize() > 1) {
+ if (pickleTypeNode) {
listBuilder
.Callable(1, "Nth")
.Callable(0, "Unpickle")
@@ -897,7 +958,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Seal();
}
} else {
- if (needPickle) {
+ if (pickleTypeNode) {
listBuilder.Callable(1, "Unpickle")
.Add(0, pickleTypeNode)
.Arg(1, "key")
@@ -913,97 +974,172 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Seal()
.Seal()
.Build();
+}
- auto uniqCombineUpdate = ctx.Builder(node->Pos())
+TExprNode::TPtr TAggregateExpander::GeneratePostAggregate(const TExprNode::TPtr& preAgg, const TExprNode::TPtr& keyExtractor)
+{
+ auto preprocessLambda = GeneratePreprocessLambda(keyExtractor);
+ auto condenseSwitch = GenerateCondenseSwitch(keyExtractor);
+ auto postAgg = Ctx.Builder(Node->Pos())
+ .Callable("PartitionsByKeys")
+ .Add(0, std::move(preAgg))
+ .Add(1, keyExtractor)
+ .Add(2, SortParams.Order)
+ .Add(3, SortParams.Key)
+ .Lambda(4)
+ .Param("stream")
+ .Apply(GetContextLambda())
+ .With(0)
+ .Callable("Map")
+ .Callable(0, "Condense1")
+ .Apply(0, preprocessLambda)
+ .With(0, "stream")
+ .Seal()
+ .Add(1, GeneratePostAggregateInitPhase())
+ .Add(2, condenseSwitch)
+ .Add(3, GeneratePostAggregateMergePhase())
+ .Seal()
+ .Add(1, GeneratePostAggregateSavePhase())
+ .Seal()
+ .Done()
+ .Seal()
+ .Seal()
+ .Seal().Build();
+
+ if (KeyColumns->ChildrenSize() == 0 && !HaveSessionSetting) {
+ return MakeSingleGroupRow(*Node, postAgg, Ctx);
+ }
+
+ return postAgg;
+}
+
+TExprNode::TPtr TAggregateExpander::GeneratePreprocessLambda(const TExprNode::TPtr& keyExtractor)
+{
+ TExprNode::TPtr preprocessLambda;
+ if (SessionWindowParams.Update) {
+ YQL_ENSURE(EffectiveCompact);
+ YQL_ENSURE(SessionWindowParams.Key);
+ YQL_ENSURE(SessionWindowParams.KeyType);
+ YQL_ENSURE(SessionWindowParams.Init);
+
+ preprocessLambda = AddSessionParamsMemberLambda(Node->Pos(), SessionStartMemberName, "", keyExtractor,
+ SessionWindowParams.Key, SessionWindowParams.Init, SessionWindowParams.Update, Ctx);
+ } else {
+ YQL_ENSURE(!SessionWindowParams.Key);
+ preprocessLambda = MakeIdentityLambda(Node->Pos(), Ctx);
+ }
+ return preprocessLambda;
+}
+
+TExprNode::TPtr TAggregateExpander::GenerateCondenseSwitch(const TExprNode::TPtr& keyExtractor)
+{
+ TExprNode::TPtr condenseSwitch;
+ if (SessionWindowParams.Update) {
+ YQL_ENSURE(EffectiveCompact);
+ YQL_ENSURE(SessionWindowParams.Key);
+ YQL_ENSURE(SessionWindowParams.KeyType);
+ YQL_ENSURE(SessionWindowParams.Init);
+
+ condenseSwitch = Ctx.Builder(Node->Pos())
.Lambda()
- .Param("key")
.Param("item")
.Param("state")
- .Arg("state")
+ .Callable("Or")
+ .Callable(0, "AggrNotEquals")
+ .Apply(0, keyExtractor)
+ .With(0, "item")
+ .Seal()
+ .Apply(1, keyExtractor)
+ .With(0, "state")
+ .Seal()
+ .Seal()
+ .Callable(1, "AggrNotEquals")
+ .Callable(0, "Member")
+ .Arg(0, "item")
+ .Atom(1, SessionStartMemberName)
+ .Seal()
+ .Callable(1, "Member")
+ .Arg(0, "state")
+ .Atom(1, SessionStartMemberName)
+ .Seal()
+ .Seal()
+ .Seal()
.Seal()
.Build();
-
- // Return state as-is
- auto uniqCombineSave = ctx.Builder(node->Pos())
+ } else {
+ YQL_ENSURE(!SessionWindowParams.Key);
+ condenseSwitch = Ctx.Builder(Node->Pos())
.Lambda()
- .Param("key")
+ .Param("item")
.Param("state")
- .Callable("Just")
- .Arg(0, "state")
+ .Callable("IsKeySwitch")
+ .Arg(0, "item")
+ .Arg(1, "state")
+ .Add(2, keyExtractor)
+ .Add(3, keyExtractor)
.Seal()
.Seal()
.Build();
-
- if (!groupInput) {
- groupInput = list;
- }
-
- groupInput = ctx.Builder(node->Pos())
- .Callable("CombineByKey")
- .Add(0, std::move(groupInput))
- .Add(1, preMap)
- .Add(2, keyExtractor)
- .Add(3, std::move(uniqCombineInit))
- .Add(4, std::move(uniqCombineUpdate))
- .Add(5, std::move(uniqCombineSave))
- .Seal()
- .Build();
}
+ return condenseSwitch;
+}
+TExprNode::TPtr TAggregateExpander::GeneratePostAggregateInitPhase()
+{
ui32 index = 0U;
- auto groupInit = ctx.Builder(node->Pos())
+ return Ctx.Builder(Node->Pos())
.Lambda()
.Param("item")
.Callable("AsStruct")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
parent
.List(index++)
- .Add(0, keyColumns->ChildPtr(i))
+ .Add(0, KeyColumns->ChildPtr(i))
.Callable(1, "Member")
.Arg(0, "item")
- .Add(1, keyColumns->ChildPtr(i))
+ .Add(1, KeyColumns->ChildPtr(i))
.Seal()
.Seal();
}
- if (sessionUpdate) {
+ if (SessionWindowParams.Update) {
parent
.List(index++)
- .Atom(0, sessionStartMemberName)
+ .Atom(0, SessionStartMemberName)
.Callable(1, "Member")
.Arg(0, "item")
- .Atom(1, sessionStartMemberName)
+ .Atom(1, SessionStartMemberName)
.Seal()
.Seal();
}
return parent;
})
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < initialColumnNames.size(); ++i) {
- auto child = aggregatedColumns->Child(i);
- auto trait = traits[i];
- if (!compact) {
+ for (ui32 i = 0; i < InitialColumnNames.size(); ++i) {
+ auto child = AggregatedColumns->Child(i);
+ auto trait = Traits[i];
+ if (!EffectiveCompact) {
auto loadLambda = trait->Child(4);
- if (!distinctFields.empty()) {
+ if (!DistinctFields.empty()) {
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.Callable(1, "Map")
.Callable(0, "Member")
.Arg(0, "item")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Add(1, loadLambda)
.Seal()
.Seal();
} else {
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.Apply(1, *loadLambda)
.With(0)
.Callable("Member")
.Arg(0, "item")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Done()
.Seal();
@@ -1025,7 +1161,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
parent
.Callable("CastStruct")
.Arg(0, "item")
- .Add(1, ExpandType(node->Pos(), *initLambda->Head().Head().GetTypeAnn(), ctx))
+ .Add(1, ExpandType(Node->Pos(), *initLambda->Head().Head().GetTypeAnn(), Ctx))
.Seal();
}
@@ -1049,16 +1185,16 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
};
if (distinctField) {
- const bool isFirst = *distinct2Columns[distinctField->Content()].begin() == i;
+ const bool isFirst = *Distinct2Columns[distinctField->Content()].begin() == i;
if (isFirst) {
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.List(1)
.Callable(0, "NamedApply")
- .Add(0, udfSetCreate[distinctField->Content()])
+ .Add(0, UdfSetCreate[distinctField->Content()])
.List(1)
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- if (!distinctFieldNeedsPickle[distinctField->Content()]) {
+ if (!DistinctFieldNeedsPickle[distinctField->Content()]) {
parent.Callable(0, "Member")
.Arg(0, "item")
.Add(1, distinctField)
@@ -1090,13 +1226,13 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Seal();
} else {
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.Do(initApply)
.Seal();
}
} else {
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.Do(initApply)
.Seal();
}
@@ -1107,72 +1243,205 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Seal()
.Seal()
.Build();
+}
+
+TExprNode::TPtr TAggregateExpander::GeneratePostAggregateSavePhase()
+{
+ ui32 index = 0U;
+ return Ctx.Builder(Node->Pos())
+ .Lambda()
+ .Param("state")
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
+ if (KeyColumns->Child(i)->Content() == SessionStartMemberName) {
+ continue;
+ }
+ parent
+ .List(index++)
+ .Add(0, KeyColumns->ChildPtr(i))
+ .Callable(1, "Member")
+ .Arg(0, "state")
+ .Add(1, KeyColumns->ChildPtr(i))
+ .Seal()
+ .Seal();
+ }
+
+ if (SessionOutputColumn) {
+ parent
+ .List(index++)
+ .Atom(0, *SessionOutputColumn)
+ .Callable(1, "Member")
+ .Arg(0, "state")
+ .Atom(1, SessionStartMemberName)
+ .Seal()
+ .Seal();
+ }
+ return parent;
+ })
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0; i < InitialColumnNames.size(); ++i) {
+ auto child = AggregatedColumns->Child(i);
+ auto trait = Traits[i];
+ auto finishLambda = trait->Child(6);
+
+ if (!EffectiveCompact && !DistinctFields.empty()) {
+ if (child->Head().IsAtom()) {
+ parent.List(index++)
+ .Add(0, FinalColumnNames[i])
+ .Callable(1, "Unwrap")
+ .Callable(0, "Map")
+ .Callable(0, "Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
+ .Seal()
+ .Add(1, finishLambda)
+ .Seal()
+ .Seal()
+ .Seal();
+ } else {
+ const auto& multiFields = child->Child(0);
+ for (ui32 field = 0; field < multiFields->ChildrenSize(); ++field) {
+ parent.List(index++)
+ .Atom(0, multiFields->Child(field)->Content())
+ .Callable(1, "Nth")
+ .Callable(0, "Unwrap")
+ .Callable(0, "Map")
+ .Callable(0, "Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
+ .Seal()
+ .Add(1, finishLambda)
+ .Seal()
+ .Seal()
+ .Atom(1, ToString(field), TNodeFlags::Default)
+ .Seal()
+ .Seal();
+ }
+ }
+ } else {
+ auto distinctField = (child->ChildrenSize() == 3) ? child->Child(2) : nullptr;
+ auto stateExtractor = [&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ const bool isFirst = distinctField ? (*Distinct2Columns[distinctField->Content()].begin() == i) : false;
+ if (distinctField && isFirst) {
+ parent.Callable("Nth")
+ .Callable(0, "Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
+ .Seal()
+ .Atom(1, "1", TNodeFlags::Default)
+ .Seal();
+ } else {
+ parent.Callable("Member")
+ .Arg(0, "state")
+ .Add(1, InitialColumnNames[i])
+ .Seal();
+ }
+
+ return parent;
+ };
+
+ if (child->Head().IsAtom()) {
+ parent.List(index++)
+ .Add(0, FinalColumnNames[i])
+ .Apply(1, *finishLambda)
+ .With(0)
+ .Do(stateExtractor)
+ .Done()
+ .Seal()
+ .Seal();
+ } else {
+ const auto& multiFields = child->Head();
+ for (ui32 field = 0; field < multiFields.ChildrenSize(); ++field) {
+ parent.List(index++)
+ .Atom(0, multiFields.Child(field)->Content())
+ .Callable(1, "Nth")
+ .Apply(0, *finishLambda)
+ .With(0)
+ .Do(stateExtractor)
+ .Done()
+ .Seal()
+ .Atom(1, ToString(field), TNodeFlags::Default)
+ .Seal()
+ .Seal();
+ }
+ }
+ }
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+}
- index = 0;
- auto groupMerge = ctx.Builder(node->Pos())
+TExprNode::TPtr TAggregateExpander::GeneratePostAggregateMergePhase()
+{
+ ui32 index = 0U;
+ return Ctx.Builder(Node->Pos())
.Lambda()
.Param("item")
.Param("state")
.Callable("AsStruct")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
+ for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) {
parent
.List(index++)
- .Add(0, keyColumns->ChildPtr(i))
+ .Add(0, KeyColumns->ChildPtr(i))
.Callable(1, "Member")
.Arg(0, "state")
- .Add(1, keyColumns->ChildPtr(i))
+ .Add(1, KeyColumns->ChildPtr(i))
.Seal()
.Seal();
}
- if (sessionUpdate) {
+ if (SessionWindowParams.Update) {
parent
.List(index++)
- .Atom(0, sessionStartMemberName)
+ .Atom(0, SessionStartMemberName)
.Callable(1, "Member")
.Arg(0, "state")
- .Atom(1, sessionStartMemberName)
+ .Atom(1, SessionStartMemberName)
.Seal()
.Seal();
}
return parent;
})
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < initialColumnNames.size(); ++i) {
- auto child = aggregatedColumns->Child(i);
- auto trait = traits[i];
- if (!compact) {
+ for (ui32 i = 0; i < InitialColumnNames.size(); ++i) {
+ auto child = AggregatedColumns->Child(i);
+ auto trait = Traits[i];
+ if (!EffectiveCompact) {
auto loadLambda = trait->Child(4);
auto mergeLambda = trait->Child(5);
- if (!distinctFields.empty()) {
+ if (!DistinctFields.empty()) {
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.Callable(1, "OptionalReduce")
.Callable(0, "Map")
.Callable(0, "Member")
.Arg(0, "item")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Add(1, loadLambda)
.Seal()
.Callable(1, "Member")
.Arg(0, "state")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Add(2, mergeLambda)
.Seal()
.Seal();
} else {
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.Apply(1, *mergeLambda)
.With(0)
.Apply(*loadLambda)
.With(0)
.Callable("Member")
.Arg(0, "item")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Done()
.Seal()
@@ -1180,7 +1449,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.With(1)
.Callable("Member")
.Arg(0, "state")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Done()
.Seal()
@@ -1189,7 +1458,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
} else {
auto updateLambda = trait->Child(2);
auto distinctField = (child->ChildrenSize() == 3) ? child->Child(2) : nullptr;
- const bool isFirst = distinctField ? (*distinct2Columns[distinctField->Content()].begin() == i) : false;
+ const bool isFirst = distinctField ? (*Distinct2Columns[distinctField->Content()].begin() == i) : false;
auto updateApply = [&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
parent.Apply(1, *updateLambda)
.With(0)
@@ -1204,7 +1473,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
parent
.Callable("CastStruct")
.Arg(0, "item")
- .Add(1, ExpandType(node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), ctx))
+ .Add(1, ExpandType(Node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), Ctx))
.Seal();
}
@@ -1217,14 +1486,14 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
parent.Callable("Nth")
.Callable(0, "Member")
.Arg(0, "state")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Atom(1, "1", TNodeFlags::Default)
.Seal();
} else {
parent.Callable("Member")
.Arg(0, "state")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal();
}
@@ -1249,21 +1518,21 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
};
if (distinctField) {
- auto distinctIndex = *distinct2Columns[distinctField->Content()].begin();
+ auto distinctIndex = *Distinct2Columns[distinctField->Content()].begin();
ui32 newValueIndex = 0;
auto newValue = [&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
parent.Callable(newValueIndex, "NamedApply")
- .Add(0, udfAddValue[distinctField->Content()])
+ .Add(0, UdfAddValue[distinctField->Content()])
.List(1)
.Callable(0, "Nth")
.Callable(0, "Member")
.Arg(0, "state")
- .Add(1, initialColumnNames[distinctIndex])
+ .Add(1, InitialColumnNames[distinctIndex])
.Seal()
.Atom(1, "0", TNodeFlags::Default)
.Seal()
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- if (!distinctFieldNeedsPickle[distinctField->Content()]) {
+ if (!DistinctFieldNeedsPickle[distinctField->Content()]) {
parent.Callable(1, "Member")
.Arg(0, "item")
.Add(1, distinctField)
@@ -1287,23 +1556,23 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
};
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.Callable(1, "If")
.Callable(0, "NamedApply")
- .Add(0, udfWasChanged[distinctField->Content()])
+ .Add(0, UdfWasChanged[distinctField->Content()])
.List(1)
.Callable(0, "NamedApply")
- .Add(0, udfAddValue[distinctField->Content()])
+ .Add(0, UdfAddValue[distinctField->Content()])
.List(1)
.Callable(0, "Nth")
.Callable(0, "Member")
.Arg(0, "state")
- .Add(1, initialColumnNames[distinctIndex])
+ .Add(1, InitialColumnNames[distinctIndex])
.Seal()
.Atom(1, "0", TNodeFlags::Default)
.Seal()
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- if (!distinctFieldNeedsPickle[distinctField->Content()]) {
+ if (!DistinctFieldNeedsPickle[distinctField->Content()]) {
parent.Callable(1, "Member")
.Arg(0, "item")
.Add(1, distinctField)
@@ -1339,13 +1608,13 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
})
.Callable(2, "Member")
.Arg(0, "state")
- .Add(1, initialColumnNames[i])
+ .Add(1, InitialColumnNames[i])
.Seal()
.Seal()
.Seal();
} else {
parent.List(index++)
- .Add(0, initialColumnNames[i])
+ .Add(0, InitialColumnNames[i])
.Do(updateApply)
.Seal();
}
@@ -1356,223 +1625,6 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T
.Seal()
.Seal()
.Build();
-
- index = 0U;
- auto groupSave = ctx.Builder(node->Pos())
- .Lambda()
- .Param("state")
- .Callable("AsStruct")
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) {
- if (keyColumns->Child(i)->Content() == sessionStartMemberName) {
- continue;
- }
- parent
- .List(index++)
- .Add(0, keyColumns->ChildPtr(i))
- .Callable(1, "Member")
- .Arg(0, "state")
- .Add(1, keyColumns->ChildPtr(i))
- .Seal()
- .Seal();
- }
-
- if (sessionOutputColumn) {
- parent
- .List(index++)
- .Atom(0, *sessionOutputColumn)
- .Callable(1, "Member")
- .Arg(0, "state")
- .Atom(1, sessionStartMemberName)
- .Seal()
- .Seal();
- }
- return parent;
- })
- .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- for (ui32 i = 0; i < initialColumnNames.size(); ++i) {
- auto child = aggregatedColumns->Child(i);
- auto trait = traits[i];
- auto finishLambda = trait->Child(6);
-
- if (!compact && !distinctFields.empty()) {
- if (child->Head().IsAtom()) {
- parent.List(index++)
- .Add(0, finalColumnNames[i])
- .Callable(1, "Unwrap")
- .Callable(0, "Map")
- .Callable(0, "Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal()
- .Add(1, finishLambda)
- .Seal()
- .Seal()
- .Seal();
- } else {
- const auto& multiFields = child->Child(0);
- for (ui32 field = 0; field < multiFields->ChildrenSize(); ++field) {
- parent.List(index++)
- .Atom(0, multiFields->Child(field)->Content())
- .Callable(1, "Nth")
- .Callable(0, "Unwrap")
- .Callable(0, "Map")
- .Callable(0, "Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal()
- .Add(1, finishLambda)
- .Seal()
- .Seal()
- .Atom(1, ToString(field), TNodeFlags::Default)
- .Seal()
- .Seal();
- }
- }
- } else {
- auto distinctField = (child->ChildrenSize() == 3) ? child->Child(2) : nullptr;
- auto stateExtractor = [&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
- const bool isFirst = distinctField ? (*distinct2Columns[distinctField->Content()].begin() == i) : false;
- if (distinctField && isFirst) {
- parent.Callable("Nth")
- .Callable(0, "Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal()
- .Atom(1, "1", TNodeFlags::Default)
- .Seal();
- } else {
- parent.Callable("Member")
- .Arg(0, "state")
- .Add(1, initialColumnNames[i])
- .Seal();
- }
-
- return parent;
- };
-
- if (child->Head().IsAtom()) {
- parent.List(index++)
- .Add(0, finalColumnNames[i])
- .Apply(1, *finishLambda)
- .With(0)
- .Do(stateExtractor)
- .Done()
- .Seal()
- .Seal();
- } else {
- const auto& multiFields = child->Head();
- for (ui32 field = 0; field < multiFields.ChildrenSize(); ++field) {
- parent.List(index++)
- .Atom(0, multiFields.Child(field)->Content())
- .Callable(1, "Nth")
- .Apply(0, *finishLambda)
- .With(0)
- .Do(stateExtractor)
- .Done()
- .Seal()
- .Atom(1, ToString(field), TNodeFlags::Default)
- .Seal()
- .Seal();
- }
- }
- }
- }
- return parent;
- })
- .Seal()
- .Seal()
- .Build();
-
- if (compact || !groupInput) {
- groupInput = std::move(list);
- }
-
- TExprNode::TPtr preprocessLambda;
- TExprNode::TPtr condenseSwitch;
- if (sessionUpdate) {
- YQL_ENSURE(compact);
- YQL_ENSURE(sessionKey);
- YQL_ENSURE(sessionKeyType);
- YQL_ENSURE(sessionInit);
-
- preprocessLambda =
- AddSessionParamsMemberLambda(node->Pos(), sessionStartMemberName, "", keyExtractor, sessionKey, sessionInit, sessionUpdate, ctx);
-
- condenseSwitch = ctx.Builder(node->Pos())
- .Lambda()
- .Param("item")
- .Param("state")
- .Callable("Or")
- .Callable(0, "AggrNotEquals")
- .Apply(0, keyExtractor)
- .With(0, "item")
- .Seal()
- .Apply(1, keyExtractor)
- .With(0, "state")
- .Seal()
- .Seal()
- .Callable(1, "AggrNotEquals")
- .Callable(0, "Member")
- .Arg(0, "item")
- .Atom(1, sessionStartMemberName)
- .Seal()
- .Callable(1, "Member")
- .Arg(0, "state")
- .Atom(1, sessionStartMemberName)
- .Seal()
- .Seal()
- .Seal()
- .Seal()
- .Build();
- } else {
- YQL_ENSURE(!sessionKey);
- preprocessLambda = MakeIdentityLambda(node->Pos(), ctx);
- condenseSwitch = ctx.Builder(node->Pos())
- .Lambda()
- .Param("item")
- .Param("state")
- .Callable("IsKeySwitch")
- .Arg(0, "item")
- .Arg(1, "state")
- .Add(2, keyExtractor)
- .Add(3, keyExtractor)
- .Seal()
- .Seal()
- .Build();
- }
-
- auto grouper = ctx.Builder(node->Pos())
- .Callable("PartitionsByKeys")
- .Add(0, std::move(groupInput))
- .Add(1, keyExtractor)
- .Add(2, sortOrder)
- .Add(3, sortKey)
- .Lambda(4)
- .Param("stream")
- .Apply(contextLambda)
- .With(0)
- .Callable("Map")
- .Callable(0, "Condense1")
- .Apply(0, preprocessLambda)
- .With(0, "stream")
- .Seal()
- .Add(1, std::move(groupInit))
- .Add(2, condenseSwitch)
- .Add(3, std::move(groupMerge))
- .Seal()
- .Add(1, std::move(groupSave))
- .Seal()
- .Done()
- .Seal()
- .Seal()
- .Seal().Build();
-
- if (keyColumns->ChildrenSize() == 0 && !sessionSetting) {
- return MakeSingleGroupRow(*node, grouper, ctx);
- }
-
- return grouper;
}
-}
+} // namespace NYql \ No newline at end of file
diff --git a/ydb/library/yql/core/yql_aggregate_expander.h b/ydb/library/yql/core/yql_aggregate_expander.h
new file mode 100644
index 00000000000..552bdee2229
--- /dev/null
+++ b/ydb/library/yql/core/yql_aggregate_expander.h
@@ -0,0 +1,113 @@
+#pragma once
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/core/yql_opt_utils.h>
+
+#include "yql_type_annotation.h"
+
+namespace NYql {
+
+class TAggregateExpander { // Class form of the former free function ExpandAggregate(): the constructor captures the arguments, ExpandAggregate() returns the resulting node.
+public:
+    TAggregateExpander(bool allowPickle, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool forceCompact = false, bool compactForDistinct = false) // same parameter list and defaults as the removed free function
+        : Node(node)
+        , Ctx(ctx)
+        , TypesCtx(typesCtx)
+        , AllowPickle(allowPickle)
+        , ForceCompact(forceCompact)
+        , CompactForDistinct(compactForDistinct)
+        , AggregatedColumns(nullptr)
+        , VoidNode(ctx.NewCallable(node->Pos(), "Void", {})) // "Void" callable created at the node's position
+        , HaveDistinct(false)
+        , EffectiveCompact(false)
+        , HaveSessionSetting(false)
+        , OriginalRowType(nullptr)
+        , RowType(nullptr)
+    {
+        PreMap = Ctx.Builder(node->Pos()) // initial PreMap lambda: premap -> Just(premap)
+            .Lambda()
+                .Param("premap")
+                .Callable("Just").Arg(0, "premap").Seal()
+            .Seal().Build();
+        SortParams = { // sort key and sort order both start out as the Void node
+            .Key = VoidNode,
+            .Order = VoidNode
+        };
+    }
+
+    TExprNode::TPtr ExpandAggregate(); // public entry point; the callers in this change invoke it right after constructing the object
+
+private: // helper steps; only declared here, presumably defined in yql_aggregate_expander.cpp
+    TExprNode::TPtr ExpandAggApply(const TExprNode::TPtr& node);
+    bool CollectTraits();
+    TExprNode::TPtr RebuildAggregate();
+    TExprNode::TPtr GetContextLambda();
+    void ProcessSessionSetting(TExprNode::TPtr sessionSetting);
+    TVector<const TTypeAnnotationNode*> GetKeyItemTypes();
+    bool IsNeedPickle(const TVector<const TTypeAnnotationNode*>& keyItemTypes);
+    TExprNode::TPtr GetKeyExtractor(bool needPickle);
+    void CollectColumnsSpecs();
+
+    // Partial aggregate generation
+    TExprNode::TPtr GeneratePartialAggregate(const TExprNode::TPtr keyExtractor, const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needPickle);
+    TExprNode::TPtr GeneratePartialAggregateForNonDistinct(const TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode);
+
+    TExprNode::TPtr GenerateDistinctGrouper(const TExprNode::TPtr distinctField,
+        const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle);
+
+    TExprNode::TPtr ReturnKeyAsIsForCombineInit(const TExprNode::TPtr& pickleTypeNode);
+
+    // Post aggregate
+    TExprNode::TPtr GeneratePostAggregate(const TExprNode::TPtr& preAgg, const TExprNode::TPtr& keyExtractor);
+    TExprNode::TPtr GeneratePreprocessLambda(const TExprNode::TPtr& keyExtractor); // presumably replaces the old inline preprocessLambda construction - TODO confirm
+    TExprNode::TPtr GenerateCondenseSwitch(const TExprNode::TPtr& keyExtractor); // presumably replaces the old inline condenseSwitch construction - TODO confirm
+    TExprNode::TPtr GeneratePostAggregateInitPhase();
+    TExprNode::TPtr GeneratePostAggregateSavePhase(); // presumably replaces the old inline groupSave lambda - TODO confirm
+    TExprNode::TPtr GeneratePostAggregateMergePhase();
+
+private:
+    static constexpr TStringBuf SessionStartMemberName = "_yql_group_session_start"; // presumably the value of the old local sessionStartMemberName - TODO confirm
+
+    const TExprNode::TPtr Node; // constructor argument: the node being expanded
+    TExprContext& Ctx; // constructor argument, held by reference
+    TTypeAnnotationContext& TypesCtx; // constructor argument, held by reference
+    bool AllowPickle; // constructor argument
+    bool ForceCompact; // constructor argument
+    bool CompactForDistinct; // constructor argument
+
+    TSessionWindowParams SessionWindowParams;
+    TExprNode::TPtr AggList;
+    TExprNode::TListType Traits;
+    TExprNode::TPtr KeyColumns;
+    TExprNode::TPtr AggregatedColumns;
+    const TExprNode::TPtr VoidNode; // built once in the constructor
+    TMaybe<TStringBuf> SessionOutputColumn;
+    TSortParams SortParams; // Key/Order are initialised to VoidNode in the constructor
+    bool HaveDistinct;
+    bool EffectiveCompact;
+    bool HaveSessionSetting;
+    const TStructExprType* OriginalRowType;
+    const TStructExprType* RowType;
+    TVector<const TItemExprType*> RowItems;
+    TExprNode::TPtr PreMap; // set in the constructor body to a lambda wrapping its argument in Just
+
+    TExprNode::TListType InitialColumnNames; // indexed by aggregated-column index; used as the member name read from "state"
+    TExprNode::TListType FinalColumnNames;
+    TExprNode::TListType DistinctFields;
+    TExprNode::TListType NothingStates;
+
+    using TIdxSet = std::set<ui32>;
+    std::unordered_map<std::string_view, TIdxSet> Distinct2Columns; // distinct field name -> column indexes; *begin() (the smallest) is treated as the "first" column of that field
+    TIdxSet NonDistinctColumns;
+
+    std::unordered_map<std::string_view, bool> DistinctFieldNeedsPickle; // looked up by distinctField->Content(); NOTE(review): string_view keys presumably rely on the owning nodes staying alive - confirm
+    std::unordered_map<std::string_view, TExprNode::TPtr> UdfSetCreate;
+    std::unordered_map<std::string_view, TExprNode::TPtr> UdfAddValue; // looked up by distinctField->Content(); used as the callee of NamedApply
+    std::unordered_map<std::string_view, TExprNode::TPtr> UdfWasChanged; // looked up by distinctField->Content(); used as the callee of NamedApply
+};
+
+inline TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { // Peephole variant of the expansion; same signature as the inline helper in the deleted yql_opt_aggregate.h
+    TAggregateExpander aggExpander(false, node, ctx, typesCtx, true); // allowPickle = false, forceCompact = true, compactForDistinct left at its default (false)
+    return aggExpander.ExpandAggregate();
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/core/yql_opt_aggregate.h b/ydb/library/yql/core/yql_opt_aggregate.h
deleted file mode 100644
index 7b1c0b23d36..00000000000
--- a/ydb/library/yql/core/yql_opt_aggregate.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
-
-#include "yql_type_annotation.h"
-
-namespace NYql {
-
-TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool forceCompact = false, bool compactForDistinct = false);
-inline TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
- return ExpandAggregate(false, node, ctx, typesCtx, true);
-}
-
-}
-
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index 642a4a996fd..6acb1359668 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -1053,6 +1053,11 @@ TExprNode::TPtr ApplyWithCastStructForFirstArg(const TExprNode::TPtr& node, cons
}
+void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTraitsNode, TSortParams& sortParams, TExprContext& ctx) // Convenience overload taking the key/order outputs bundled in a TSortParams
+{
+ ExtractSortKeyAndOrder(pos, sortTraitsNode, sortParams.Key, sortParams.Order, ctx); // forwards to the five-argument overload; results land in sortParams.Key / sortParams.Order
+}
+
void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTraitsNode, TExprNode::TPtr& sortKey, TExprNode::TPtr& sortOrder, TExprContext& ctx) {
if (sortTraitsNode->IsCallable("SortTraits")) {
TCoSortTraits sortTraits(sortTraitsNode);
@@ -1066,6 +1071,13 @@ void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTrai
}
}
+void ExtractSessionWindowParams(TPositionHandle pos, TSessionWindowParams& sessionParams, TExprContext& ctx) // Convenience overload taking the session-window values bundled in a TSessionWindowParams
+{
+ ExtractSessionWindowParams(pos, sessionParams.Traits, sessionParams.Key, // Traits goes in as the const-reference input of the long overload
+ sessionParams.KeyType, sessionParams.ParamsType, sessionParams.SortTraits, sessionParams.Init, // the remaining fields are bound to its non-const reference parameters
+ sessionParams.Update, ctx);
+}
+
void ExtractSessionWindowParams(TPositionHandle pos, const TExprNode::TPtr& sessionTraits, TExprNode::TPtr& sessionKey,
const TTypeAnnotationNode*& sessionKeyType, const TTypeAnnotationNode*& sessionParamsType, TExprNode::TPtr& sessionSortTraits, TExprNode::TPtr& sessionInit,
TExprNode::TPtr& sessionUpdate, TExprContext& ctx)
diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h
index 9ed7d1483f2..956bac3b4a9 100644
--- a/ydb/library/yql/core/yql_opt_utils.h
+++ b/ydb/library/yql/core/yql_opt_utils.h
@@ -2,6 +2,7 @@
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_graph_transformer.h>
+#include <ydb/library/yql/core/yql_opt_window.h>
#include <util/generic/hash_set.h>
#include <util/generic/strbuf.h>
@@ -91,6 +92,9 @@ void ExtractSessionWindowParams(TPositionHandle pos, const TExprNode::TPtr& sess
const TTypeAnnotationNode*& sessionKeyType, const TTypeAnnotationNode*& sessionParamsType, TExprNode::TPtr& sessionSortTraits, TExprNode::TPtr& sessionInit,
TExprNode::TPtr& sessionUpdate, TExprContext& ctx);
+void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTraitsNode, TSortParams& sortParams, TExprContext& ctx); // overload writing into sortParams.Key / sortParams.Order
+void ExtractSessionWindowParams(TPositionHandle pos, TSessionWindowParams& sessionParams, TExprContext& ctx); // overload reading sessionParams.Traits and passing the other fields on by reference
+
TExprNode::TPtr BuildKeySelector(TPositionHandle pos, const TStructExprType& rowType, const TExprNode::TPtr& keyColumns, TExprContext& ctx);
template <bool Cannonize, bool EnableNewOptimizers = true>
diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp
index fb9c40b0f89..6b2c8819934 100644
--- a/ydb/library/yql/core/yql_opt_window.cpp
+++ b/ydb/library/yql/core/yql_opt_window.cpp
@@ -3209,6 +3209,14 @@ TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode:
}
TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
+ TStringBuf sessionStartMemberName, const TExprNode::TPtr& partitionKeySelector,
+ const TSessionWindowParams& sessionWindowParams, TExprContext& ctx) // Convenience overload: session key/init/update come from sessionWindowParams
+{
+ return AddSessionParamsMemberLambda(pos, sessionStartMemberName, "", partitionKeySelector, // forwards to the long overload with an empty sessionParamsMemberName
+ sessionWindowParams.Key, sessionWindowParams.Init, sessionWindowParams.Update, ctx);
+}
+
+TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
TStringBuf sessionStartMemberName, TStringBuf sessionParamsMemberName,
const TExprNode::TPtr& partitionKeySelector,
const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
@@ -3275,4 +3283,15 @@ TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
.Build();
}
+void TSessionWindowParams::Reset() // Clears every field of the struct back to an empty/null value
+{
+ Traits = {};
+ Key = {};
+ KeyType = nullptr;
+ ParamsType = nullptr;
+ Init = {};
+ Update = {};
+ SortTraits = {};
+}
+
}
diff --git a/ydb/library/yql/core/yql_opt_window.h b/ydb/library/yql/core/yql_opt_window.h
index 6e51a6c794a..0e909f1d9de 100644
--- a/ydb/library/yql/core/yql_opt_window.h
+++ b/ydb/library/yql/core/yql_opt_window.h
@@ -45,6 +45,33 @@ private:
bool Compact = false;
};
+struct TSessionWindowParams { // Bundle of the values that ExtractSessionWindowParams() otherwise takes as separate arguments
+ TSessionWindowParams() // starts with every field null
+ : Traits(nullptr)
+ , Key(nullptr)
+ , KeyType(nullptr)
+ , ParamsType(nullptr)
+ , Init(nullptr)
+ , Update(nullptr)
+ , SortTraits(nullptr)
+ {}
+
+ void Reset(); // assigns an empty/null value to every field (defined in yql_opt_window.cpp)
+
+ TExprNode::TPtr Traits; // passed as the sessionTraits input of ExtractSessionWindowParams
+ TExprNode::TPtr Key; // passed as sessionKey; also the sessionKeySelector given to AddSessionParamsMemberLambda
+ const TTypeAnnotationNode* KeyType; // passed as sessionKeyType
+ const TTypeAnnotationNode* ParamsType; // passed as sessionParamsType
+ TExprNode::TPtr Init; // passed as sessionInit
+ TExprNode::TPtr Update; // passed as sessionUpdate
+ TExprNode::TPtr SortTraits; // passed as sessionSortTraits
+};
+
+struct TSortParams { // Key/order pair passed to the TSortParams overload of ExtractSortKeyAndOrder()
+ TExprNode::TPtr Key; // forwarded as the sortKey argument
+ TExprNode::TPtr Order; // forwarded as the sortOrder argument
+};
+
// Lambda(input: Stream/List<T>) -> Stream/List<Tuple<T, SessionKey, SessionState, ....>>
// input is assumed to be partitioned by partitionKeySelector
TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode::TPtr& partitionKeySelector,
@@ -57,4 +84,10 @@ TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
const TExprNode::TPtr& partitionKeySelector,
const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit,
const TExprNode::TPtr& sessionUpdate, TExprContext& ctx);
+
+// Overload taking the session key/init/update from a TSessionWindowParams. Input should be List/Stream of structs + see above
+TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos,
+ TStringBuf sessionStartMemberName, const TExprNode::TPtr& partitionKeySelector,
+ const TSessionWindowParams& sessionWindowParams, TExprContext& ctx);
+
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_log.cpp b/ydb/library/yql/dq/opt/dq_opt_log.cpp
index 10d765e7683..d639765e55b 100644
--- a/ydb/library/yql/dq/opt/dq_opt_log.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_log.cpp
@@ -2,8 +2,8 @@
#include "dq_opt.h"
+#include <ydb/library/yql/core/yql_aggregate_expander.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
-#include <ydb/library/yql/core/yql_opt_aggregate.h>
#include <ydb/library/yql/core/yql_opt_window.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
@@ -18,7 +18,8 @@ TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationC
return node;
}
- auto result = ExpandAggregate(true, node.Ptr(), ctx, typesCtx, false, compactForDistinct);
+ TAggregateExpander aggExpander(true, node.Ptr(), ctx, typesCtx, false, compactForDistinct);
+ auto result = aggExpander.ExpandAggregate();
YQL_ENSURE(result);
return TExprBase(result);