diff options
author | aneporada <aneporada@yandex-team.com> | 2024-11-12 14:35:35 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.com> | 2024-11-12 14:49:16 +0300 |
commit | 0f8074b32931e95bc77e99fc0cc06079449a2f03 (patch) | |
tree | addc4b14e22412b769270d75018927719ee8e564 /yql/essentials/core/yql_aggregate_expander.cpp | |
parent | 39b2d17a032a25ea4334f40208471552f1e3561b (diff) | |
download | ydb-0f8074b32931e95bc77e99fc0cc06079449a2f03.tar.gz |
Merge PR #10707: Fixed: Make block combine use stream instead of flow
commit_hash:946462d1ea7e74758c7d6f86cc30cd674dc2195e
Diffstat (limited to 'yql/essentials/core/yql_aggregate_expander.cpp')
-rw-r--r-- | yql/essentials/core/yql_aggregate_expander.cpp | 57 |
1 files changed, 37 insertions, 20 deletions
diff --git a/yql/essentials/core/yql_aggregate_expander.cpp b/yql/essentials/core/yql_aggregate_expander.cpp index c7068727ef..e268d022e7 100644 --- a/yql/essentials/core/yql_aggregate_expander.cpp +++ b/yql/essentials/core/yql_aggregate_expander.cpp @@ -699,7 +699,8 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { } else { stream = AggList; } - auto blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false); + + TExprNode::TPtr blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false); if (!blocks) { return nullptr; } @@ -708,22 +709,30 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { if (hashed) { aggWideFlow = Ctx.Builder(Node->Pos()) .Callable("WideFromBlocks") - .Callable(0, "BlockCombineHashed") - .Add(0, blocks) - .Callable(1, "Void") + .Callable(0, "ToFlow") + .Callable(0, "BlockCombineHashed") + .Callable(0, "FromFlow") + .Add(0, blocks) + .Seal() + .Callable(1, "Void") + .Seal() + .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) + .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() - .Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) - .Add(3, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() .Seal() .Build(); } else { aggWideFlow = Ctx.Builder(Node->Pos()) - .Callable("BlockCombineAll") - .Add(0, blocks) - .Callable(1, "Void") + .Callable("ToFlow") + .Callable(0, "BlockCombineAll") + .Callable(0, "FromFlow") + .Add(0, blocks) + .Seal() + .Callable(1, "Void") + .Seal() + .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() - .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) .Seal() .Build(); } @@ -2891,10 +2900,14 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() { TExprNode::TPtr aggBlocks; if (!isMany) { aggBlocks = Ctx.Builder(Node->Pos()) - .Callable("BlockMergeFinalizeHashed") - .Add(0, blocks) - .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) - .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Callable("ToFlow") + .Callable(0, "BlockMergeFinalizeHashed") + .Callable(0, "FromFlow") + .Add(0, blocks) + .Seal() + .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) + .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Seal() .Seal() .Build(); } else { @@ -2902,12 +2915,16 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() { YQL_ENSURE(manyStreamsSetting, "Missing many_streams setting"); aggBlocks = Ctx.Builder(Node->Pos()) - .Callable("BlockMergeManyFinalizeHashed") - .Add(0, blocks) - .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) - .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) - .Atom(3, ToString(streamIdxColumn)) - .Add(4, manyStreamsSetting->TailPtr()) + .Callable("ToFlow") + .Callable(0, "BlockMergeManyFinalizeHashed") + .Callable(0, "FromFlow") + .Add(0, blocks) + .Seal() + .Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs))) + .Add(2, Ctx.NewList(Node->Pos(), std::move(aggs))) + .Atom(3, ToString(streamIdxColumn)) + .Add(4, manyStreamsSetting->TailPtr()) + .Seal() .Seal() .Build(); } |