aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2025-01-17 16:12:38 +0300
committerimunkin <imunkin@yandex-team.com>2025-01-17 16:29:14 +0300
commit5bf3fd8cf1463c43723f1c8ba9d2322073f93c04 (patch)
tree89f3800c888c589798de9e4f5a34384eda2f6808
parente2b9ea56b89acb80b934fcfa26a782bb92e1d39e (diff)
downloadydb-5bf3fd8cf1463c43723f1c8ba9d2322073f93c04.tar.gz
YQL-19424: Use WideStream instead of WideFlow in WideFromBlocks computation node
commit_hash:21c84a9004cc57883d949b8fc637fc3ae7bfbda9
-rw-r--r--yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp267
-rw-r--r--yql/essentials/core/type_ann/type_ann_blocks.cpp13
-rw-r--r--yql/essentials/core/yql_aggregate_expander.cpp31
-rw-r--r--yql/essentials/core/yql_expr_type_annotation.h2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp12
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_blocks.cpp182
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp5
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp2
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp6
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp54
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp8
-rw-r--r--yql/essentials/minikql/mkql_node.cpp12
-rw-r--r--yql/essentials/minikql/mkql_node.h5
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp24
-rw-r--r--yql/essentials/public/purecalc/common/transformations/utils.cpp55
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp4
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_block_input.cpp17
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_join_impl.cpp4
18 files changed, 413 insertions, 290 deletions
diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
index f7ba75c96c..b02dc45e66 100644
--- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -128,21 +128,26 @@ TExprNode::TPtr RebuildArgumentsOnlyLambdaForBlocks(const TExprNode& lambda, TEx
TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
- if (node->Head().IsCallable("WideFromBlocks")) {
- YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content();
- return ctx.NewCallable(node->Pos(), "ReplicateScalars", { node->Head().HeadPtr() });
- }
-
- if (const auto& input = node->Head(); input.IsCallable({"Extend", "OrderedExtend"})) {
+ const auto& input = node->Head();
+ if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
+ const auto& wideFromBlocks = input.Head();
+ // Technically, the code below rewrites the following sequence
+ // (WideToBlocks (ToFlow (WideFromBlocks (<input>)))))
+ // into (ReplicateScalars (<input>)), but ToFlow/FromFlow
+ // wrappers will be removed when all other nodes in block
+ // pipeline start using WideStream instead of the WideFlow.
+ // Hence, the logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideFromBlocks.Content();
+ // If tail is FromFlow, its input is WideFlow and can be
+ // used intact; Otherwise the input is WideStream, so the
+ // new input should be converted to WideFlow.
+ const auto tail = wideFromBlocks.HeadPtr();
+ const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+ : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
+ return ctx.NewCallable(node->Pos(), "ReplicateScalars", { flowInput });
+ }
+
+ if (input.IsCallable({"Extend", "OrderedExtend"})) {
YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << input.Content();
TExprNodeList newChildren;
newChildren.reserve(input.ChildrenSize());
@@ -157,23 +162,35 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext&
TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
- if (node->Head().IsCallable("WideToBlocks")) {
- YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content();
- return node->Head().HeadPtr();
- }
-
- if (node->Head().IsCallable("ReplicateScalars")) {
- YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content();
- return ctx.ChangeChild(*node, 0, node->Head().HeadPtr());
+ const auto& input = node->Head();
+ if (input.IsCallable("FromFlow") && input.Head().IsCallable("WideToBlocks")) {
+ const auto& wideToBlocks = input.Head();
+ // Technically, the code below rewrites the following sequence
+ // (WideFromBlocks (FromFlow (WideToBlocks (<input>))))
+ // into (FromFlow (<input>)) (to match the ToFlow parent),
+ // but ToFlow/FromFlow wrappers will be removed when all
+ // other nodes in block pipeline start using WideStream
+ // instead of the WideFlow. Hence, the logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideToBlocks.Content();
+ return ctx.NewCallable(node->Pos(), "FromFlow", {wideToBlocks.HeadPtr()});
+ }
+
+ if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) {
+ const auto& replicateScalars = input.Head();
+ // Technically, the code below rewrites the following sequence
+ // (WideFromBlocks (FromFlow (ReplicateScalars (<input>))))
+ // into (WideFromBlocks (FromFlow (<input>))), but ToFlow/FromFlow
+ // wrappers will be removed when all other nodes in block
+ // pipeline start using WideStream instead of the WideFlow.
+ // Hence, the logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << replicateScalars.Content() << " as input of " << node->Content();
+ return ctx.Builder(node->Pos())
+ .Callable(node->Content())
+ .Callable(0, input.Content())
+ .Add(0, replicateScalars.HeadPtr())
+ .Seal()
+ .Seal()
+ .Build();
}
return node;
@@ -6101,34 +6118,41 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
Y_ENSURE(node->IsCallable("WideMap"));
const auto lambda = node->TailPtr();
// Swap trivial WideMap and WideFromBlocks.
- if (node->Head().IsCallable("WideFromBlocks")) {
+ const auto& input = node->Head();
+ if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
if (auto newLambda = RebuildArgumentsOnlyLambdaForBlocks(*lambda, ctx, types)) {
- YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Head().Content() << " with " << node->Content();
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
+ const auto& wideFromBlocks = input.Head();
+ // Technically, the code below rewrites the following sequence
+ // (WideMap (ToFlow (WideFromBlocks (FromFlow (<input>)))))
+ // into (ToFlow (WideFromBlocks (FromFlow (WideMap (<input>)).
+ // Hence, the logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << wideFromBlocks.Content() << " with " << node->Content();
+ // If tail is FromFlow, its input is WideFlow and can be
+ // used intact; Otherwise the input is WideStream, so the
+ // new input should be converted to WideFlow.
+ const auto tail = wideFromBlocks.HeadPtr();
+ const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+ : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
return ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, "WideMap")
- .Add(0, node->Head().HeadPtr())
- .Add(1, newLambda)
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, "WideMap")
+ .Add(0, flowInput)
+ .Add(1, newLambda)
+ .Seal()
+ .Seal()
.Seal()
.Seal()
.Build();
}
}
- if (!CanRewriteToBlocksWithInput(node->Head(), types)) {
+ if (!CanRewriteToBlocksWithInput(input, types)) {
return node;
}
- auto multiInputType = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
+ auto multiInputType = input.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
ui32 newNodes;
TNodeMap<size_t> rewritePositions;
TExprNode::TPtr blockLambda;
@@ -6144,22 +6168,17 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
<< ", extra columns: " << rewritePositions.size();
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
auto ret = ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, "WideMap")
- .Callable(0, "WideToBlocks")
- .Add(0, node->HeadPtr())
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, "WideMap")
+ .Callable(0, "WideToBlocks")
+ .Add(0, node->HeadPtr())
+ .Seal()
+ .Add(1, blockLambda)
+ .Seal()
.Seal()
- .Add(1, blockLambda)
.Seal()
.Seal()
.Build();
@@ -6220,18 +6239,13 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
<< ", extra columns: " << rewritePositions.size();
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Add(0, result)
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, result)
+ .Seal()
+ .Seal()
.Seal()
.Build();
}
@@ -6239,19 +6253,14 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
if (!newNodes) {
return node;
}
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
auto filtered = ctx.Builder(node->Pos())
.Callable("WideFilter")
- .Callable(0, "WideFromBlocks")
- .Add(0, blockMapped)
+ .Callable(0, "ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, blockMapped)
+ .Seal()
+ .Seal()
.Seal()
.Add(1, restLambda)
.Seal()
@@ -6320,22 +6329,17 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
TStringBuf newName = node->Content() == "Skip" ? "WideSkipBlocks" : "WideTakeBlocks";
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, newName)
- .Callable(0, "WideToBlocks")
- .Add(0, node->HeadPtr())
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, newName)
+ .Callable(0, "WideToBlocks")
+ .Add(0, node->HeadPtr())
+ .Seal()
+ .Add(1, node->ChildPtr(1))
+ .Seal()
.Seal()
- .Add(1, node->ChildPtr(1))
.Seal()
.Seal()
.Build();
@@ -6375,18 +6379,13 @@ TExprNode::TPtr OptimizeTopOrSortBlocks(const TExprNode::TPtr& node, TExprContex
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
auto children = node->ChildrenList();
children[0] = ctx.NewCallable(node->Pos(), "WideToBlocks", { children[0] });
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(node->Pos())
- .Callable("WideFromBlocks")
- .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children)))
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children)))
+ .Seal()
+ .Seal()
.Seal()
.Build();
}
@@ -6606,17 +6605,45 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx)
.Add(1, DropUnusedArgs(node->Tail(), actualUnused, ctx))
.Seal().Build();
}
- } else if (input.IsCallable({"WideFromBlocks", "WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) {
+ } else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
+ const auto& wideFromBlocks = input.Head();
+ // WideFromBlocks uses WideStream instead of WideFlow,
+ // so it's wrapped with ToFlow/FromFlow. Hence, to drop
+ // unused fields for particular WideFromBlocks node,
+ // the optimizer has to rewrite FromFlow child, but
+ // logging is left intact.
+ YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << wideFromBlocks.Content() << " with " << unused.size() << " unused fields.";
+ const auto tail = wideFromBlocks.HeadPtr();
+ const auto width = tail->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize();
+ const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
+ : ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
+ return ctx.Builder(node->Pos())
+ .Callable(node->Content())
+ .Callable(0, "ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, "WideMap")
+ .Add(0, flowInput)
+ .Lambda(1)
+ .Params("items", width)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (auto i = 0U, j = 0U; i < width; ++i) {
+ if (unused.cend() == std::find(unused.cbegin(), unused.cend(), i)) {
+ parent.Arg(j++, "items", i);
+ }
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Add(1, DropUnusedArgs(node->Tail(), unused, ctx))
+ .Seal()
+ .Build();
+ } else if (input.IsCallable({"WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) {
YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content() << " with " << unused.size() << " unused fields.";
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(node->Pos())
.Callable(node->Content())
.Add(0, ctx.ChangeChild(input, 0U, MakeWideMapForDropUnused(input.HeadPtr(), unused, ctx)))
diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp
index 4f3a092dde..807620be9a 100644
--- a/yql/essentials/core/type_ann/type_ann_blocks.cpp
+++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp
@@ -966,28 +966,19 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
TTypeAnnotationNode::TListType retMultiType;
- if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) {
+ if (!EnsureWideStreamBlockType(input->Head(), retMultiType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
YQL_ENSURE(!retMultiType.empty());
retMultiType.pop_back();
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
- input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
return IGraphTransformer::TStatus::Ok;
}
diff --git a/yql/essentials/core/yql_aggregate_expander.cpp b/yql/essentials/core/yql_aggregate_expander.cpp
index 0ebfaa988b..bad2c22071 100644
--- a/yql/essentials/core/yql_aggregate_expander.cpp
+++ b/yql/essentials/core/yql_aggregate_expander.cpp
@@ -708,18 +708,9 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
TExprNode::TPtr aggWideFlow;
if (hashed) {
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
aggWideFlow = Ctx.Builder(Node->Pos())
- .Callable("WideFromBlocks")
- .Callable(0, "ToFlow")
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
.Callable(0, "BlockCombineHashed")
.Callable(0, "FromFlow")
.Add(0, blocks)
@@ -2939,15 +2930,15 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
.Build();
}
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
- auto aggWideFlow = Ctx.NewCallable(Node->Pos(), "WideFromBlocks", { aggBlocks });
+ auto aggWideFlow = Ctx.Builder(Node->Pos())
+ .Callable("ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Add(0, aggBlocks)
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx);
auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow });
auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { streamArg }), std::move(root));
diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h
index 508c9e4258..9345a0104e 100644
--- a/yql/essentials/core/yql_expr_type_annotation.h
+++ b/yql/essentials/core/yql_expr_type_annotation.h
@@ -355,7 +355,7 @@ TStringBuf NormalizeCallableName(TStringBuf name);
void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx);
namespace NBlockStreamIO {
- constexpr bool WideFromBlocks = false;
+ constexpr bool WideFromBlocks = true;
} // namespace NBlockStreamIO
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp
index fef2712c4f..266e909b98 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp
@@ -442,18 +442,6 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) {
return GetSparseBitmapPopCount(src, len);
}
-TArrayRef<TType *const> GetWideComponents(TType* type) {
- if (type->IsFlow()) {
- const auto outputFlowType = AS_TYPE(TFlowType, type);
- return GetWideComponents(outputFlowType);
- }
- if (type->IsStream()) {
- const auto outputStreamType = AS_TYPE(TStreamType, type);
- return GetWideComponents(outputStreamType);
- }
- MKQL_ENSURE(false, "Expect either flow or stream");
-}
-
size_t CalcMaxBlockLenForOutput(TType* out) {
const auto wideComponents = GetWideComponents(out);
MKQL_ENSURE(wideComponents.size() > 0, "Expecting at least one output column");
diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
index 5947cb7a16..c05ccb9ca1 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
@@ -517,10 +517,54 @@ private:
TType* ItemType_;
};
-class TWideFromBlocksWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper> {
-using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper>;
+struct TWideFromBlocksState : public TComputationValue<TWideFromBlocksState> {
+ size_t Count_ = 0;
+ size_t Index_ = 0;
+ size_t Current_ = 0;
+ NUdf::TUnboxedValue* Pointer_ = nullptr;
+ TUnboxedValueVector Values_;
+ std::vector<std::unique_ptr<IBlockReader>> Readers_;
+ std::vector<std::unique_ptr<IBlockItemConverter>> Converters_;
+ const std::vector<arrow::ValueDescr> ValuesDescr_;
+
+ TWideFromBlocksState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types)
+ : TComputationValue(memInfo)
+ , Values_(types.size() + 1)
+ , ValuesDescr_(ToValueDescr(types))
+ {
+ Pointer_ = Values_.data();
+
+ const auto& pgBuilder = ctx.Builder->GetPgBuilder();
+ for (size_t i = 0; i < types.size(); ++i) {
+ const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType();
+ Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
+ Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
+ }
+ }
+
+ void ClearValues() {
+ Values_.assign(Values_.size(), NUdf::TUnboxedValuePod());
+ }
+
+ NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const {
+ TBlockItem item;
+ const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum();
+ ARROW_DEBUG_CHECK_DATUM_TYPES(ValuesDescr_[idx], datum.descr());
+ if (datum.is_scalar()) {
+ item = Readers_[idx]->GetScalarItem(*datum.scalar());
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ item = Readers_[idx]->GetItem(*datum.array(), Current_);
+ }
+ return Converters_[idx]->MakeValue(item, holderFactory);
+ }
+};
+
+class TWideFromBlocksFlowWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFromBlocksFlowWrapper> {
+using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksFlowWrapper>;
+using TState = TWideFromBlocksState;
public:
- TWideFromBlocksWrapper(TComputationMutables& mutables,
+ TWideFromBlocksFlowWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
TVector<TType*>&& types)
: TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
@@ -589,7 +633,7 @@ public:
const auto ptrType = PointerType::getUnqual(StructType::get(context));
const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
- const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideFromBlocksWrapper::MakeState));
+ const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideFromBlocksFlowWrapper::MakeState));
const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
@@ -690,48 +734,6 @@ public:
}
#endif
private:
- struct TState : public TComputationValue<TState> {
- size_t Count_ = 0;
- size_t Index_ = 0;
- size_t Current_ = 0;
- NUdf::TUnboxedValue* Pointer_ = nullptr;
- TUnboxedValueVector Values_;
- std::vector<std::unique_ptr<IBlockReader>> Readers_;
- std::vector<std::unique_ptr<IBlockItemConverter>> Converters_;
- const std::vector<arrow::ValueDescr> ValuesDescr_;
-
- TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types)
- : TComputationValue(memInfo)
- , Values_(types.size() + 1)
- , ValuesDescr_(ToValueDescr(types))
- {
- Pointer_ = Values_.data();
-
- const auto& pgBuilder = ctx.Builder->GetPgBuilder();
- for (size_t i = 0; i < types.size(); ++i) {
- const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType();
- Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
- Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
- }
- }
-
- void ClearValues() {
- Values_.assign(Values_.size(), NUdf::TUnboxedValuePod());
- }
-
- NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const {
- TBlockItem item;
- const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum();
- ARROW_DEBUG_CHECK_DATUM_TYPES(ValuesDescr_[idx], datum.descr());
- if (datum.is_scalar()) {
- item = Readers_[idx]->GetScalarItem(*datum.scalar());
- } else {
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- item = Readers_[idx]->GetItem(*datum.array(), Current_);
- }
- return Converters_[idx]->MakeValue(item, holderFactory);
- }
- };
#ifndef MKQL_DISABLE_CODEGEN
class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> {
private:
@@ -804,6 +806,74 @@ private:
const size_t WideFieldsIndex_;
};
+class TWideFromBlocksStreamWrapper : public TMutableComputationNode<TWideFromBlocksStreamWrapper>
+{
+using TBaseComputation = TMutableComputationNode<TWideFromBlocksStreamWrapper>;
+using TState = TWideFromBlocksState;
+public:
+ TWideFromBlocksStreamWrapper(TComputationMutables& mutables,
+ IComputationNode* stream,
+ TVector<TType*>&& types)
+ : TBaseComputation(mutables, EValueRepresentation::Boxed)
+ , Stream_(stream)
+ , Types_(std::move(types))
+ {}
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const
+ {
+ const auto state = ctx.HolderFactory.Create<TState>(ctx, Types_);
+ return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
+ std::move(state),
+ std::move(Stream_->GetValue(ctx)));
+ }
+
+private:
+ class TStreamValue : public TComputationValue<TStreamValue> {
+ using TBase = TComputationValue<TStreamValue>;
+ public:
+ TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory,
+ NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream)
+ : TBase(memInfo)
+ , BlockState_(blockState)
+ , Stream_(stream)
+ , HolderFactory_(holderFactory)
+ {}
+
+ private:
+ NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+ auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get());
+ auto* inputFields = blockState.Pointer_;
+ const size_t inputWidth = blockState.Values_.size();
+
+ if (blockState.Index_ == blockState.Count_) do {
+ if (const auto result = Stream_.WideFetch(inputFields, inputWidth); result != NUdf::EFetchStatus::Ok)
+ return result;
+
+ blockState.Index_ = 0;
+ blockState.Count_ = GetBlockCount(blockState.Values_.back());
+ } while (!blockState.Count_);
+
+ blockState.Current_ = blockState.Index_++;
+ for (size_t i = 0; i < width; i++) {
+ output[i] = blockState.Get(HolderFactory_, i);
+ }
+
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ NUdf::TUnboxedValue BlockState_;
+ NUdf::TUnboxedValue Stream_;
+ const THolderFactory& HolderFactory_;
+ };
+
+ void RegisterDependencies() const final {
+ this->DependsOn(Stream_);
+ }
+
+ IComputationNode* const Stream_;
+ const TVector<TType*> Types_;
+};
+
class TPrecomputedArrowNode : public IArrowKernelComputationNode {
public:
TPrecomputedArrowNode(const arrow::Datum& datum, TStringBuf kernelName)
@@ -1209,17 +1279,29 @@ IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFact
IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto wideComponents = GetWideComponents(flowType);
+ const auto inputType = callable.GetInput(0).GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(),
+ "Expected either WideStream or WideFlow as an input");
+ const auto yieldsStream = callable.GetType()->GetReturnType()->IsStream();
+ MKQL_ENSURE(yieldsStream == inputType->IsStream(),
+ "Expected both input and output have to be either WideStream or WideFlow");
+
+ const auto wideComponents = GetWideComponents(inputType);
MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column");
TVector<TType*> items;
for (ui32 i = 0; i < wideComponents.size() - 1; ++i) {
items.push_back(AS_TYPE(TBlockType, wideComponents[i]));
}
- const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ const auto wideFlowOrStream = LocateNode(ctx.NodeLocator, callable, 0);
+ if (yieldsStream) {
+ const auto wideStream = wideFlowOrStream;
+ return new TWideFromBlocksStreamWrapper(ctx.Mutables, wideStream, std::move(items));
+ }
+ // FIXME: Drop the branch below, when the time comes.
+ const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(wideFlowOrStream);
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(items));
+ return new TWideFromBlocksFlowWrapper(ctx.Mutables, wideFlow, std::move(items));
}
IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp
index 93ad801338..5aaf9da44b 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp
@@ -86,7 +86,7 @@ void DoNestedTuplesCompressTest() {
node = pb.BlockExpandChunked(node);
node = pb.WideSkipBlocks(node, pb.template NewDataLiteral<ui64>(19));
node = pb.BlockCompress(node, 2);
- node = pb.WideFromBlocks(node);
+ node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node)));
node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode {
return pb.NewTuple(resultTupleType, {items[0], items[1]});
@@ -186,7 +186,8 @@ Y_UNIT_TEST_LLVM(CompressBasic) {
const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)};
});
- const auto compressedFlow = pb.WideFromBlocks(pb.BlockCompress(pb.WideToBlocks(wideFlow), 0));
+ const auto compressedBlocks = pb.BlockCompress(pb.WideToBlocks(wideFlow), 0);
+ const auto compressedFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(compressedBlocks)));
const auto narrowFlow = pb.NarrowMap(compressedFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
return pb.NewTuple({items[0], items[1]});
});
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp
index 67cf2acc49..fe65e76d1e 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp
@@ -66,7 +66,7 @@ void DoBlockExistsOffset(size_t length, size_t offset) {
items[4],
};
});
- node = pb.WideFromBlocks(node);
+ node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node)));
node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode {
return pb.NewTuple(outputTupleType, {items[0], items[1], items[2], items[3]});
});
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp
index 09db19aebe..0f4e6a0681 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp
@@ -131,7 +131,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) {
const auto flow = MakeFlow(setup);
const auto part = pb.WideSkipBlocks(flow, pb.NewDataLiteral<ui64>(7));
- const auto plain = pb.WideFromBlocks(part);
+ const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part)));
const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode {
return pb.Add(items[0], items[1]);
@@ -163,7 +163,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) {
const auto flow = MakeFlow(setup);
const auto part = pb.WideTakeBlocks(flow, pb.NewDataLiteral<ui64>(4));
- const auto plain = pb.WideFromBlocks(part);
+ const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part)));
const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode {
return pb.Add(items[0], items[1]);
@@ -198,7 +198,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) {
const auto flow = MakeFlow(setup);
const auto part = pb.WideTakeBlocks(pb.WideSkipBlocks(flow, pb.NewDataLiteral<ui64>(3)), pb.NewDataLiteral<ui64>(5));
- const auto plain = pb.WideFromBlocks(part);
+ const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part)));
const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode {
// 0, 0;
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp
index 2f523353cc..8303c709dc 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp
@@ -48,9 +48,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}})),
+ pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}});
+ const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
@@ -111,9 +113,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}})),
+ pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}});
+ const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
@@ -180,9 +184,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- pb.NewDataLiteral<ui64>(3ULL), {{1U, pb.NewDataLiteral<bool>(true)}})),
+ pb.NewDataLiteral<ui64>(3ULL), {{1U, pb.NewDataLiteral<bool>(true)}});
+ const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
@@ -240,9 +246,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- pb.NewDataLiteral<ui64>(2ULL), {{1U, pb.NewDataLiteral<bool>(false)}})),
+ pb.NewDataLiteral<ui64>(2ULL), {{1U, pb.NewDataLiteral<bool>(false)}});
+ const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
@@ -297,9 +305,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}, {1U, pb.NewDataLiteral<bool>(false)}})),
+ pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}, {1U, pb.NewDataLiteral<bool>(false)}});
+ const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
@@ -360,9 +370,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}, {1U, pb.NewDataLiteral<bool>(true)}})),
+ pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}, {1U, pb.NewDataLiteral<bool>(true)}});
+ const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
@@ -429,9 +441,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- pb.NewDataLiteral<ui64>(4ULL), {{1U, pb.NewDataLiteral<bool>(true)}, {0U, pb.NewDataLiteral<bool>(false)}})),
+ pb.NewDataLiteral<ui64>(4ULL), {{1U, pb.NewDataLiteral<bool>(true)}, {0U, pb.NewDataLiteral<bool>(false)}});
+ const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
@@ -492,9 +506,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- pb.NewDataLiteral<ui64>(6ULL), {{1U, pb.NewDataLiteral<bool>(false)}, {0U, pb.NewDataLiteral<bool>(true)}})),
+ pb.NewDataLiteral<ui64>(6ULL), {{1U, pb.NewDataLiteral<bool>(false)}, {0U, pb.NewDataLiteral<bool>(true)}});
+ const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
@@ -565,9 +581,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockSortTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
+ const auto sortBlocks = pb.WideSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })),
- {{0U, pb.NewDataLiteral<bool>(true)}})),
+ {{0U, pb.NewDataLiteral<bool>(true)}});
+ const auto sortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(sortBlocks)));
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(sortFlow,
[&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); }
));
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
index ae4c3fce0b..f5c6f86884 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
@@ -196,10 +196,10 @@ void TestChunked(bool withBlockExpand) {
node = pb.BlockExpandChunked(node);
// WideTakeBlocks won't work on chunked blocks
node = pb.WideTakeBlocks(node, pb.NewDataLiteral<ui64>(19));
- node = pb.WideFromBlocks(node);
+ node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node)));
} else {
// WideFromBlocks should support chunked blocks
- node = pb.WideFromBlocks(node);
+ node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node)));
node = pb.Take(node, pb.NewDataLiteral<ui64>(19));
}
node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode {
@@ -541,7 +541,7 @@ Y_UNIT_TEST_LLVM(TestWideFromBlocks) {
const auto blocksFlow = pb.ToBlocks(flow);
const auto wideFlow = pb.ExpandMap(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item, pb.AsScalar(pb.NewDataLiteral<ui64>(3ULL))}; });
- const auto wideFlow2 = pb.WideFromBlocks(wideFlow);
+ const auto wideFlow2 = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(wideFlow)));
const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); });
const auto pgmReturn = pb.Collect(narrowFlow);
@@ -581,7 +581,7 @@ Y_UNIT_TEST_LLVM(TestWideToAndFromBlocks) {
return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
});
const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
- const auto wideFlow2 = pb.WideFromBlocks(wideBlocksFlow);
+ const auto wideFlow2 = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(wideBlocksFlow)));
const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode {
return items[1];
});
diff --git a/yql/essentials/minikql/mkql_node.cpp b/yql/essentials/minikql/mkql_node.cpp
index 434171131b..82772bb923 100644
--- a/yql/essentials/minikql/mkql_node.cpp
+++ b/yql/essentials/minikql/mkql_node.cpp
@@ -2522,5 +2522,17 @@ TArrayRef<TType* const> GetWideComponents(const TStreamType* type) {
return AS_TYPE(TMultiType, type->GetItemType())->GetElements();
}
+TArrayRef<TType* const> GetWideComponents(const TType* type) {
+ if (type->IsFlow()) {
+ const auto outputFlowType = AS_TYPE(TFlowType, type);
+ return GetWideComponents(outputFlowType);
+ }
+ if (type->IsStream()) {
+ const auto outputStreamType = AS_TYPE(TStreamType, type);
+ return GetWideComponents(outputStreamType);
+ }
+    MKQL_ENSURE(false, "Expected either flow or stream");
+}
+
}
}
diff --git a/yql/essentials/minikql/mkql_node.h b/yql/essentials/minikql/mkql_node.h
index 6ac9c222d8..ab87008f24 100644
--- a/yql/essentials/minikql/mkql_node.h
+++ b/yql/essentials/minikql/mkql_node.h
@@ -1581,6 +1581,7 @@ EValueRepresentation GetValueRepresentation(NUdf::TDataTypeId typeId);
TArrayRef<TType* const> GetWideComponents(const TFlowType* type);
TArrayRef<TType* const> GetWideComponents(const TStreamType* type);
+TArrayRef<TType* const> GetWideComponents(const TType* type);
inline ui32 GetWideComponentsCount(const TFlowType* type) {
return (ui32)GetWideComponents(type).size();
@@ -1590,6 +1591,10 @@ inline ui32 GetWideComponentsCount(const TStreamType* type) {
return (ui32)GetWideComponents(type).size();
}
+inline ui32 GetWideComponentsCount(const TType* type) {
+ return (ui32)GetWideComponents(type).size();
+}
+
template <TType::EKind SingularKind>
TSingularType<SingularKind>* TSingularType<SingularKind>::Create(TTypeType* type, const TTypeEnvironment& env) {
return ::new(env.Allocate<TSingularType<SingularKind>>()) TSingularType<SingularKind>(type);
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 82c1f604ba..6e832cf509 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -1503,12 +1503,28 @@ TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) {
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode flow) {
- auto outputItems = ValidateBlockFlowType(flow.GetStaticType());
+TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode stream) {
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected WideStream as input type");
+ if constexpr (RuntimeVersion < 54U) {
+ // Preserve the old behaviour for ABI compatibility.
+ // Emit (FromFlow (WideFromBlocks (ToFlow (<stream>)))) to
+ // process the flow in favor to the given stream following
+ // the older MKQL ABI.
+        // FIXME: Drop the branch below when the time comes.
+ const auto inputFlow = ToFlow(stream);
+ auto outputItems = ValidateBlockFlowType(inputFlow.GetStaticType());
+ outputItems.pop_back();
+ TType* outputMultiType = NewMultiType(outputItems);
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType));
+ callableBuilder.Add(inputFlow);
+ const auto outputFlow = TRuntimeNode(callableBuilder.Build(), false);
+ return FromFlow(outputFlow);
+ }
+ auto outputItems = ValidateBlockStreamType(stream.GetStaticType());
outputItems.pop_back();
TType* outputMultiType = NewMultiType(outputItems);
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType));
- callableBuilder.Add(flow);
+ TCallableBuilder callableBuilder(Env, __func__, NewStreamType(outputMultiType));
+ callableBuilder.Add(stream);
return TRuntimeNode(callableBuilder.Build(), false);
}
diff --git a/yql/essentials/public/purecalc/common/transformations/utils.cpp b/yql/essentials/public/purecalc/common/transformations/utils.cpp
index 6f63be0208..8c5192da26 100644
--- a/yql/essentials/public/purecalc/common/transformations/utils.cpp
+++ b/yql/essentials/public/purecalc/common/transformations/utils.cpp
@@ -13,41 +13,36 @@ TExprNode::TPtr NYql::NPureCalc::NodeFromBlocks(
) {
const auto items = structType->GetItems();
Y_ENSURE(items.size() > 0);
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
return ctx.Builder(pos)
.Lambda()
.Param("stream")
.Callable(0, "FromFlow")
.Callable(0, "NarrowMap")
- .Callable(0, "WideFromBlocks")
- .Callable(0, "ExpandMap")
- .Callable(0, "ToFlow")
- .Arg(0, "stream")
- .Seal()
- .Lambda(1)
- .Param("item")
- .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
- ui32 i = 0;
- for (const auto& item : items) {
- lambda.Callable(i++, "Member")
- .Arg(0, "item")
- .Atom(1, item->GetName())
- .Seal();
- }
- lambda.Callable(i, "Member")
- .Arg(0, "item")
- .Atom(1, PurecalcBlockColumnLength)
- .Seal();
- return lambda;
- })
+ .Callable(0, "ToFlow")
+ .Callable(0, "WideFromBlocks")
+ .Callable(0, "FromFlow")
+ .Callable(0, "ExpandMap")
+ .Callable(0, "ToFlow")
+ .Arg(0, "stream")
+ .Seal()
+ .Lambda(1)
+ .Param("item")
+ .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
+ ui32 i = 0;
+ for (const auto& item : items) {
+ lambda.Callable(i++, "Member")
+ .Arg(0, "item")
+ .Atom(1, item->GetName())
+ .Seal();
+ }
+ lambda.Callable(i, "Member")
+ .Arg(0, "item")
+ .Atom(1, PurecalcBlockColumnLength)
+ .Seal();
+ return lambda;
+ })
+ .Seal()
+ .Seal()
.Seal()
.Seal()
.Seal()
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
index db7fdd7430..beb330b74b 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
@@ -643,7 +643,9 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
values = NCommon::MkqlBuildExpr(ytMap.Mapper().Body().Ref(), innerCtx);
if (IsWideBlockType(lambdaOutputType)) {
- values = ctx.ProgramBuilder.WideFromBlocks(values);
+ values = ctx.ProgramBuilder.ToFlow(
+ ctx.ProgramBuilder.WideFromBlocks(
+ ctx.ProgramBuilder.FromFlow(values)));
}
if (ETypeAnnotationKind::Multi == lambdaOutputType.GetKind())
diff --git a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
index 9e4d509d8e..f92f152c31 100644
--- a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
@@ -41,21 +41,16 @@ private:
auto settings = RemoveSetting(map.Settings().Ref(), EYtSettingType::BlockInputReady, ctx);
settings = AddSetting(*settings, EYtSettingType::BlockInputApplied, TExprNode::TPtr(), ctx);
-
- // Static assert to ensure backward compatible change: if the
- // constant below is true, both input and output types of
- // WideFromBlocks callable have to be WideStream; otherwise,
- // both input and output types have to be WideFlow.
- // FIXME: When all spots using WideFromBlocks are adjusted
- // to work with WideStream, drop the assertion below.
- static_assert(!NYql::NBlockStreamIO::WideFromBlocks);
-
auto mapperLambda = Build<TCoLambda>(ctx, map.Mapper().Pos())
.Args({"flow"})
.Body<TExprApplier>()
.Apply(map.Mapper())
- .With<TCoWideFromBlocks>(0)
- .Input("flow")
+ .With<TCoToFlow>(0)
+ .Input<TCoWideFromBlocks>()
+ .Input<TCoFromFlow>()
+ .Input("flow")
+ .Build()
+ .Build()
.Build()
.Build()
.Done()
diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
index 9f4c2aa4a1..5d9949a257 100644
--- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp
@@ -1636,8 +1636,8 @@ TExprNode::TPtr BuildBlockMapJoin(TExprNode::TPtr leftFlow, TExprNode::TPtr righ
return ctx.Builder(pos)
.Callable("NarrowMap")
- .Callable(0, "WideFromBlocks")
- .Callable(0, "ToFlow")
+ .Callable(0, "ToFlow")
+ .Callable(0, "WideFromBlocks")
.Callable(0, "BlockMapJoinCore")
.Callable(0, "FromFlow")
.Callable(0, "WideToBlocks")