aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/core/yql_aggregate_expander.cpp
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2025-01-17 16:12:38 +0300
committerimunkin <imunkin@yandex-team.com>2025-01-17 16:29:14 +0300
commit5bf3fd8cf1463c43723f1c8ba9d2322073f93c04 (patch)
tree89f3800c888c589798de9e4f5a34384eda2f6808 /yql/essentials/core/yql_aggregate_expander.cpp
parente2b9ea56b89acb80b934fcfa26a782bb92e1d39e (diff)
downloadydb-5bf3fd8cf1463c43723f1c8ba9d2322073f93c04.tar.gz
YQL-19424: Use WideStream instead of WideFlow in WideFromBlocks computation node
commit_hash:21c84a9004cc57883d949b8fc637fc3ae7bfbda9
Diffstat (limited to 'yql/essentials/core/yql_aggregate_expander.cpp')
-rw-r--r--yql/essentials/core/yql_aggregate_expander.cpp31
1 files changed, 11 insertions, 20 deletions
diff --git a/yql/essentials/core/yql_aggregate_expander.cpp b/yql/essentials/core/yql_aggregate_expander.cpp
index 0ebfaa988b..bad2c22071 100644
--- a/yql/essentials/core/yql_aggregate_expander.cpp
+++ b/yql/essentials/core/yql_aggregate_expander.cpp
@@ -708,18 +708,9 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
TExprNode::TPtr aggWideFlow;
if (hashed) {
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
aggWideFlow = Ctx.Builder(Node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, "ToFlow")
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
.Callable(0, "BlockCombineHashed")
.Callable(0, "FromFlow")
.Add(0, blocks)
@@ -2939,15 +2930,15 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
.Build();
}
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
- auto aggWideFlow = Ctx.NewCallable(Node->Pos(), "WideFromBlocks", { aggBlocks });
+ auto aggWideFlow = Ctx.Builder(Node->Pos())
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, aggBlocks)
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx);
auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow });
auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { streamArg }), std::move(root));