diff options
author | aidarsamer <aidarsamer@ydb.tech> | 2022-09-19 18:10:58 +0300 |
---|---|---|
committer | aidarsamer <aidarsamer@ydb.tech> | 2022-09-19 18:10:58 +0300 |
commit | f7178ec1e428904ba55ebc0225d16fb00a928f73 (patch) | |
tree | 9e6400287c0bd1f2377563139e6b86d4942f86e9 | |
parent | 9d86325bfc8fb4496df5bd300c50d2548ffe098a (diff) | |
download | ydb-f7178ec1e428904ba55ebc0225d16fb00a928f73.tar.gz |
Refactor the ExpandAggregate function into a class
Refactor the ExpandAggregate function into the TAggregateExpander class.
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_opt.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/core/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.cpp (renamed from ydb/library/yql/core/yql_opt_aggregate.cpp) | 2010 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.h | 113 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_aggregate.h | 14 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.cpp | 12 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.h | 4 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_window.cpp | 19 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_window.h | 33 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_log.cpp | 5 |
11 files changed, 1220 insertions, 999 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt.cpp b/ydb/core/kqp/provider/yql_kikimr_opt.cpp index a3b8a36848a..642f5f3a621 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt.cpp @@ -2,8 +2,8 @@ #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/core/common_opt/yql_co.h> +#include<ydb/library/yql/core/yql_aggregate_expander.h> #include <ydb/library/yql/core/yql_opt_utils.h> -#include <ydb/library/yql/core/yql_opt_aggregate.h> namespace NYql { namespace { @@ -120,7 +120,8 @@ TExprNode::TPtr KiRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnot } YQL_CLOG(INFO, ProviderKikimr) << "KiRewriteAggregate"; - return ExpandAggregate(true, node.Ptr(), ctx, typesCtx); + TAggregateExpander aggExpander(true, node.Ptr(), ctx, typesCtx); + return aggExpander.ExpandAggregate(); } TExprNode::TPtr KiRedundantSortByPk(TExprBase node, TExprContext& ctx, diff --git a/ydb/library/yql/core/CMakeLists.txt b/ydb/library/yql/core/CMakeLists.txt index a7a8c462e44..4e2113b6e39 100644 --- a/ydb/library/yql/core/CMakeLists.txt +++ b/ydb/library/yql/core/CMakeLists.txt @@ -47,6 +47,7 @@ target_link_libraries(library-yql-core PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(library-yql-core PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_aggregate_expander.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_callable_transform.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_csv.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_execution.cpp @@ -59,7 +60,6 @@ target_sources(library-yql-core PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_holding_file_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_join.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_library_compiler.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_aggregate.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_proposed_by_data.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_range.cpp 
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/yql_opt_rewrite_io.cpp diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 11ed902b0a0..64f027f69ac 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -12,7 +12,7 @@ #include <ydb/library/yql/core/yql_gc_transformer.h> #include <ydb/library/yql/core/yql_expr_csee.h> #include <ydb/library/yql/core/type_ann/type_ann_expr.h> -#include <ydb/library/yql/core/yql_opt_aggregate.h> +#include <ydb/library/yql/core/yql_aggregate_expander.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/core/services/yql_transform_pipeline.h> #include <ydb/library/yql/core/services/yql_out_transformers.h> diff --git a/ydb/library/yql/core/yql_opt_aggregate.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 5ff03416653..5fc67082331 100644 --- a/ydb/library/yql/core/yql_opt_aggregate.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -1,92 +1,104 @@ -#include "yql_opt_aggregate.h" -#include "yql_opt_utils.h" -#include "yql_opt_window.h" -#include "yql_expr_optimize.h" -#include "yql_expr_type_annotation.h" +#include "yql_aggregate_expander.h" + +#include <ydb/library/yql/core/yql_expr_optimize.h> +#include <ydb/library/yql/core/yql_expr_type_annotation.h> +#include <ydb/library/yql/core/yql_opt_window.h> namespace NYql { -namespace { +TExprNode::TPtr TAggregateExpander::ExpandAggregate() +{ + AggList = Node->HeadPtr(); + KeyColumns = Node->ChildPtr(1); + AggregatedColumns = Node->Child(2); + auto settings = Node->Child(3); + + if (!CollectTraits()) { + return RebuildAggregate(); + } + YQL_ENSURE(!HasSetting(*settings, "hopping"), "Aggregate with hopping unsupported here."); -TExprNode::TPtr ExpandAggApply(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + HaveDistinct = 
AnyOf(AggregatedColumns->ChildrenList(), + [](const auto& child) { return child->ChildrenSize() == 3; }); + EffectiveCompact = (HaveDistinct && CompactForDistinct) || ForceCompact || HasSetting(*settings, "compact"); + for (const auto& trait : Traits) { + auto mergeLambda = trait->Child(5); + if (mergeLambda->Tail().IsCallable("Void")) { + EffectiveCompact = true; + break; + } + } + + OriginalRowType = GetSeqItemType(Node->Head().GetTypeAnn())->Cast<TStructExprType>(); + RowItems = OriginalRowType->GetItems(); + + ProcessSessionSetting(GetSetting(*settings, "session")); + RowType = Ctx.MakeType<TStructExprType>(RowItems); + + TVector<const TTypeAnnotationNode*> keyItemTypes = GetKeyItemTypes(); + bool needPickle = IsNeedPickle(keyItemTypes); + auto keyExtractor = GetKeyExtractor(needPickle); + CollectColumnsSpecs(); + + TExprNode::TPtr preAgg = GeneratePartialAggregate(keyExtractor, keyItemTypes, needPickle); + if (EffectiveCompact || !preAgg) { + preAgg = std::move(AggList); + } + return GeneratePostAggregate(preAgg, keyExtractor); +} + +TExprNode::TPtr TAggregateExpander::ExpandAggApply(const TExprNode::TPtr& node) +{ auto name = node->Head().Content(); - auto exportsPtr = typesCtx.Modules->GetModule("/lib/yql/aggregate.yql"); + auto exportsPtr = TypesCtx.Modules->GetModule("/lib/yql/aggregate.yql"); YQL_ENSURE(exportsPtr); const auto& exports = exportsPtr->Symbols(); const auto ex = exports.find(TString(name) + "_traits_factory"); YQL_ENSURE(exports.cend() != ex); TNodeOnNodeOwnedMap deepClones; - auto lambda = ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false); + auto lambda = Ctx.DeepCopy(*ex->second, exportsPtr->ExprCtx(), deepClones, true, false); - auto listTypeNode = ctx.NewCallable(node->Pos(), "ListType", { node->ChildPtr(1) }); + auto listTypeNode = Ctx.NewCallable(node->Pos(), "ListType", { node->ChildPtr(1) }); auto extractor = node->ChildPtr(2); - auto traits = ctx.ReplaceNodes(lambda->TailPtr(), { + auto traits = 
Ctx.ReplaceNodes(lambda->TailPtr(), { {lambda->Head().Child(0), listTypeNode}, {lambda->Head().Child(1), extractor} }); - ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas); - auto status = ExpandApply(traits, traits, ctx); + Ctx.Step.Repeat(TExprStep::ExpandApplyForLambdas); + auto status = ExpandApply(traits, traits, Ctx); YQL_ENSURE(status != IGraphTransformer::TStatus::Error); return traits; } -} - -TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool forceCompact, bool compactForDistinct) { - auto list = node->HeadPtr(); - auto keyColumns = node->ChildPtr(1); - auto aggregatedColumns = node->Child(2); - auto settings = node->Child(3); - TExprNode::TListType traits; - bool needRebuild = false; - for (ui32 index = 0; index < aggregatedColumns->ChildrenSize(); ++index) { - auto trait = aggregatedColumns->Child(index)->ChildPtr(1); +bool TAggregateExpander::CollectTraits() { + bool allTraitsCollected = true; + for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) { + auto trait = AggregatedColumns->Child(index)->ChildPtr(1); if (trait->IsCallable("AggApply")) { - trait = ExpandAggApply(trait, ctx, typesCtx); - needRebuild = true; + trait = ExpandAggApply(trait); + allTraitsCollected = false; } - - traits.push_back(trait); + Traits.push_back(trait); } + return allTraitsCollected; +} - if (needRebuild) { - TExprNode::TListType newAggregatedColumnsItems = aggregatedColumns->ChildrenList(); - for (ui32 index = 0; index < aggregatedColumns->ChildrenSize(); ++index) { - newAggregatedColumnsItems[index] = ctx.ChangeChild(*(newAggregatedColumnsItems[index]), 1, std::move(traits[index])); - } - - return ctx.ChangeChild(*node, 2, ctx.NewList(node->Pos(), std::move(newAggregatedColumnsItems))); +TExprNode::TPtr TAggregateExpander::RebuildAggregate() +{ + TExprNode::TListType newAggregatedColumnsItems = AggregatedColumns->ChildrenList(); + for (ui32 index = 0; index < 
AggregatedColumns->ChildrenSize(); ++index) { + newAggregatedColumnsItems[index] = Ctx.ChangeChild(*(newAggregatedColumnsItems[index]), 1, std::move(Traits[index])); } - YQL_ENSURE(!HasSetting(*settings, "hopping"), "Aggregate with hopping unsupported here."); - - static const TStringBuf sessionStartMemberName = "_yql_group_session_start"; - const TExprNode::TPtr voidNode = ctx.NewCallable(node->Pos(), "Void", {}); - - TExprNode::TPtr sessionKey; - const TTypeAnnotationNode* sessionKeyType = nullptr; - const TTypeAnnotationNode* sessionParamsType = nullptr; - TExprNode::TPtr sessionInit; - TExprNode::TPtr sessionUpdate; - - TExprNode::TPtr sortKey = voidNode; - TExprNode::TPtr sortOrder = voidNode; - - const bool haveDistinct = AnyOf(aggregatedColumns->ChildrenList(), - [](const auto& child) { return child->ChildrenSize() == 3; }); - bool effectiveCompact = (haveDistinct && compactForDistinct) || forceCompact || HasSetting(*settings, "compact"); - for (const auto& trait : traits) { - auto mergeLambda = trait->Child(5); - if (mergeLambda->Tail().IsCallable("Void")) { - effectiveCompact = true; - break; - } - } + return Ctx.ChangeChild(*Node, 2, Ctx.NewList(Node->Pos(), std::move(newAggregatedColumnsItems))); +} - auto contextLambda = HasContextFuncs(*aggregatedColumns) ? - ctx.Builder(node->Pos()) +TExprNode::TPtr TAggregateExpander::GetContextLambda() +{ + return HasContextFuncs(*AggregatedColumns) ? 
+ Ctx.Builder(Node->Pos()) .Lambda() .Param("stream") .Callable("WithContext") @@ -95,113 +107,110 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Seal() .Seal() .Build() : - ctx.Builder(node->Pos()) + Ctx.Builder(Node->Pos()) .Lambda() .Param("stream") .Arg("stream") .Seal() .Build(); +} - const TStructExprType* originalRowType = GetSeqItemType(node->Head().GetTypeAnn())->Cast<TStructExprType>(); - TVector<const TItemExprType*> rowItems = originalRowType->GetItems(); - - const auto sessionSetting = GetSetting(*settings, "session"); - TMaybe<TStringBuf> sessionOutputColumn; - if (sessionSetting) { - YQL_ENSURE(sessionSetting->Child(1)->Child(0)->IsAtom()); - sessionOutputColumn = sessionSetting->Child(1)->Child(0)->Content(); - - // remove session column from other keys - TExprNodeList keyColumnsList = keyColumns->ChildrenList(); - EraseIf(keyColumnsList, [&](const auto& key) { return sessionOutputColumn == key->Content(); }); - keyColumns = ctx.NewList(keyColumns->Pos(), std::move(keyColumnsList)); - - TExprNode::TPtr sessionSortTraits; - ExtractSessionWindowParams(node->Pos(), sessionSetting->Child(1)->ChildPtr(1), sessionKey, sessionKeyType, sessionParamsType, sessionSortTraits, - sessionInit, sessionUpdate, ctx); - ExtractSortKeyAndOrder(node->Pos(), sessionSortTraits, sortKey, sortOrder, ctx); - - if (haveDistinct) { - auto keySelector = BuildKeySelector(node->Pos(), *originalRowType, keyColumns, ctx); - const auto sessionStartMemberLambda = AddSessionParamsMemberLambda(node->Pos(), sessionStartMemberName, "", keySelector, - sessionKey, sessionInit, sessionUpdate, ctx); - - list = ctx.Builder(node->Pos()) - .Callable("PartitionsByKeys") - .Add(0, list) - .Add(1, keySelector) - .Add(2, sortOrder) - .Add(3, sortKey) - .Lambda(4) - .Param("partitionedStream") - .Apply(sessionStartMemberLambda) - .With(0, "partitionedStream") - .Seal() +void TAggregateExpander::ProcessSessionSetting(TExprNode::TPtr sessionSetting) +{ + if 
(!sessionSetting) { + return; + } + HaveSessionSetting = true; + + YQL_ENSURE(sessionSetting->Child(1)->Child(0)->IsAtom()); + SessionOutputColumn = sessionSetting->Child(1)->Child(0)->Content(); + + // remove session column from other keys + TExprNodeList keyColumnsList = KeyColumns->ChildrenList(); + EraseIf(keyColumnsList, [&](const auto& key) { return SessionOutputColumn == key->Content(); }); + KeyColumns = Ctx.NewList(KeyColumns->Pos(), std::move(keyColumnsList)); + + SessionWindowParams.Traits = sessionSetting->Child(1)->ChildPtr(1); + ExtractSessionWindowParams(Node->Pos(), SessionWindowParams, Ctx); + ExtractSortKeyAndOrder(Node->Pos(), SessionWindowParams.SortTraits, SortParams, Ctx); + + if (HaveDistinct) { + auto keySelector = BuildKeySelector(Node->Pos(), *OriginalRowType, KeyColumns, Ctx); + const auto sessionStartMemberLambda = AddSessionParamsMemberLambda(Node->Pos(), SessionStartMemberName, keySelector, + SessionWindowParams, Ctx); + + AggList = Ctx.Builder(Node->Pos()) + .Callable("PartitionsByKeys") + .Add(0, AggList) + .Add(1, keySelector) + .Add(2, SortParams.Order) + .Add(3, SortParams.Key) + .Lambda(4) + .Param("partitionedStream") + .Apply(sessionStartMemberLambda) + .With(0, "partitionedStream") .Seal() .Seal() - .Build(); + .Seal() + .Build(); - auto keyColumnsList = keyColumns->ChildrenList(); - keyColumnsList.push_back(ctx.NewAtom(node->Pos(), sessionStartMemberName)); - keyColumns = ctx.NewList(node->Pos(), std::move(keyColumnsList)); + auto keyColumnsList = KeyColumns->ChildrenList(); + keyColumnsList.push_back(Ctx.NewAtom(Node->Pos(), SessionStartMemberName)); + KeyColumns = Ctx.NewList(Node->Pos(), std::move(keyColumnsList)); - rowItems.push_back(ctx.MakeType<TItemExprType>(sessionStartMemberName, sessionKeyType)); + RowItems.push_back(Ctx.MakeType<TItemExprType>(SessionStartMemberName, SessionWindowParams.KeyType)); - sortOrder = sortKey = voidNode; - sessionKey = sessionInit = sessionUpdate = {}; - sessionKeyType = nullptr; - } 
else { - effectiveCompact = true; - } + SessionWindowParams.Reset(); + SortParams.Key = SortParams.Order = VoidNode; + } else { + EffectiveCompact = true; } +} - const bool compact = effectiveCompact; - const auto rowType = ctx.MakeType<TStructExprType>(rowItems); - - auto preMap = ctx.Builder(node->Pos()) - .Lambda() - .Param("premap") - .Callable("Just").Arg(0, "premap").Seal() - .Seal().Build(); - - bool needPickle = false; +TVector<const TTypeAnnotationNode*> TAggregateExpander::GetKeyItemTypes() +{ TVector<const TTypeAnnotationNode*> keyItemTypes; - for (auto keyColumn : keyColumns->Children()) { - auto index = rowType->FindItem(keyColumn->Content()); + for (auto keyColumn : KeyColumns->Children()) { + auto index = RowType->FindItem(keyColumn->Content()); YQL_ENSURE(index, "Unknown column: " << keyColumn->Content()); - auto type = rowType->GetItems()[*index]->GetItemType(); + auto type = RowType->GetItems()[*index]->GetItemType(); keyItemTypes.push_back(type); - needPickle = needPickle || allowPickle && !IsDataOrOptionalOfData(type); + } + return keyItemTypes; +} - const TTypeAnnotationNode* pickleType = nullptr; - TExprNode::TPtr pickleTypeNode; - if (needPickle) { - pickleType = keyColumns->ChildrenSize() > 1 ? 
ctx.MakeType<TTupleExprType>(keyItemTypes) : keyItemTypes[0]; - pickleTypeNode = ExpandType(node->Pos(), *pickleType, ctx); +bool TAggregateExpander::IsNeedPickle(const TVector<const TTypeAnnotationNode*>& keyItemTypes) +{ + bool needPickle = false; + for (auto type : keyItemTypes) { + needPickle = needPickle || AllowPickle && !IsDataOrOptionalOfData(type); } + return needPickle; +} - auto keyExtractor = ctx.Builder(node->Pos()) +TExprNode::TPtr TAggregateExpander::GetKeyExtractor(bool needPickle) +{ + TExprNode::TPtr keyExtractor = Ctx.Builder(Node->Pos()) .Lambda() .Param("item") .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - if (keyColumns->ChildrenSize() == 0) { + if (KeyColumns->ChildrenSize() == 0) { return parent.Callable("Uint32").Atom(0, "0", TNodeFlags::Default).Seal(); } - else if (keyColumns->ChildrenSize() == 1) { - return parent.Callable("Member").Arg(0, "item").Add(1, keyColumns->HeadPtr()).Seal(); + else if (KeyColumns->ChildrenSize() == 1) { + return parent.Callable("Member").Arg(0, "item").Add(1, KeyColumns->HeadPtr()).Seal(); } else { auto listBuilder = parent.List(); ui32 pos = 0; - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { listBuilder .Callable(pos++, "Member") .Arg(0, "item") - .Add(1, keyColumns->ChildPtr(i)) + .Add(1, KeyColumns->ChildPtr(i)) .Seal(); } - return listBuilder.Seal(); } }) @@ -209,7 +218,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Build(); if (needPickle) { - keyExtractor = ctx.Builder(node->Pos()) + keyExtractor = Ctx.Builder(Node->Pos()) .Lambda() .Param("item") .Callable("StablePickle") @@ -220,45 +229,36 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Seal() .Build(); } + return keyExtractor; +} - TExprNode::TListType initialColumnNames; - TExprNode::TListType finalColumnNames; - TExprNode::TListType distinctFields; - using TIdxSet = std::set<ui32>; - 
std::unordered_map<std::string_view, TIdxSet> distinct2Columns; - std::unordered_map<std::string_view, bool> distinctFieldNeedsPickle; - std::unordered_map<std::string_view, TExprNode::TPtr> udfSetCreate; - std::unordered_map<std::string_view, TExprNode::TPtr> udfAddValue; - std::unordered_map<std::string_view, TExprNode::TPtr> udfWasChanged; - TIdxSet nondistinctColumns; - for (auto child : aggregatedColumns->Children()) { +void TAggregateExpander::CollectColumnsSpecs() +{ + for (ui32 index = 0; index < AggregatedColumns->ChildrenSize(); ++index) { + auto child = AggregatedColumns->Child(index); if (const auto distinctField = (child->ChildrenSize() == 3) ? child->Child(2) : nullptr) { - const auto ins = distinct2Columns.emplace(distinctField->Content(), TIdxSet()); + const auto ins = Distinct2Columns.emplace(distinctField->Content(), TIdxSet()); if (ins.second) { - distinctFields.push_back(distinctField); + DistinctFields.push_back(distinctField); } - ins.first->second.insert(initialColumnNames.size()); + ins.first->second.insert(InitialColumnNames.size()); } else { - nondistinctColumns.insert(initialColumnNames.size()); + NonDistinctColumns.insert(InitialColumnNames.size()); } if (child->Head().IsAtom()) { - finalColumnNames.push_back(child->HeadPtr()); + FinalColumnNames.push_back(child->HeadPtr()); } else { - finalColumnNames.push_back(child->Head().HeadPtr()); + FinalColumnNames.push_back(child->Head().HeadPtr()); } - initialColumnNames.push_back(ctx.NewAtom(finalColumnNames.back()->Pos(), "_yql_agg_" + ToString(initialColumnNames.size()), TNodeFlags::Default)); - } - - TExprNode::TListType nothingStates; - for (ui32 index = 0; index < aggregatedColumns->ChildrenSize(); ++index) { - auto trait = traits[index]; + InitialColumnNames.push_back(Ctx.NewAtom(FinalColumnNames.back()->Pos(), "_yql_agg_" + ToString(InitialColumnNames.size()), TNodeFlags::Default)); + auto trait = Traits[index]; auto saveLambda = trait->Child(3); auto saveLambdaType = 
saveLambda->GetTypeAnn(); - auto typeNode = ExpandType(node->Pos(), *saveLambdaType, ctx); - nothingStates.push_back(ctx.Builder(node->Pos()) + auto typeNode = ExpandType(Node->Pos(), *saveLambdaType, Ctx); + NothingStates.push_back(Ctx.Builder(Node->Pos()) .Callable("Nothing") .Callable(0, "OptionalType") .Add(0, std::move(typeNode)) @@ -267,620 +267,681 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Build() ); } +} - TExprNode::TPtr groupInput; +TExprNode::TPtr TAggregateExpander::GeneratePartialAggregate(const TExprNode::TPtr keyExtractor, + const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needPickle) +{ + TExprNode::TPtr pickleTypeNode = nullptr; + if (needPickle) { + const TTypeAnnotationNode* pickleType = nullptr; + pickleType = KeyColumns->ChildrenSize() > 1 ? Ctx.MakeType<TTupleExprType>(keyItemTypes) : keyItemTypes[0]; + pickleTypeNode = ExpandType(Node->Pos(), *pickleType, Ctx); + } - if (!nondistinctColumns.empty()) { - auto combineInit = ctx.Builder(node->Pos()) - .Lambda() - .Param("key") - .Param("item") - .Callable("AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - ui32 ndx = 0; - for (ui32 i: nondistinctColumns) { - auto trait = traits[i]; - auto initLambda = trait->Child(1); - if (initLambda->Head().ChildrenSize() == 1) { - parent.List(ndx++) - .Add(0, initialColumnNames[i]) - .Apply(1, *initLambda) - .With(0) - .Callable("CastStruct") - .Arg(0, "item") - .Add(1, ExpandType(node->Pos(), *initLambda->Head().Head().GetTypeAnn(), ctx)) - .Seal() - .Done() - .Seal() - .Seal(); - } else { - parent.List(ndx++) - .Add(0, initialColumnNames[i]) - .Apply(1, *initLambda) - .With(0) - .Callable("CastStruct") - .Arg(0, "item") - .Add(1, ExpandType(node->Pos(), *initLambda->Head().Head().GetTypeAnn(), ctx)) - .Seal() - .Done() - .With(1) - .Callable("Uint32") - .Atom(0, ToString(i), TNodeFlags::Default) - .Seal() - .Done() - .Seal() - .Seal(); - } - } - return parent; - }) + 
TExprNode::TPtr partialAgg = nullptr; + if (!NonDistinctColumns.empty()) { + partialAgg = GeneratePartialAggregateForNonDistinct(keyExtractor, pickleTypeNode); + } + for (ui32 index = 0; index < DistinctFields.size(); ++index) { + auto distinctField = DistinctFields[index]; + + bool needDistinctPickle = EffectiveCompact ? false : needPickle; + auto distinctGrouper = GenerateDistinctGrouper(distinctField, keyItemTypes, needDistinctPickle); + + if (!partialAgg) { + partialAgg = std::move(distinctGrouper); + } else { + partialAgg = Ctx.Builder(Node->Pos()) + .Callable("Extend") + .Add(0, std::move(partialAgg)) + .Add(1, std::move(distinctGrouper)) .Seal() - .Seal() - .Build(); + .Build(); + } + } + // If no aggregation functions then add additional combiner + if (AggregatedColumns->ChildrenSize() == 0 && KeyColumns->ChildrenSize() > 0 && !SessionWindowParams.Update) { + if (!partialAgg) { + partialAgg = AggList; + } - auto combineUpdate = ctx.Builder(node->Pos()) + auto uniqCombineInit = ReturnKeyAsIsForCombineInit(pickleTypeNode); + auto uniqCombineUpdate = Ctx.Builder(Node->Pos()) .Lambda() .Param("key") .Param("item") .Param("state") - .Callable("AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - ui32 ndx = 0; - for (ui32 i: nondistinctColumns) { - auto trait = traits[i]; - auto updateLambda = trait->Child(2); - if (updateLambda->Head().ChildrenSize() == 2) { - parent.List(ndx++) - .Add(0, initialColumnNames[i]) - .Apply(1, *updateLambda) - .With(0) - .Callable("CastStruct") - .Arg(0, "item") - .Add(1, ExpandType(node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), ctx)) - .Seal() - .Done() - .With(1) - .Callable("Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal() - .Done() - .Seal() - .Seal(); - } else { - parent.List(ndx++) - .Add(0, initialColumnNames[i]) - .Apply(1, *updateLambda) - .With(0) - .Callable("CastStruct") - .Arg(0, "item") - .Add(1, ExpandType(node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), ctx)) - 
.Seal() - .Done() - .With(1) - .Callable("Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal() - .Done() - .With(2) - .Callable("Uint32") - .Atom(0, ToString(i), TNodeFlags::Default) - .Seal() - .Done() - .Seal() - .Seal(); - } - } - return parent; - }) - .Seal() + .Arg("state") .Seal() .Build(); - - auto combineSave = ctx.Builder(node->Pos()) + + // Return state as-is + auto uniqCombineSave = Ctx.Builder(Node->Pos()) .Lambda() .Param("key") .Param("state") .Callable("Just") - .Callable(0, "AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i = 0; i < initialColumnNames.size(); ++i) { - if (nondistinctColumns.find(i) == nondistinctColumns.end()) { + .Arg(0, "state") + .Seal() + .Seal() + .Build(); + + partialAgg = Ctx.Builder(Node->Pos()) + .Callable("CombineByKey") + .Add(0, std::move(partialAgg)) + .Add(1, PreMap) + .Add(2, keyExtractor) + .Add(3, std::move(uniqCombineInit)) + .Add(4, std::move(uniqCombineUpdate)) + .Add(5, std::move(uniqCombineSave)) + .Seal() + .Build(); + } + return partialAgg; +} + +TExprNode::TPtr TAggregateExpander::GeneratePartialAggregateForNonDistinct(const TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode) +{ + auto combineInit = Ctx.Builder(Node->Pos()) + .Lambda() + .Param("key") + .Param("item") + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 ndx = 0; + for (ui32 i: NonDistinctColumns) { + auto trait = Traits[i]; + auto initLambda = trait->Child(1); + if (initLambda->Head().ChildrenSize() == 1) { + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *initLambda) + .With(0) + .Callable("CastStruct") + .Arg(0, "item") + .Add(1, ExpandType(Node->Pos(), *initLambda->Head().Head().GetTypeAnn(), Ctx)) + .Seal() + .Done() + .Seal() + .Seal(); + } else { + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *initLambda) + .With(0) + .Callable("CastStruct") + .Arg(0, "item") + .Add(1, ExpandType(Node->Pos(), 
*initLambda->Head().Head().GetTypeAnn(), Ctx)) + .Seal() + .Done() + .With(1) + .Callable("Uint32") + .Atom(0, ToString(i), TNodeFlags::Default) + .Seal() + .Done() + .Seal() + .Seal(); + } + } + return parent; + }) + .Seal() + .Seal() + .Build(); + + auto combineUpdate = Ctx.Builder(Node->Pos()) + .Lambda() + .Param("key") + .Param("item") + .Param("state") + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 ndx = 0; + for (ui32 i: NonDistinctColumns) { + auto trait = Traits[i]; + auto updateLambda = trait->Child(2); + if (updateLambda->Head().ChildrenSize() == 2) { + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *updateLambda) + .With(0) + .Callable("CastStruct") + .Arg(0, "item") + .Add(1, ExpandType(Node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), Ctx)) + .Seal() + .Done() + .With(1) + .Callable("Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) + .Seal() + .Done() + .Seal() + .Seal(); + } else { + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *updateLambda) + .With(0) + .Callable("CastStruct") + .Arg(0, "item") + .Add(1, ExpandType(Node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), Ctx)) + .Seal() + .Done() + .With(1) + .Callable("Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) + .Seal() + .Done() + .With(2) + .Callable("Uint32") + .Atom(0, ToString(i), TNodeFlags::Default) + .Seal() + .Done() + .Seal() + .Seal(); + } + } + return parent; + }) + .Seal() + .Seal() + .Build(); + + auto combineSave = Ctx.Builder(Node->Pos()) + .Lambda() + .Param("key") + .Param("state") + .Callable("Just") + .Callable(0, "AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0; i < InitialColumnNames.size(); ++i) { + if (NonDistinctColumns.find(i) == NonDistinctColumns.end()) { + parent.List(i) + .Add(0, InitialColumnNames[i]) + .Add(1, NothingStates[i]) + .Seal(); + } else { + auto trait = Traits[i]; + auto saveLambda = trait->Child(3); + 
if (!DistinctFields.empty()) { parent.List(i) - .Add(0, initialColumnNames[i]) - .Add(1, nothingStates[i]) - .Seal(); - } else { - auto trait = traits[i]; - auto saveLambda = trait->Child(3); - if (!distinctFields.empty()) { - parent.List(i) - .Add(0, initialColumnNames[i]) - .Callable(1, "Just") - .Apply(0, *saveLambda) - .With(0) - .Callable("Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal() - .Done() - .Seal() - .Seal() - .Seal(); - } else { - parent.List(i) - .Add(0, initialColumnNames[i]) - .Apply(1, *saveLambda) + .Add(0, InitialColumnNames[i]) + .Callable(1, "Just") + .Apply(0, *saveLambda) .With(0) .Callable("Member") .Arg(0, "state") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Done() .Seal() - .Seal(); - } + .Seal() + .Seal(); + } else { + parent.List(i) + .Add(0, InitialColumnNames[i]) + .Apply(1, *saveLambda) + .With(0) + .Callable("Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) + .Seal() + .Done() + .Seal() + .Seal(); } } - return parent; - }) - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - ui32 pos = 0; - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { - auto listBuilder = parent.List(initialColumnNames.size() + i); - listBuilder.Add(0, keyColumns->ChildPtr(i)); - if (keyColumns->ChildrenSize() > 1) { - if (needPickle) { - listBuilder - .Callable(1, "Nth") - .Callable(0, "Unpickle") - .Add(0, pickleTypeNode) - .Arg(1, "key") - .Seal() - .Atom(1, ToString(pos), TNodeFlags::Default) - .Seal(); - } else { - listBuilder - .Callable(1, "Nth") - .Arg(0, "key") - .Atom(1, ToString(pos), TNodeFlags::Default) - .Seal(); - } - ++pos; + } + return parent; + }) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 pos = 0; + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { + auto listBuilder = parent.List(InitialColumnNames.size() + i); + listBuilder.Add(0, KeyColumns->ChildPtr(i)); + if (KeyColumns->ChildrenSize() > 1) { + if (pickleTypeNode) { + listBuilder 
+ .Callable(1, "Nth") + .Callable(0, "Unpickle") + .Add(0, pickleTypeNode) + .Arg(1, "key") + .Seal() + .Atom(1, ToString(pos), TNodeFlags::Default) + .Seal(); } else { - if (needPickle) { - listBuilder.Callable(1, "Unpickle") - .Add(0, pickleTypeNode) - .Arg(1, "key") - .Seal(); - } else { - listBuilder.Arg(1, "key"); - } + listBuilder + .Callable(1, "Nth") + .Arg(0, "key") + .Atom(1, ToString(pos), TNodeFlags::Default) + .Seal(); + } + ++pos; + } else { + if (pickleTypeNode) { + listBuilder.Callable(1, "Unpickle") + .Add(0, pickleTypeNode) + .Arg(1, "key") + .Seal(); + } else { + listBuilder.Arg(1, "key"); } - listBuilder.Seal(); } - return parent; - }) - .Seal() + listBuilder.Seal(); + } + return parent; + }) .Seal() .Seal() - .Build(); + .Seal() + .Build(); - groupInput = ctx.Builder(node->Pos()) - .Callable("CombineByKey") - .Add(0, list) - .Add(1, preMap) - .Add(2, keyExtractor) - .Add(3, std::move(combineInit)) - .Add(4, std::move(combineUpdate)) - .Add(5, std::move(combineSave)) - .Seal() - .Build(); - } + return Ctx.Builder(Node->Pos()) + .Callable("CombineByKey") + .Add(0, AggList) + .Add(1, PreMap) + .Add(2, keyExtractor) + .Add(3, std::move(combineInit)) + .Add(4, std::move(combineUpdate)) + .Add(5, std::move(combineSave)) + .Seal() + .Build(); +} - for (ui32 index = 0; index < distinctFields.size(); ++index) { - auto distinctField = distinctFields[index]; - auto& indicies = distinct2Columns[distinctField->Content()]; - auto distinctIndex = rowType->FindItem(distinctField->Content()); - YQL_ENSURE(distinctIndex, "Unknown field: " << distinctField->Content()); - auto distinctType = rowType->GetItems()[*distinctIndex]->GetItemType(); - TVector<const TTypeAnnotationNode*> distinctKeyItemTypes = keyItemTypes; - distinctKeyItemTypes.push_back(distinctType); - bool needDistinctPickle = compact ? 
false : needPickle; - auto valueType = distinctType; - if (distinctType->GetKind() == ETypeAnnotationKind::Optional) { - distinctType = distinctType->Cast<TOptionalExprType>()->GetItemType(); - } +TExprNode::TPtr TAggregateExpander::GenerateDistinctGrouper(const TExprNode::TPtr distinctField, + const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle) +{ + auto& indicies = Distinct2Columns[distinctField->Content()]; + auto distinctIndex = RowType->FindItem(distinctField->Content()); + YQL_ENSURE(distinctIndex, "Unknown field: " << distinctField->Content()); + auto distinctType = RowType->GetItems()[*distinctIndex]->GetItemType(); + TVector<const TTypeAnnotationNode*> distinctKeyItemTypes = keyItemTypes; + distinctKeyItemTypes.push_back(distinctType); + auto valueType = distinctType; + if (distinctType->GetKind() == ETypeAnnotationKind::Optional) { + distinctType = distinctType->Cast<TOptionalExprType>()->GetItemType(); + } - if (distinctType->GetKind() != ETypeAnnotationKind::Data) { - needDistinctPickle = true; - valueType = ctx.MakeType<TDataExprType>(EDataSlot::String); - } + if (distinctType->GetKind() != ETypeAnnotationKind::Data) { + needDistinctPickle = true; + valueType = Ctx.MakeType<TDataExprType>(EDataSlot::String); + } - const auto expandedValueType = needDistinctPickle ? - ctx.Builder(node->Pos()) - .Callable("DataType") - .Atom(0, "String", TNodeFlags::Default) - .Seal() - .Build(): - ExpandType(node->Pos(), *valueType, ctx); - - distinctFieldNeedsPickle[distinctField->Content()] = needDistinctPickle; - auto udfSetCreateValue = ctx.Builder(node->Pos()) - .Callable("Udf") - .Atom(0, "Set.Create") - .Callable(1, "Void").Seal() - .Callable(2, "TupleType") - .Callable(0, "TupleType") - .Add(0, expandedValueType) - .Callable(1, "DataType") - .Atom(0, "Uint32", TNodeFlags::Default) - .Seal() + const auto expandedValueType = needDistinctPickle ? 
+ Ctx.Builder(Node->Pos()) + .Callable("DataType") + .Atom(0, "String", TNodeFlags::Default) + .Seal() + .Build() + : ExpandType(Node->Pos(), *valueType, Ctx); + + DistinctFieldNeedsPickle[distinctField->Content()] = needDistinctPickle; + auto udfSetCreateValue = Ctx.Builder(Node->Pos()) + .Callable("Udf") + .Atom(0, "Set.Create") + .Callable(1, "Void").Seal() + .Callable(2, "TupleType") + .Callable(0, "TupleType") + .Add(0, expandedValueType) + .Callable(1, "DataType") + .Atom(0, "Uint32", TNodeFlags::Default) .Seal() - .Callable(1, "StructType").Seal() - .Add(2, expandedValueType) .Seal() + .Callable(1, "StructType").Seal() + .Add(2, expandedValueType) .Seal() - .Build(); + .Seal() + .Build(); - udfSetCreate[distinctField->Content()] = udfSetCreateValue; - auto resourceType = ctx.Builder(node->Pos()) - .Callable("TypeOf") - .Callable(0, "Apply") - .Add(0, udfSetCreateValue) - .Callable(1, "InstanceOf") - .Add(0, expandedValueType) - .Seal() - .Callable(2, "Uint32") - .Atom(0, "0", TNodeFlags::Default) - .Seal() + UdfSetCreate[distinctField->Content()] = udfSetCreateValue; + auto resourceType = Ctx.Builder(Node->Pos()) + .Callable("TypeOf") + .Callable(0, "Apply") + .Add(0, udfSetCreateValue) + .Callable(1, "InstanceOf") + .Add(0, expandedValueType) + .Seal() + .Callable(2, "Uint32") + .Atom(0, "0", TNodeFlags::Default) .Seal() .Seal() - .Build(); + .Seal() + .Build(); - udfAddValue[distinctField->Content()] = ctx.Builder(node->Pos()) - .Callable("Udf") - .Atom(0, "Set.AddValue") - .Callable(1, "Void").Seal() - .Callable(2, "TupleType") - .Callable(0, "TupleType") - .Add(0, resourceType) - .Add(1, expandedValueType) - .Seal() - .Callable(1, "StructType").Seal() - .Add(2, expandedValueType) + UdfAddValue[distinctField->Content()] = Ctx.Builder(Node->Pos()) + .Callable("Udf") + .Atom(0, "Set.AddValue") + .Callable(1, "Void").Seal() + .Callable(2, "TupleType") + .Callable(0, "TupleType") + .Add(0, resourceType) + .Add(1, expandedValueType) .Seal() + .Callable(1, 
"StructType").Seal() + .Add(2, expandedValueType) .Seal() - .Build(); + .Seal() + .Build(); - udfWasChanged[distinctField->Content()] = ctx.Builder(node->Pos()) - .Callable("Udf") - .Atom(0, "Set.WasChanged") - .Callable(1, "Void").Seal() - .Callable(2, "TupleType") - .Callable(0, "TupleType") - .Add(0, resourceType) - .Seal() - .Callable(1, "StructType").Seal() - .Add(2, expandedValueType) + UdfWasChanged[distinctField->Content()] = Ctx.Builder(Node->Pos()) + .Callable("Udf") + .Atom(0, "Set.WasChanged") + .Callable(1, "Void").Seal() + .Callable(2, "TupleType") + .Callable(0, "TupleType") + .Add(0, resourceType) .Seal() + .Callable(1, "StructType").Seal() + .Add(2, expandedValueType) .Seal() - .Build(); + .Seal() + .Build(); - auto distinctKeyExtractor = ctx.Builder(node->Pos()) - .Lambda() - .Param("item") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - if (keyColumns->ChildrenSize() != 0) { - auto listBuilder = parent.List(); - ui32 pos = 0; - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { - listBuilder - .Callable(pos++, "Member") - .Arg(0, "item") - .Add(1, keyColumns->ChildPtr(i)) - .Seal(); - } + auto distinctKeyExtractor = Ctx.Builder(Node->Pos()) + .Lambda() + .Param("item") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + if (KeyColumns->ChildrenSize() != 0) { + auto listBuilder = parent.List(); + ui32 pos = 0; + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { listBuilder - .Callable(pos, "Member") - .Arg(0, "item") - .Add(1, distinctField) - .Seal(); - - return listBuilder.Seal(); - } else { - return parent - .Callable("Member") + .Callable(pos++, "Member") .Arg(0, "item") - .Add(1, distinctField) + .Add(1, KeyColumns->ChildPtr(i)) .Seal(); } - }) - .Seal() - .Build(); + listBuilder + .Callable(pos, "Member") + .Arg(0, "item") + .Add(1, distinctField) + .Seal(); - const TTypeAnnotationNode* distinctPickleType = nullptr; - TExprNode::TPtr distinctPickleTypeNode; - if (needDistinctPickle) { - distinctPickleType = 
keyColumns->ChildrenSize() > 0 ? ctx.MakeType<TTupleExprType>(distinctKeyItemTypes) : distinctKeyItemTypes.front(); - distinctPickleTypeNode = ExpandType(node->Pos(), *distinctPickleType, ctx); - } + return listBuilder.Seal(); + } else { + return parent + .Callable("Member") + .Arg(0, "item") + .Add(1, distinctField) + .Seal(); + } + }) + .Seal() + .Build(); - if (needDistinctPickle) { - distinctKeyExtractor = ctx.Builder(node->Pos()) - .Lambda() - .Param("item") - .Callable("StablePickle") - .Apply(0, *distinctKeyExtractor).With(0, "item").Seal() - .Seal() - .Seal() - .Build(); - } + const TTypeAnnotationNode* distinctPickleType = nullptr; + TExprNode::TPtr distinctPickleTypeNode; + if (needDistinctPickle) { + distinctPickleType = KeyColumns->ChildrenSize() > 0 ? Ctx.MakeType<TTupleExprType>(distinctKeyItemTypes) : distinctKeyItemTypes.front(); + distinctPickleTypeNode = ExpandType(Node->Pos(), *distinctPickleType, Ctx); + } - auto distinctCombineInit = ctx.Builder(node->Pos()) + if (needDistinctPickle) { + distinctKeyExtractor = Ctx.Builder(Node->Pos()) .Lambda() - .Param("key") .Param("item") - .Callable("AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - ui32 ndx = 0; - for (ui32 i: indicies) { - auto trait = traits[i]; - auto initLambda = trait->Child(1); - if (initLambda->Head().ChildrenSize() == 1) { - parent.List(ndx++) - .Add(0, initialColumnNames[i]) - .Apply(1, *initLambda) - .With(0) - .Callable("Member") - .Arg(0, "item") - .Add(1, distinctField) - .Seal() - .Done() - .Seal() - .Seal(); - } else { - parent.List(ndx++) - .Add(0, initialColumnNames[i]) - .Apply(1, *initLambda) - .With(0) - .Callable("Member") - .Arg(0, "item") - .Add(1, distinctField) - .Seal() - .Done() - .With(1) - .Callable("Uint32") - .Atom(0, ToString(i), TNodeFlags::Default) - .Seal() - .Done() - .Seal() - .Seal(); - } - } - return parent; - }) + .Callable("StablePickle") + .Apply(0, *distinctKeyExtractor).With(0, "item").Seal() .Seal() .Seal() .Build(); + } - 
auto distinctCombineUpdate = ctx.Builder(node->Pos()) - .Lambda() - .Param("key") - .Param("item") - .Param("state") - .Arg("state") + auto distinctCombineInit = Ctx.Builder(Node->Pos()) + .Lambda() + .Param("key") + .Param("item") + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 ndx = 0; + for (ui32 i: indicies) { + auto trait = Traits[i]; + auto initLambda = trait->Child(1); + if (initLambda->Head().ChildrenSize() == 1) { + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *initLambda) + .With(0) + .Callable("Member") + .Arg(0, "item") + .Add(1, distinctField) + .Seal() + .Done() + .Seal() + .Seal(); + } else { + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *initLambda) + .With(0) + .Callable("Member") + .Arg(0, "item") + .Add(1, distinctField) + .Seal() + .Done() + .With(1) + .Callable("Uint32") + .Atom(0, ToString(i), TNodeFlags::Default) + .Seal() + .Done() + .Seal() + .Seal(); + } + } + return parent; + }) .Seal() - .Build(); + .Seal() + .Build(); - ui32 ndx = 0; - auto distinctCombineSave = ctx.Builder(node->Pos()) - .Lambda() - .Param("key") - .Param("state") - .Callable("Just") - .Callable(0, "AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i: indicies) { - auto trait = traits[i]; - auto saveLambda = trait->Child(3); - parent.List(ndx++) - .Add(0, initialColumnNames[i]) - .Apply(1, *saveLambda) - .With(0) - .Callable("Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal() - .Done() - .Seal() - .Seal(); - } - return parent; - }) - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - if (keyColumns->ChildrenSize() > 0) { - if (needDistinctPickle) { - ui32 pos = 0; - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { - parent.List(ndx++) - .Add(0, keyColumns->ChildPtr(i)) - .Callable(1, "Nth") - .Callable(0, "Unpickle") - .Add(0, distinctPickleTypeNode) - .Arg(1, "key") - .Seal() - .Atom(1, ToString(pos++), 
TNodeFlags::Default) - .Seal() - .Seal(); - } + auto distinctCombineUpdate = Ctx.Builder(Node->Pos()) + .Lambda() + .Param("key") + .Param("item") + .Param("state") + .Arg("state") + .Seal() + .Build(); + + ui32 ndx = 0; + auto distinctCombineSave = Ctx.Builder(Node->Pos()) + .Lambda() + .Param("key") + .Param("state") + .Callable("Just") + .Callable(0, "AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i: indicies) { + auto trait = Traits[i]; + auto saveLambda = trait->Child(3); + parent.List(ndx++) + .Add(0, InitialColumnNames[i]) + .Apply(1, *saveLambda) + .With(0) + .Callable("Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) + .Seal() + .Done() + .Seal() + .Seal(); + } + return parent; + }) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + if (KeyColumns->ChildrenSize() > 0) { + if (needDistinctPickle) { + ui32 pos = 0; + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { parent.List(ndx++) - .Add(0, distinctField) + .Add(0, KeyColumns->ChildPtr(i)) .Callable(1, "Nth") - .Callable(0, "Unpickle") - .Add(0, distinctPickleTypeNode) - .Arg(1, "key") - .Seal() + .Callable(0, "Unpickle") + .Add(0, distinctPickleTypeNode) + .Arg(1, "key") + .Seal() .Atom(1, ToString(pos++), TNodeFlags::Default) .Seal() - .Seal(); - - } else { - ui32 pos = 0; - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { - parent.List(ndx++) - .Add(0, keyColumns->ChildPtr(i)) - .Callable(1, "Nth") - .Arg(0, "key") - .Atom(1, ToString(pos++), TNodeFlags::Default) + .Seal(); + } + parent.List(ndx++) + .Add(0, distinctField) + .Callable(1, "Nth") + .Callable(0, "Unpickle") + .Add(0, distinctPickleTypeNode) + .Arg(1, "key") .Seal() - .Seal(); - } + .Atom(1, ToString(pos++), TNodeFlags::Default) + .Seal() + .Seal(); + + } else { + ui32 pos = 0; + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { parent.List(ndx++) - .Add(0, distinctField) + .Add(0, KeyColumns->ChildPtr(i)) .Callable(1, "Nth") .Arg(0, "key") .Atom(1, 
ToString(pos++), TNodeFlags::Default) .Seal() - .Seal(); + .Seal(); } - } else { - if (needDistinctPickle) { - parent.List(ndx++) - .Add(0, distinctField) - .Callable(1, "Unpickle") - .Add(0, distinctPickleTypeNode) - .Arg(1, "key") - .Seal() - .Seal(); - } else { - parent.List(ndx++) - .Add(0, distinctField) + parent.List(ndx++) + .Add(0, distinctField) + .Callable(1, "Nth") + .Arg(0, "key") + .Atom(1, ToString(pos++), TNodeFlags::Default) + .Seal() + .Seal(); + } + } else { + if (needDistinctPickle) { + parent.List(ndx++) + .Add(0, distinctField) + .Callable(1, "Unpickle") + .Add(0, distinctPickleTypeNode) .Arg(1, "key") - .Seal(); - } + .Seal() + .Seal(); + } else { + parent.List(ndx++) + .Add(0, distinctField) + .Arg(1, "key") + .Seal(); } - return parent; - }) - .Seal() + } + return parent; + }) .Seal() .Seal() - .Build(); + .Seal() + .Build(); - auto distinctCombiner = ctx.Builder(node->Pos()) - .Callable("CombineByKey") - .Add(0, list) - .Add(1, preMap) - .Add(2, distinctKeyExtractor) - .Add(3, std::move(distinctCombineInit)) - .Add(4, std::move(distinctCombineUpdate)) - .Add(5, std::move(distinctCombineSave)) - .Seal() - .Build(); + auto distinctCombiner = Ctx.Builder(Node->Pos()) + .Callable("CombineByKey") + .Add(0, AggList) + .Add(1, PreMap) + .Add(2, distinctKeyExtractor) + .Add(3, std::move(distinctCombineInit)) + .Add(4, std::move(distinctCombineUpdate)) + .Add(5, std::move(distinctCombineSave)) + .Seal() + .Build(); - auto distinctGrouper = ctx.Builder(node->Pos()) - .Callable("PartitionsByKeys") - .Add(0, std::move(distinctCombiner)) - .Add(1, distinctKeyExtractor) - .Callable(2, "Void").Seal() - .Callable(3, "Void").Seal() - .Lambda(4) - .Param("groups") - .Callable("Map") - .Callable(0, "Condense1") - .Arg(0, "groups") - .Lambda(1) - .Param("item") - .Arg("item") - .Seal() - .Lambda(2) - .Param("item") - .Param("state") - .Callable("IsKeySwitch") - .Arg(0, "item") - .Arg(1, "state") - .Add(2, distinctKeyExtractor) - .Add(3, distinctKeyExtractor) - 
.Seal() - .Seal() - .Lambda(3) - .Param("item") - .Param("state") - .Arg("item") + auto distinctGrouper = Ctx.Builder(Node->Pos()) + .Callable("PartitionsByKeys") + .Add(0, std::move(distinctCombiner)) + .Add(1, distinctKeyExtractor) + .Callable(2, "Void").Seal() + .Callable(3, "Void").Seal() + .Lambda(4) + .Param("groups") + .Callable("Map") + .Callable(0, "Condense1") + .Arg(0, "groups") + .Lambda(1) + .Param("item") + .Arg("item") + .Seal() + .Lambda(2) + .Param("item") + .Param("state") + .Callable("IsKeySwitch") + .Arg(0, "item") + .Arg(1, "state") + .Add(2, distinctKeyExtractor) + .Add(3, distinctKeyExtractor) .Seal() .Seal() - .Lambda(1) + .Lambda(3) + .Param("item") .Param("state") - .Callable("AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i = 0; i < initialColumnNames.size(); ++i) { - if (indicies.find(i) != indicies.end()) { - parent.List(i) - .Add(0, initialColumnNames[i]) - .Callable(1, "Just") - .Callable(0, "Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal() + .Arg("item") + .Seal() + .Seal() + .Lambda(1) + .Param("state") + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0; i < InitialColumnNames.size(); ++i) { + if (indicies.find(i) != indicies.end()) { + parent.List(i) + .Add(0, InitialColumnNames[i]) + .Callable(1, "Just") + .Callable(0, "Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) .Seal() - .Seal(); - } else { - parent.List(i) - .Add(0, initialColumnNames[i]) - .Add(1, nothingStates[i]) - .Seal(); - } + .Seal() + .Seal(); + } else { + parent.List(i) + .Add(0, InitialColumnNames[i]) + .Add(1, NothingStates[i]) + .Seal(); } - return parent; - }) - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - if (keyColumns->ChildrenSize() > 0) { - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { - parent.List(initialColumnNames.size() + i) - .Add(0, keyColumns->ChildPtr(i)) - .Callable(1, "Member") - .Arg(0, "state") - 
.Add(1, keyColumns->ChildPtr(i)) - .Seal().Seal(); - } + } + return parent; + }) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + if (KeyColumns->ChildrenSize() > 0) { + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { + parent.List(InitialColumnNames.size() + i) + .Add(0, KeyColumns->ChildPtr(i)) + .Callable(1, "Member") + .Arg(0, "state") + .Add(1, KeyColumns->ChildPtr(i)) + .Seal().Seal(); } - return parent; - }) - .Seal() + } + return parent; + }) .Seal() .Seal() .Seal() .Seal() - .Build(); - - if (!groupInput) { - groupInput = std::move(distinctGrouper); - } else { - groupInput = ctx.Builder(node->Pos()) - .Callable("Extend") - .Add(0, std::move(groupInput)) - .Add(1, std::move(distinctGrouper)) - .Seal() - .Build(); - } - } + .Seal() + .Build(); + return distinctGrouper; +} - // If no aggregation functions than add addional combiner - if (aggregatedColumns->ChildrenSize() == 0 && keyColumns->ChildrenSize() > 0 && !sessionUpdate) { - // Return key as-is - auto uniqCombineInit = ctx.Builder(node->Pos()) +TExprNode::TPtr TAggregateExpander::ReturnKeyAsIsForCombineInit(const TExprNode::TPtr& pickleTypeNode) +{ + return Ctx.Builder(Node->Pos()) .Lambda() .Param("key") .Param("item") .Callable("AsStruct") .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { ui32 pos = 0; - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { auto listBuilder = parent.List(i); - listBuilder.Add(0, keyColumns->Child(i)); - if (keyColumns->ChildrenSize() > 1) { - if (needPickle) { + listBuilder.Add(0, KeyColumns->Child(i)); + if (KeyColumns->ChildrenSize() > 1) { + if (pickleTypeNode) { listBuilder .Callable(1, "Nth") .Callable(0, "Unpickle") @@ -897,7 +958,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Seal(); } } else { - if (needPickle) { + if (pickleTypeNode) { listBuilder.Callable(1, "Unpickle") .Add(0, pickleTypeNode) .Arg(1, "key") @@ -913,97 +974,172 @@ 
TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Seal() .Seal() .Build(); +} - auto uniqCombineUpdate = ctx.Builder(node->Pos()) +TExprNode::TPtr TAggregateExpander::GeneratePostAggregate(const TExprNode::TPtr& preAgg, const TExprNode::TPtr& keyExtractor) +{ + auto preprocessLambda = GeneratePreprocessLambda(keyExtractor); + auto condenseSwitch = GenerateCondenseSwitch(keyExtractor); + auto postAgg = Ctx.Builder(Node->Pos()) + .Callable("PartitionsByKeys") + .Add(0, std::move(preAgg)) + .Add(1, keyExtractor) + .Add(2, SortParams.Order) + .Add(3, SortParams.Key) + .Lambda(4) + .Param("stream") + .Apply(GetContextLambda()) + .With(0) + .Callable("Map") + .Callable(0, "Condense1") + .Apply(0, preprocessLambda) + .With(0, "stream") + .Seal() + .Add(1, GeneratePostAggregateInitPhase()) + .Add(2, condenseSwitch) + .Add(3, GeneratePostAggregateMergePhase()) + .Seal() + .Add(1, GeneratePostAggregateSavePhase()) + .Seal() + .Done() + .Seal() + .Seal() + .Seal().Build(); + + if (KeyColumns->ChildrenSize() == 0 && !HaveSessionSetting) { + return MakeSingleGroupRow(*Node, postAgg, Ctx); + } + + return postAgg; +} + +TExprNode::TPtr TAggregateExpander::GeneratePreprocessLambda(const TExprNode::TPtr& keyExtractor) +{ + TExprNode::TPtr preprocessLambda; + if (SessionWindowParams.Update) { + YQL_ENSURE(EffectiveCompact); + YQL_ENSURE(SessionWindowParams.Key); + YQL_ENSURE(SessionWindowParams.KeyType); + YQL_ENSURE(SessionWindowParams.Init); + + preprocessLambda = AddSessionParamsMemberLambda(Node->Pos(), SessionStartMemberName, "", keyExtractor, + SessionWindowParams.Key, SessionWindowParams.Init, SessionWindowParams.Update, Ctx); + } else { + YQL_ENSURE(!SessionWindowParams.Key); + preprocessLambda = MakeIdentityLambda(Node->Pos(), Ctx); + } + return preprocessLambda; +} + +TExprNode::TPtr TAggregateExpander::GenerateCondenseSwitch(const TExprNode::TPtr& keyExtractor) +{ + TExprNode::TPtr condenseSwitch; + if (SessionWindowParams.Update) { + 
YQL_ENSURE(EffectiveCompact); + YQL_ENSURE(SessionWindowParams.Key); + YQL_ENSURE(SessionWindowParams.KeyType); + YQL_ENSURE(SessionWindowParams.Init); + + condenseSwitch = Ctx.Builder(Node->Pos()) .Lambda() - .Param("key") .Param("item") .Param("state") - .Arg("state") + .Callable("Or") + .Callable(0, "AggrNotEquals") + .Apply(0, keyExtractor) + .With(0, "item") + .Seal() + .Apply(1, keyExtractor) + .With(0, "state") + .Seal() + .Seal() + .Callable(1, "AggrNotEquals") + .Callable(0, "Member") + .Arg(0, "item") + .Atom(1, SessionStartMemberName) + .Seal() + .Callable(1, "Member") + .Arg(0, "state") + .Atom(1, SessionStartMemberName) + .Seal() + .Seal() + .Seal() .Seal() .Build(); - - // Return state as-is - auto uniqCombineSave = ctx.Builder(node->Pos()) + } else { + YQL_ENSURE(!SessionWindowParams.Key); + condenseSwitch = Ctx.Builder(Node->Pos()) .Lambda() - .Param("key") + .Param("item") .Param("state") - .Callable("Just") - .Arg(0, "state") + .Callable("IsKeySwitch") + .Arg(0, "item") + .Arg(1, "state") + .Add(2, keyExtractor) + .Add(3, keyExtractor) .Seal() .Seal() .Build(); - - if (!groupInput) { - groupInput = list; - } - - groupInput = ctx.Builder(node->Pos()) - .Callable("CombineByKey") - .Add(0, std::move(groupInput)) - .Add(1, preMap) - .Add(2, keyExtractor) - .Add(3, std::move(uniqCombineInit)) - .Add(4, std::move(uniqCombineUpdate)) - .Add(5, std::move(uniqCombineSave)) - .Seal() - .Build(); } + return condenseSwitch; +} +TExprNode::TPtr TAggregateExpander::GeneratePostAggregateInitPhase() +{ ui32 index = 0U; - auto groupInit = ctx.Builder(node->Pos()) + return Ctx.Builder(Node->Pos()) .Lambda() .Param("item") .Callable("AsStruct") .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { parent .List(index++) - .Add(0, keyColumns->ChildPtr(i)) + .Add(0, KeyColumns->ChildPtr(i)) .Callable(1, "Member") .Arg(0, "item") - .Add(1, 
keyColumns->ChildPtr(i)) + .Add(1, KeyColumns->ChildPtr(i)) .Seal() .Seal(); } - if (sessionUpdate) { + if (SessionWindowParams.Update) { parent .List(index++) - .Atom(0, sessionStartMemberName) + .Atom(0, SessionStartMemberName) .Callable(1, "Member") .Arg(0, "item") - .Atom(1, sessionStartMemberName) + .Atom(1, SessionStartMemberName) .Seal() .Seal(); } return parent; }) .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i = 0; i < initialColumnNames.size(); ++i) { - auto child = aggregatedColumns->Child(i); - auto trait = traits[i]; - if (!compact) { + for (ui32 i = 0; i < InitialColumnNames.size(); ++i) { + auto child = AggregatedColumns->Child(i); + auto trait = Traits[i]; + if (!EffectiveCompact) { auto loadLambda = trait->Child(4); - if (!distinctFields.empty()) { + if (!DistinctFields.empty()) { parent.List(index++) - .Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .Callable(1, "Map") .Callable(0, "Member") .Arg(0, "item") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Add(1, loadLambda) .Seal() .Seal(); } else { parent.List(index++) - .Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .Apply(1, *loadLambda) .With(0) .Callable("Member") .Arg(0, "item") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Done() .Seal(); @@ -1025,7 +1161,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T parent .Callable("CastStruct") .Arg(0, "item") - .Add(1, ExpandType(node->Pos(), *initLambda->Head().Head().GetTypeAnn(), ctx)) + .Add(1, ExpandType(Node->Pos(), *initLambda->Head().Head().GetTypeAnn(), Ctx)) .Seal(); } @@ -1049,16 +1185,16 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T }; if (distinctField) { - const bool isFirst = *distinct2Columns[distinctField->Content()].begin() == i; + const bool isFirst = *Distinct2Columns[distinctField->Content()].begin() == i; if (isFirst) { parent.List(index++) - 
.Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .List(1) .Callable(0, "NamedApply") - .Add(0, udfSetCreate[distinctField->Content()]) + .Add(0, UdfSetCreate[distinctField->Content()]) .List(1) .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - if (!distinctFieldNeedsPickle[distinctField->Content()]) { + if (!DistinctFieldNeedsPickle[distinctField->Content()]) { parent.Callable(0, "Member") .Arg(0, "item") .Add(1, distinctField) @@ -1090,13 +1226,13 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Seal(); } else { parent.List(index++) - .Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .Do(initApply) .Seal(); } } else { parent.List(index++) - .Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .Do(initApply) .Seal(); } @@ -1107,72 +1243,205 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Seal() .Seal() .Build(); +} + +TExprNode::TPtr TAggregateExpander::GeneratePostAggregateSavePhase() +{ + ui32 index = 0U; + return Ctx.Builder(Node->Pos()) + .Lambda() + .Param("state") + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { + if (KeyColumns->Child(i)->Content() == SessionStartMemberName) { + continue; + } + parent + .List(index++) + .Add(0, KeyColumns->ChildPtr(i)) + .Callable(1, "Member") + .Arg(0, "state") + .Add(1, KeyColumns->ChildPtr(i)) + .Seal() + .Seal(); + } + + if (SessionOutputColumn) { + parent + .List(index++) + .Atom(0, *SessionOutputColumn) + .Callable(1, "Member") + .Arg(0, "state") + .Atom(1, SessionStartMemberName) + .Seal() + .Seal(); + } + return parent; + }) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0; i < InitialColumnNames.size(); ++i) { + auto child = AggregatedColumns->Child(i); + auto trait = Traits[i]; + auto finishLambda = trait->Child(6); + + if (!EffectiveCompact && !DistinctFields.empty()) { + if 
(child->Head().IsAtom()) { + parent.List(index++) + .Add(0, FinalColumnNames[i]) + .Callable(1, "Unwrap") + .Callable(0, "Map") + .Callable(0, "Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) + .Seal() + .Add(1, finishLambda) + .Seal() + .Seal() + .Seal(); + } else { + const auto& multiFields = child->Child(0); + for (ui32 field = 0; field < multiFields->ChildrenSize(); ++field) { + parent.List(index++) + .Atom(0, multiFields->Child(field)->Content()) + .Callable(1, "Nth") + .Callable(0, "Unwrap") + .Callable(0, "Map") + .Callable(0, "Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) + .Seal() + .Add(1, finishLambda) + .Seal() + .Seal() + .Atom(1, ToString(field), TNodeFlags::Default) + .Seal() + .Seal(); + } + } + } else { + auto distinctField = (child->ChildrenSize() == 3) ? child->Child(2) : nullptr; + auto stateExtractor = [&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + const bool isFirst = distinctField ? (*Distinct2Columns[distinctField->Content()].begin() == i) : false; + if (distinctField && isFirst) { + parent.Callable("Nth") + .Callable(0, "Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) + .Seal() + .Atom(1, "1", TNodeFlags::Default) + .Seal(); + } else { + parent.Callable("Member") + .Arg(0, "state") + .Add(1, InitialColumnNames[i]) + .Seal(); + } + + return parent; + }; + + if (child->Head().IsAtom()) { + parent.List(index++) + .Add(0, FinalColumnNames[i]) + .Apply(1, *finishLambda) + .With(0) + .Do(stateExtractor) + .Done() + .Seal() + .Seal(); + } else { + const auto& multiFields = child->Head(); + for (ui32 field = 0; field < multiFields.ChildrenSize(); ++field) { + parent.List(index++) + .Atom(0, multiFields.Child(field)->Content()) + .Callable(1, "Nth") + .Apply(0, *finishLambda) + .With(0) + .Do(stateExtractor) + .Done() + .Seal() + .Atom(1, ToString(field), TNodeFlags::Default) + .Seal() + .Seal(); + } + } + } + } + return parent; + }) + .Seal() + .Seal() + .Build(); +} - index = 0; - auto groupMerge = 
ctx.Builder(node->Pos()) +TExprNode::TPtr TAggregateExpander::GeneratePostAggregateMergePhase() +{ + ui32 index = 0U; + return Ctx.Builder(Node->Pos()) .Lambda() .Param("item") .Param("state") .Callable("AsStruct") .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { + for (ui32 i = 0; i < KeyColumns->ChildrenSize(); ++i) { parent .List(index++) - .Add(0, keyColumns->ChildPtr(i)) + .Add(0, KeyColumns->ChildPtr(i)) .Callable(1, "Member") .Arg(0, "state") - .Add(1, keyColumns->ChildPtr(i)) + .Add(1, KeyColumns->ChildPtr(i)) .Seal() .Seal(); } - if (sessionUpdate) { + if (SessionWindowParams.Update) { parent .List(index++) - .Atom(0, sessionStartMemberName) + .Atom(0, SessionStartMemberName) .Callable(1, "Member") .Arg(0, "state") - .Atom(1, sessionStartMemberName) + .Atom(1, SessionStartMemberName) .Seal() .Seal(); } return parent; }) .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i = 0; i < initialColumnNames.size(); ++i) { - auto child = aggregatedColumns->Child(i); - auto trait = traits[i]; - if (!compact) { + for (ui32 i = 0; i < InitialColumnNames.size(); ++i) { + auto child = AggregatedColumns->Child(i); + auto trait = Traits[i]; + if (!EffectiveCompact) { auto loadLambda = trait->Child(4); auto mergeLambda = trait->Child(5); - if (!distinctFields.empty()) { + if (!DistinctFields.empty()) { parent.List(index++) - .Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .Callable(1, "OptionalReduce") .Callable(0, "Map") .Callable(0, "Member") .Arg(0, "item") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Add(1, loadLambda) .Seal() .Callable(1, "Member") .Arg(0, "state") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Add(2, mergeLambda) .Seal() .Seal(); } else { parent.List(index++) - .Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .Apply(1, *mergeLambda) .With(0) .Apply(*loadLambda) .With(0) 
.Callable("Member") .Arg(0, "item") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Done() .Seal() @@ -1180,7 +1449,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .With(1) .Callable("Member") .Arg(0, "state") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Done() .Seal() @@ -1189,7 +1458,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T } else { auto updateLambda = trait->Child(2); auto distinctField = (child->ChildrenSize() == 3) ? child->Child(2) : nullptr; - const bool isFirst = distinctField ? (*distinct2Columns[distinctField->Content()].begin() == i) : false; + const bool isFirst = distinctField ? (*Distinct2Columns[distinctField->Content()].begin() == i) : false; auto updateApply = [&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { parent.Apply(1, *updateLambda) .With(0) @@ -1204,7 +1473,7 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T parent .Callable("CastStruct") .Arg(0, "item") - .Add(1, ExpandType(node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), ctx)) + .Add(1, ExpandType(Node->Pos(), *updateLambda->Head().Head().GetTypeAnn(), Ctx)) .Seal(); } @@ -1217,14 +1486,14 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T parent.Callable("Nth") .Callable(0, "Member") .Arg(0, "state") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Atom(1, "1", TNodeFlags::Default) .Seal(); } else { parent.Callable("Member") .Arg(0, "state") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal(); } @@ -1249,21 +1518,21 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T }; if (distinctField) { - auto distinctIndex = *distinct2Columns[distinctField->Content()].begin(); + auto distinctIndex = *Distinct2Columns[distinctField->Content()].begin(); ui32 newValueIndex = 0; auto newValue = 
[&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { parent.Callable(newValueIndex, "NamedApply") - .Add(0, udfAddValue[distinctField->Content()]) + .Add(0, UdfAddValue[distinctField->Content()]) .List(1) .Callable(0, "Nth") .Callable(0, "Member") .Arg(0, "state") - .Add(1, initialColumnNames[distinctIndex]) + .Add(1, InitialColumnNames[distinctIndex]) .Seal() .Atom(1, "0", TNodeFlags::Default) .Seal() .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - if (!distinctFieldNeedsPickle[distinctField->Content()]) { + if (!DistinctFieldNeedsPickle[distinctField->Content()]) { parent.Callable(1, "Member") .Arg(0, "item") .Add(1, distinctField) @@ -1287,23 +1556,23 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T }; parent.List(index++) - .Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .Callable(1, "If") .Callable(0, "NamedApply") - .Add(0, udfWasChanged[distinctField->Content()]) + .Add(0, UdfWasChanged[distinctField->Content()]) .List(1) .Callable(0, "NamedApply") - .Add(0, udfAddValue[distinctField->Content()]) + .Add(0, UdfAddValue[distinctField->Content()]) .List(1) .Callable(0, "Nth") .Callable(0, "Member") .Arg(0, "state") - .Add(1, initialColumnNames[distinctIndex]) + .Add(1, InitialColumnNames[distinctIndex]) .Seal() .Atom(1, "0", TNodeFlags::Default) .Seal() .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - if (!distinctFieldNeedsPickle[distinctField->Content()]) { + if (!DistinctFieldNeedsPickle[distinctField->Content()]) { parent.Callable(1, "Member") .Arg(0, "item") .Add(1, distinctField) @@ -1339,13 +1608,13 @@ TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T }) .Callable(2, "Member") .Arg(0, "state") - .Add(1, initialColumnNames[i]) + .Add(1, InitialColumnNames[i]) .Seal() .Seal() .Seal(); } else { parent.List(index++) - .Add(0, initialColumnNames[i]) + .Add(0, InitialColumnNames[i]) .Do(updateApply) .Seal(); } @@ -1356,223 +1625,6 @@ TExprNode::TPtr 
ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, T .Seal() .Seal() .Build(); - - index = 0U; - auto groupSave = ctx.Builder(node->Pos()) - .Lambda() - .Param("state") - .Callable("AsStruct") - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i = 0; i < keyColumns->ChildrenSize(); ++i) { - if (keyColumns->Child(i)->Content() == sessionStartMemberName) { - continue; - } - parent - .List(index++) - .Add(0, keyColumns->ChildPtr(i)) - .Callable(1, "Member") - .Arg(0, "state") - .Add(1, keyColumns->ChildPtr(i)) - .Seal() - .Seal(); - } - - if (sessionOutputColumn) { - parent - .List(index++) - .Atom(0, *sessionOutputColumn) - .Callable(1, "Member") - .Arg(0, "state") - .Atom(1, sessionStartMemberName) - .Seal() - .Seal(); - } - return parent; - }) - .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - for (ui32 i = 0; i < initialColumnNames.size(); ++i) { - auto child = aggregatedColumns->Child(i); - auto trait = traits[i]; - auto finishLambda = trait->Child(6); - - if (!compact && !distinctFields.empty()) { - if (child->Head().IsAtom()) { - parent.List(index++) - .Add(0, finalColumnNames[i]) - .Callable(1, "Unwrap") - .Callable(0, "Map") - .Callable(0, "Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal() - .Add(1, finishLambda) - .Seal() - .Seal() - .Seal(); - } else { - const auto& multiFields = child->Child(0); - for (ui32 field = 0; field < multiFields->ChildrenSize(); ++field) { - parent.List(index++) - .Atom(0, multiFields->Child(field)->Content()) - .Callable(1, "Nth") - .Callable(0, "Unwrap") - .Callable(0, "Map") - .Callable(0, "Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal() - .Add(1, finishLambda) - .Seal() - .Seal() - .Atom(1, ToString(field), TNodeFlags::Default) - .Seal() - .Seal(); - } - } - } else { - auto distinctField = (child->ChildrenSize() == 3) ? 
child->Child(2) : nullptr; - auto stateExtractor = [&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { - const bool isFirst = distinctField ? (*distinct2Columns[distinctField->Content()].begin() == i) : false; - if (distinctField && isFirst) { - parent.Callable("Nth") - .Callable(0, "Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal() - .Atom(1, "1", TNodeFlags::Default) - .Seal(); - } else { - parent.Callable("Member") - .Arg(0, "state") - .Add(1, initialColumnNames[i]) - .Seal(); - } - - return parent; - }; - - if (child->Head().IsAtom()) { - parent.List(index++) - .Add(0, finalColumnNames[i]) - .Apply(1, *finishLambda) - .With(0) - .Do(stateExtractor) - .Done() - .Seal() - .Seal(); - } else { - const auto& multiFields = child->Head(); - for (ui32 field = 0; field < multiFields.ChildrenSize(); ++field) { - parent.List(index++) - .Atom(0, multiFields.Child(field)->Content()) - .Callable(1, "Nth") - .Apply(0, *finishLambda) - .With(0) - .Do(stateExtractor) - .Done() - .Seal() - .Atom(1, ToString(field), TNodeFlags::Default) - .Seal() - .Seal(); - } - } - } - } - return parent; - }) - .Seal() - .Seal() - .Build(); - - if (compact || !groupInput) { - groupInput = std::move(list); - } - - TExprNode::TPtr preprocessLambda; - TExprNode::TPtr condenseSwitch; - if (sessionUpdate) { - YQL_ENSURE(compact); - YQL_ENSURE(sessionKey); - YQL_ENSURE(sessionKeyType); - YQL_ENSURE(sessionInit); - - preprocessLambda = - AddSessionParamsMemberLambda(node->Pos(), sessionStartMemberName, "", keyExtractor, sessionKey, sessionInit, sessionUpdate, ctx); - - condenseSwitch = ctx.Builder(node->Pos()) - .Lambda() - .Param("item") - .Param("state") - .Callable("Or") - .Callable(0, "AggrNotEquals") - .Apply(0, keyExtractor) - .With(0, "item") - .Seal() - .Apply(1, keyExtractor) - .With(0, "state") - .Seal() - .Seal() - .Callable(1, "AggrNotEquals") - .Callable(0, "Member") - .Arg(0, "item") - .Atom(1, sessionStartMemberName) - .Seal() - .Callable(1, "Member") - .Arg(0, 
"state") - .Atom(1, sessionStartMemberName) - .Seal() - .Seal() - .Seal() - .Seal() - .Build(); - } else { - YQL_ENSURE(!sessionKey); - preprocessLambda = MakeIdentityLambda(node->Pos(), ctx); - condenseSwitch = ctx.Builder(node->Pos()) - .Lambda() - .Param("item") - .Param("state") - .Callable("IsKeySwitch") - .Arg(0, "item") - .Arg(1, "state") - .Add(2, keyExtractor) - .Add(3, keyExtractor) - .Seal() - .Seal() - .Build(); - } - - auto grouper = ctx.Builder(node->Pos()) - .Callable("PartitionsByKeys") - .Add(0, std::move(groupInput)) - .Add(1, keyExtractor) - .Add(2, sortOrder) - .Add(3, sortKey) - .Lambda(4) - .Param("stream") - .Apply(contextLambda) - .With(0) - .Callable("Map") - .Callable(0, "Condense1") - .Apply(0, preprocessLambda) - .With(0, "stream") - .Seal() - .Add(1, std::move(groupInit)) - .Add(2, condenseSwitch) - .Add(3, std::move(groupMerge)) - .Seal() - .Add(1, std::move(groupSave)) - .Seal() - .Done() - .Seal() - .Seal() - .Seal().Build(); - - if (keyColumns->ChildrenSize() == 0 && !sessionSetting) { - return MakeSingleGroupRow(*node, grouper, ctx); - } - - return grouper; } -} +} // namespace NYql
\ No newline at end of file diff --git a/ydb/library/yql/core/yql_aggregate_expander.h b/ydb/library/yql/core/yql_aggregate_expander.h new file mode 100644 index 00000000000..552bdee2229 --- /dev/null +++ b/ydb/library/yql/core/yql_aggregate_expander.h @@ -0,0 +1,113 @@ +#pragma once +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/core/yql_opt_utils.h> + +#include "yql_type_annotation.h" + +namespace NYql { + +class TAggregateExpander { +public: + TAggregateExpander(bool allowPickle, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool forceCompact = false, bool compactForDistinct = false) + : Node(node) + , Ctx(ctx) + , TypesCtx(typesCtx) + , AllowPickle(allowPickle) + , ForceCompact(forceCompact) + , CompactForDistinct(compactForDistinct) + , AggregatedColumns(nullptr) + , VoidNode(ctx.NewCallable(node->Pos(), "Void", {})) + , HaveDistinct(false) + , EffectiveCompact(false) + , HaveSessionSetting(false) + , OriginalRowType(nullptr) + , RowType(nullptr) + { + PreMap = Ctx.Builder(node->Pos()) + .Lambda() + .Param("premap") + .Callable("Just").Arg(0, "premap").Seal() + .Seal().Build(); + SortParams = { + .Key = VoidNode, + .Order = VoidNode + }; + } + + TExprNode::TPtr ExpandAggregate(); + +private: + TExprNode::TPtr ExpandAggApply(const TExprNode::TPtr& node); + bool CollectTraits(); + TExprNode::TPtr RebuildAggregate(); + TExprNode::TPtr GetContextLambda(); + void ProcessSessionSetting(TExprNode::TPtr sessionSetting); + TVector<const TTypeAnnotationNode*> GetKeyItemTypes(); + bool IsNeedPickle(const TVector<const TTypeAnnotationNode*>& keyItemTypes); + TExprNode::TPtr GetKeyExtractor(bool needPickle); + void CollectColumnsSpecs(); + + // Partial aggregate generation + TExprNode::TPtr GeneratePartialAggregate(const TExprNode::TPtr keyExtractor, const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needPickle); + TExprNode::TPtr GeneratePartialAggregateForNonDistinct(const 
TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode); + + TExprNode::TPtr GenerateDistinctGrouper(const TExprNode::TPtr distinctField, + const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle); + + TExprNode::TPtr ReturnKeyAsIsForCombineInit(const TExprNode::TPtr& pickleTypeNode); + + // Post aggregate + TExprNode::TPtr GeneratePostAggregate(const TExprNode::TPtr& preAgg, const TExprNode::TPtr& keyExtractor); + TExprNode::TPtr GeneratePreprocessLambda(const TExprNode::TPtr& keyExtractor); + TExprNode::TPtr GenerateCondenseSwitch(const TExprNode::TPtr& keyExtractor); + TExprNode::TPtr GeneratePostAggregateInitPhase(); + TExprNode::TPtr GeneratePostAggregateSavePhase(); + TExprNode::TPtr GeneratePostAggregateMergePhase(); + +private: + static constexpr TStringBuf SessionStartMemberName = "_yql_group_session_start"; + + const TExprNode::TPtr Node; + TExprContext& Ctx; + TTypeAnnotationContext& TypesCtx; + bool AllowPickle; + bool ForceCompact; + bool CompactForDistinct; + + TSessionWindowParams SessionWindowParams; + TExprNode::TPtr AggList; + TExprNode::TListType Traits; + TExprNode::TPtr KeyColumns; + TExprNode::TPtr AggregatedColumns; + const TExprNode::TPtr VoidNode; + TMaybe<TStringBuf> SessionOutputColumn; + TSortParams SortParams; + bool HaveDistinct; + bool EffectiveCompact; + bool HaveSessionSetting; + const TStructExprType* OriginalRowType; + const TStructExprType* RowType; + TVector<const TItemExprType*> RowItems; + TExprNode::TPtr PreMap; + + TExprNode::TListType InitialColumnNames; + TExprNode::TListType FinalColumnNames; + TExprNode::TListType DistinctFields; + TExprNode::TListType NothingStates; + + using TIdxSet = std::set<ui32>; + std::unordered_map<std::string_view, TIdxSet> Distinct2Columns; + TIdxSet NonDistinctColumns; + + std::unordered_map<std::string_view, bool> DistinctFieldNeedsPickle; + std::unordered_map<std::string_view, TExprNode::TPtr> UdfSetCreate; + std::unordered_map<std::string_view, 
TExprNode::TPtr> UdfAddValue; + std::unordered_map<std::string_view, TExprNode::TPtr> UdfWasChanged; +}; + +inline TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { + TAggregateExpander aggExpander(false, node, ctx, typesCtx, true); + return aggExpander.ExpandAggregate(); +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/core/yql_opt_aggregate.h b/ydb/library/yql/core/yql_opt_aggregate.h deleted file mode 100644 index 7b1c0b23d36..00000000000 --- a/ydb/library/yql/core/yql_opt_aggregate.h +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once -#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> - -#include "yql_type_annotation.h" - -namespace NYql { - -TExprNode::TPtr ExpandAggregate(bool allowPickle, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool forceCompact = false, bool compactForDistinct = false); -inline TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) { - return ExpandAggregate(false, node, ctx, typesCtx, true); -} - -} - diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index 642a4a996fd..6acb1359668 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -1053,6 +1053,11 @@ TExprNode::TPtr ApplyWithCastStructForFirstArg(const TExprNode::TPtr& node, cons } +void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTraitsNode, TSortParams& sortParams, TExprContext& ctx) +{ + ExtractSortKeyAndOrder(pos, sortTraitsNode, sortParams.Key, sortParams.Order, ctx); +} + void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTraitsNode, TExprNode::TPtr& sortKey, TExprNode::TPtr& sortOrder, TExprContext& ctx) { if (sortTraitsNode->IsCallable("SortTraits")) { TCoSortTraits sortTraits(sortTraitsNode); @@ -1066,6 +1071,13 @@ void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTrai } } +void ExtractSessionWindowParams(TPositionHandle pos, TSessionWindowParams& sessionParams, TExprContext& ctx) +{ + ExtractSessionWindowParams(pos, sessionParams.Traits, sessionParams.Key, + sessionParams.KeyType, sessionParams.ParamsType, sessionParams.SortTraits, sessionParams.Init, + 
sessionParams.Update, ctx); +} + void ExtractSessionWindowParams(TPositionHandle pos, const TExprNode::TPtr& sessionTraits, TExprNode::TPtr& sessionKey, const TTypeAnnotationNode*& sessionKeyType, const TTypeAnnotationNode*& sessionParamsType, TExprNode::TPtr& sessionSortTraits, TExprNode::TPtr& sessionInit, TExprNode::TPtr& sessionUpdate, TExprContext& ctx) diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index 9ed7d1483f2..956bac3b4a9 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -2,6 +2,7 @@ #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/core/yql_graph_transformer.h> +#include <ydb/library/yql/core/yql_opt_window.h> #include <util/generic/hash_set.h> #include <util/generic/strbuf.h> @@ -91,6 +92,9 @@ void ExtractSessionWindowParams(TPositionHandle pos, const TExprNode::TPtr& sess const TTypeAnnotationNode*& sessionKeyType, const TTypeAnnotationNode*& sessionParamsType, TExprNode::TPtr& sessionSortTraits, TExprNode::TPtr& sessionInit, TExprNode::TPtr& sessionUpdate, TExprContext& ctx); +void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTraitsNode, TSortParams& sortParams, TExprContext& ctx); +void ExtractSessionWindowParams(TPositionHandle pos, TSessionWindowParams& sessionParams, TExprContext& ctx); + TExprNode::TPtr BuildKeySelector(TPositionHandle pos, const TStructExprType& rowType, const TExprNode::TPtr& keyColumns, TExprContext& ctx); template <bool Cannonize, bool EnableNewOptimizers = true> diff --git a/ydb/library/yql/core/yql_opt_window.cpp b/ydb/library/yql/core/yql_opt_window.cpp index fb9c40b0f89..6b2c8819934 100644 --- a/ydb/library/yql/core/yql_opt_window.cpp +++ b/ydb/library/yql/core/yql_opt_window.cpp @@ -3209,6 +3209,14 @@ TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode: } TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos, + TStringBuf 
sessionStartMemberName, const TExprNode::TPtr& partitionKeySelector, + const TSessionWindowParams& sessionWindowParams, TExprContext& ctx) +{ + return AddSessionParamsMemberLambda(pos, sessionStartMemberName, "", partitionKeySelector, + sessionWindowParams.Key, sessionWindowParams.Init, sessionWindowParams.Update, ctx); +} + +TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos, TStringBuf sessionStartMemberName, TStringBuf sessionParamsMemberName, const TExprNode::TPtr& partitionKeySelector, const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit, @@ -3275,4 +3283,15 @@ TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos, .Build(); } +void TSessionWindowParams::Reset() +{ + Traits = {}; + Key = {}; + KeyType = nullptr; + ParamsType = nullptr; + Init = {}; + Update = {}; + SortTraits = {}; +} + } diff --git a/ydb/library/yql/core/yql_opt_window.h b/ydb/library/yql/core/yql_opt_window.h index 6e51a6c794a..0e909f1d9de 100644 --- a/ydb/library/yql/core/yql_opt_window.h +++ b/ydb/library/yql/core/yql_opt_window.h @@ -45,6 +45,33 @@ private: bool Compact = false; }; +struct TSessionWindowParams { + TSessionWindowParams() + : Traits(nullptr) + , Key(nullptr) + , KeyType(nullptr) + , ParamsType(nullptr) + , Init(nullptr) + , Update(nullptr) + , SortTraits(nullptr) + {} + + void Reset(); + + TExprNode::TPtr Traits; + TExprNode::TPtr Key; + const TTypeAnnotationNode* KeyType; + const TTypeAnnotationNode* ParamsType; + TExprNode::TPtr Init; + TExprNode::TPtr Update; + TExprNode::TPtr SortTraits; +}; + +struct TSortParams { + TExprNode::TPtr Key; + TExprNode::TPtr Order; +}; + // Lambda(input: Stream/List<T>) -> Stream/List<Tuple<T, SessionKey, SessionState, ....>> // input is assumed to be partitioned by partitionKeySelector TExprNode::TPtr ZipWithSessionParamsLambda(TPositionHandle pos, const TExprNode::TPtr& partitionKeySelector, @@ -57,4 +84,10 @@ TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos, const 
TExprNode::TPtr& partitionKeySelector, const TExprNode::TPtr& sessionKeySelector, const TExprNode::TPtr& sessionInit, const TExprNode::TPtr& sessionUpdate, TExprContext& ctx); + +// input should be List/Stream of structs + see above +TExprNode::TPtr AddSessionParamsMemberLambda(TPositionHandle pos, + TStringBuf sessionStartMemberName, const TExprNode::TPtr& partitionKeySelector, + const TSessionWindowParams& sessionWindowParams, TExprContext& ctx); + } diff --git a/ydb/library/yql/dq/opt/dq_opt_log.cpp b/ydb/library/yql/dq/opt/dq_opt_log.cpp index 10d765e7683..d639765e55b 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_log.cpp @@ -2,8 +2,8 @@ #include "dq_opt.h" +#include <ydb/library/yql/core/yql_aggregate_expander.h> #include <ydb/library/yql/core/yql_expr_optimize.h> -#include <ydb/library/yql/core/yql_opt_aggregate.h> #include <ydb/library/yql/core/yql_opt_window.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/core/yql_type_annotation.h> @@ -18,7 +18,8 @@ TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationC return node; } - auto result = ExpandAggregate(true, node.Ptr(), ctx, typesCtx, false, compactForDistinct); + TAggregateExpander aggExpander(true, node.Ptr(), ctx, typesCtx, false, compactForDistinct); + auto result = aggExpander.ExpandAggregate(); YQL_ENSURE(result); return TExprBase(result); |