aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/core/yql_aggregate_expander.cpp
diff options
context:
space:
mode:
authoraneporada <aneporada@yandex-team.com>2024-11-12 14:35:35 +0300
committeraneporada <aneporada@yandex-team.com>2024-11-12 14:49:16 +0300
commit0f8074b32931e95bc77e99fc0cc06079449a2f03 (patch)
treeaddc4b14e22412b769270d75018927719ee8e564 /yql/essentials/core/yql_aggregate_expander.cpp
parent39b2d17a032a25ea4334f40208471552f1e3561b (diff)
downloadydb-0f8074b32931e95bc77e99fc0cc06079449a2f03.tar.gz
Merge PR #10707: Fixed: Make block combine use stream instead of flow
commit_hash:946462d1ea7e74758c7d6f86cc30cd674dc2195e
Diffstat (limited to 'yql/essentials/core/yql_aggregate_expander.cpp')
-rw-r--r--yql/essentials/core/yql_aggregate_expander.cpp57
1 files changed, 37 insertions, 20 deletions
diff --git a/yql/essentials/core/yql_aggregate_expander.cpp b/yql/essentials/core/yql_aggregate_expander.cpp
index c7068727ef..e268d022e7 100644
--- a/yql/essentials/core/yql_aggregate_expander.cpp
+++ b/yql/essentials/core/yql_aggregate_expander.cpp
@@ -699,7 +699,8 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
} else {
stream = AggList;
}
- auto blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);
+
+ TExprNode::TPtr blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);
if (!blocks) {
return nullptr;
}
@@ -708,22 +709,30 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
if (hashed) {
aggWideFlow = Ctx.Builder(Node->Pos())
.Callable("WideFromBlocks")
- .Callable(0, "BlockCombineHashed")
- .Add(0, blocks)
- .Callable(1, "Void")
+ .Callable(0, "ToFlow")
+ .Callable(0, "BlockCombineHashed")
+ .Callable(0, "FromFlow")
+ .Add(0, blocks)
+ .Seal()
+ .Callable(1, "Void")
+ .Seal()
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
+ .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
- .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
- .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Seal()
.Build();
} else {
aggWideFlow = Ctx.Builder(Node->Pos())
- .Callable("BlockCombineAll")
- .Add(0, blocks)
- .Callable(1, "Void")
+ .Callable("ToFlow")
+ .Callable(0, "BlockCombineAll")
+ .Callable(0, "FromFlow")
+ .Add(0, blocks)
+ .Seal()
+ .Callable(1, "Void")
+ .Seal()
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
- .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
.Seal()
.Build();
}
@@ -2891,10 +2900,14 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
TExprNode::TPtr aggBlocks;
if (!isMany) {
aggBlocks = Ctx.Builder(Node->Pos())
- .Callable("BlockMergeFinalizeHashed")
- .Add(0, blocks)
- .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
- .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Callable("ToFlow")
+ .Callable(0, "BlockMergeFinalizeHashed")
+ .Callable(0, "FromFlow")
+ .Add(0, blocks)
+ .Seal()
+ .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Seal()
.Seal()
.Build();
} else {
@@ -2902,12 +2915,16 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
YQL_ENSURE(manyStreamsSetting, "Missing many_streams setting");
aggBlocks = Ctx.Builder(Node->Pos())
- .Callable("BlockMergeManyFinalizeHashed")
- .Add(0, blocks)
- .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
- .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
- .Atom(3, ToString(streamIdxColumn))
- .Add(4, manyStreamsSetting->TailPtr())
+ .Callable("ToFlow")
+ .Callable(0, "BlockMergeManyFinalizeHashed")
+ .Callable(0, "FromFlow")
+ .Add(0, blocks)
+ .Seal()
+ .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
+ .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
+ .Atom(3, ToString(streamIdxColumn))
+ .Add(4, manyStreamsSetting->TailPtr())
+ .Seal()
.Seal()
.Build();
}