aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/core
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2025-01-17 16:12:38 +0300
committerimunkin <imunkin@yandex-team.com>2025-01-17 16:29:14 +0300
commit5bf3fd8cf1463c43723f1c8ba9d2322073f93c04 (patch)
tree89f3800c888c589798de9e4f5a34384eda2f6808 /yql/essentials/core
parente2b9ea56b89acb80b934fcfa26a782bb92e1d39e (diff)
downloadydb-5bf3fd8cf1463c43723f1c8ba9d2322073f93c04.tar.gz
YQL-19424: Use WideStream instead of WideFlow in WideFromBlocks computation node
commit_hash:21c84a9004cc57883d949b8fc637fc3ae7bfbda9
Diffstat (limited to 'yql/essentials/core')
-rw-r--r--yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp267
-rw-r--r--yql/essentials/core/type_ann/type_ann_blocks.cpp13
-rw-r--r--yql/essentials/core/yql_aggregate_expander.cpp31
-rw-r--r--yql/essentials/core/yql_expr_type_annotation.h2
4 files changed, 161 insertions, 152 deletions
diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
index f7ba75c96c..b02dc45e66 100644
--- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -128,21 +128,26 @@ TExprNode::TPtr RebuildArgumentsOnlyLambdaForBlocks(const TExprNode& lambda, TEx
TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
- if (node->Head().IsCallable("WideFromBlocks")) {
- YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content();
- return ctx.NewCallable(node->Pos(), "ReplicateScalars", { node->Head().HeadPtr() });
- }
-
- if (const auto& input = node->Head(); input.IsCallable({"Extend", "OrderedExtend"})) {
+ const auto& input = node->Head();
+ if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
+ const auto& wideFromBlocks = input.Head();
+ // Technically, the code below rewrites the following sequence
+ // (WideToBlocks (ToFlow (WideFromBlocks (<input>)))))
+ // into (ReplicateScalars (<input>)), but ToFlow/FromFlow
+ // wrappers will be removed when all other nodes in block
+ // pipeline start using WideStream instead of the WideFlow.
+ // Hence, the logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideFromBlocks.Content();
+ // If tail is FromFlow, its input is WideFlow and can be
+ // used intact; Otherwise the input is WideStream, so the
+ // new input should be converted to WideFlow.
+ const auto tail = wideFromBlocks.HeadPtr();
+ const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+ : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
+ return ctx.NewCallable(node->Pos(), "ReplicateScalars", { flowInput });
+ }
+
+ if (input.IsCallable({"Extend", "OrderedExtend"})) {
YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << input.Content();
TExprNodeList newChildren;
newChildren.reserve(input.ChildrenSize());
@@ -157,23 +162,35 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext&
TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
- if (node->Head().IsCallable("WideToBlocks")) {
- YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content();
- return node->Head().HeadPtr();
- }
-
- if (node->Head().IsCallable("ReplicateScalars")) {
- YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content();
- return ctx.ChangeChild(*node, 0, node->Head().HeadPtr());
+ const auto& input = node->Head();
+ if (input.IsCallable("FromFlow") && input.Head().IsCallable("WideToBlocks")) {
+ const auto& wideToBlocks = input.Head();
+ // Technically, the code below rewrites the following sequence
+ // (WideFromBlocks (FromFlow (WideToBlocks (<input>))))
+ // into (FromFlow (<input>)) (to match the ToFlow parent),
+ // but ToFlow/FromFlow wrappers will be removed when all
+ // other nodes in block pipeline start using WideStream
+ // instead of the WideFlow. Hence, the logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideToBlocks.Content();
+ return ctx.NewCallable(node->Pos(), "FromFlow", {wideToBlocks.HeadPtr()});
+ }
+
+ if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) {
+ const auto& replicateScalars = input.Head();
+ // Technically, the code below rewrites the following sequence
+ // (WideFromBlocks (FromFlow (ReplicateScalars (<input>))))
+ // into (WideFromBlocks (FromFlow (<input>))), but ToFlow/FromFlow
+ // wrappers will be removed when all other nodes in block
+ // pipeline start using WideStream instead of the WideFlow.
+ // Hence, the logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << replicateScalars.Content() << " as input of " << node->Content();
+ return ctx.Builder(node->Pos())
+ .Callable(node->Content())
+ .Callable(0, input.Content())
+ .Add(0, replicateScalars.HeadPtr())
+ .Seal()
+ .Seal()
+ .Build();
}
return node;
@@ -6101,34 +6118,41 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
Y_ENSURE(node->IsCallable("WideMap"));
const auto lambda = node->TailPtr();
// Swap trivial WideMap and WideFromBlocks.
- if (node->Head().IsCallable("WideFromBlocks")) {
+ const auto& input = node->Head();
+ if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
if (auto newLambda = RebuildArgumentsOnlyLambdaForBlocks(*lambda, ctx, types)) {
- YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Head().Content() << " with " << node->Content();
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
+ const auto& wideFromBlocks = input.Head();
+ // Technically, the code below rewrites the following sequence
+ // (WideMap (ToFlow (WideFromBlocks (FromFlow (<input>)))))
+ // into (ToFlow (WideFromBlocks (FromFlow (WideMap (<input>)).
+ // Hence, the logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << wideFromBlocks.Content() << " with " << node->Content();
+ // If tail is FromFlow, its input is WideFlow and can be
+ // used intact; Otherwise the input is WideStream, so the
+ // new input should be converted to WideFlow.
+ const auto tail = wideFromBlocks.HeadPtr();
+ const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+ : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
return ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, "WideMap")
- .Add(0, node->Head().HeadPtr())
- .Add(1, newLambda)
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, "WideMap")
+ .Add(0, flowInput)
+ .Add(1, newLambda)
+ .Seal()
+ .Seal()
.Seal()
.Seal()
.Build();
}
}
- if (!CanRewriteToBlocksWithInput(node->Head(), types)) {
+ if (!CanRewriteToBlocksWithInput(input, types)) {
return node;
}
- auto multiInputType = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
+ auto multiInputType = input.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
ui32 newNodes;
TNodeMap<size_t> rewritePositions;
TExprNode::TPtr blockLambda;
@@ -6144,22 +6168,17 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
<< ", extra columns: " << rewritePositions.size();
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
auto ret = ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, "WideMap")
- .Callable(0, "WideToBlocks")
- .Add(0, node->HeadPtr())
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, "WideMap")
+ .Callable(0, "WideToBlocks")
+ .Add(0, node->HeadPtr())
+ .Seal()
+ .Add(1, blockLambda)
+ .Seal()
.Seal()
- .Add(1, blockLambda)
.Seal()
.Seal()
.Build();
@@ -6220,18 +6239,13 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
<< ", extra columns: " << rewritePositions.size();
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Add(0, result)
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, result)
+ .Seal()
+ .Seal()
.Seal()
.Build();
}
@@ -6239,19 +6253,14 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
if (!newNodes) {
return node;
}
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
auto filtered = ctx.Builder(node->Pos())
.Callable("WideFilter")
- .Callable(0, "WideFromBlocks")
- .Add(0, blockMapped)
+ .Callable(0, "ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, blockMapped)
+ .Seal()
+ .Seal()
.Seal()
.Add(1, restLambda)
.Seal()
@@ -6320,22 +6329,17 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
TStringBuf newName = node->Content() == "Skip" ? "WideSkipBlocks" : "WideTakeBlocks";
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, newName)
- .Callable(0, "WideToBlocks")
- .Add(0, node->HeadPtr())
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, newName)
+ .Callable(0, "WideToBlocks")
+ .Add(0, node->HeadPtr())
+ .Seal()
+ .Add(1, node->ChildPtr(1))
+ .Seal()
.Seal()
- .Add(1, node->ChildPtr(1))
.Seal()
.Seal()
.Build();
@@ -6375,18 +6379,13 @@ TExprNode::TPtr OptimizeTopOrSortBlocks(const TExprNode::TPtr& node, TExprContex
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
auto children = node->ChildrenList();
children[0] = ctx.NewCallable(node->Pos(), "WideToBlocks", { children[0] });
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children)))
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children)))
+ .Seal()
+ .Seal()
.Seal()
.Build();
}
@@ -6606,17 +6605,45 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx)
.Add(1, DropUnusedArgs(node->Tail(), actualUnused, ctx))
.Seal().Build();
}
- } else if (input.IsCallable({"WideFromBlocks", "WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) {
+ } else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
+ const auto& wideFromBlocks = input.Head();
+ // WideFromBlocks uses WideStream instead of WideFlow,
+ // so it's wrapped with ToFlow/FromFlow. Hence, to drop
+ // unused fields for particular WideFromBlocks node,
+ // the optimizer has to rewrite FromFlow child, but
+ // logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << wideFromBlocks.Content() << " with " << unused.size() << " unused fields.";
+ const auto tail = wideFromBlocks.HeadPtr();
+ const auto width = tail->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize();
+ const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+ : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
+ return ctx.Builder(node->Pos())
+ .Callable(node->Content())
+ .Callable(0, "ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, "WideMap")
+ .Add(0, flowInput)
+ .Lambda(1)
+ .Params("items", width)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (auto i = 0U, j = 0U; i < width; ++i) {
+ if (unused.cend() == std::find(unused.cbegin(), unused.cend(), i)) {
+ parent.Arg(j++, "items", i);
+ }
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Add(1, DropUnusedArgs(node->Tail(), unused, ctx))
+ .Seal()
+ .Build();
+ } else if (input.IsCallable({"WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) {
YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content() << " with " << unused.size() << " unused fields.";
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(node->Pos())
.Callable(node->Content())
.Add(0, ctx.ChangeChild(input, 0U, MakeWideMapForDropUnused(input.HeadPtr(), unused, ctx)))
diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp
index 4f3a092dde..807620be9a 100644
--- a/yql/essentials/core/type_ann/type_ann_blocks.cpp
+++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp
@@ -966,28 +966,19 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
TTypeAnnotationNode::TListType retMultiType;
- if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) {
+ if (!EnsureWideStreamBlockType(input->Head(), retMultiType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
YQL_ENSURE(!retMultiType.empty());
retMultiType.pop_back();
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
- input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
}
diff --git a/yql/essentials/core/yql_aggregate_expander.cpp b/yql/essentials/core/yql_aggregate_expander.cpp
index 0ebfaa988b..bad2c22071 100644
--- a/yql/essentials/core/yql_aggregate_expander.cpp
+++ b/yql/essentials/core/yql_aggregate_expander.cpp
@@ -708,18 +708,9 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
TExprNode::TPtr aggWideFlow;
if (hashed) {
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
aggWideFlow = Ctx.Builder(Node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, "ToFlow")
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
.Callable(0, "BlockCombineHashed")
.Callable(0, "FromFlow")
.Add(0, blocks)
@@ -2939,15 +2930,15 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
.Build();
}
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
- auto aggWideFlow = Ctx.NewCallable(Node->Pos(), "WideFromBlocks", { aggBlocks });
+ auto aggWideFlow = Ctx.Builder(Node->Pos())
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, aggBlocks)
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx);
auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow });
auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { streamArg }), std::move(root));
diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h
index 508c9e4258..9345a0104e 100644
--- a/yql/essentials/core/yql_expr_type_annotation.h
+++ b/yql/essentials/core/yql_expr_type_annotation.h
@@ -355,7 +355,7 @@ TStringBuf NormalizeCallableName(TStringBuf name);
void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx);
namespace NBlockStreamIO {
- constexpr bool WideFromBlocks = false;
+ constexpr bool WideFromBlocks = true;
} // namespace NBlockStreamIO
}