diff options
author | imunkin <imunkin@yandex-team.com> | 2025-01-17 16:12:38 +0300 |
---|---|---|
committer | imunkin <imunkin@yandex-team.com> | 2025-01-17 16:29:14 +0300 |
commit | 5bf3fd8cf1463c43723f1c8ba9d2322073f93c04 (patch) | |
tree | 89f3800c888c589798de9e4f5a34384eda2f6808 | |
parent | e2b9ea56b89acb80b934fcfa26a782bb92e1d39e (diff) | |
download | ydb-5bf3fd8cf1463c43723f1c8ba9d2322073f93c04.tar.gz |
YQL-19424: Use WideStream instead of WideFlow in WideFromBlocks computation node
commit_hash:21c84a9004cc57883d949b8fc637fc3ae7bfbda9
18 files changed, 413 insertions, 290 deletions
diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index f7ba75c96c..b02dc45e66 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -128,21 +128,26 @@ TExprNode::TPtr RebuildArgumentsOnlyLambdaForBlocks(const TExprNode& lambda, TEx TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - - if (node->Head().IsCallable("WideFromBlocks")) { - YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content(); - return ctx.NewCallable(node->Pos(), "ReplicateScalars", { node->Head().HeadPtr() }); - } - - if (const auto& input = node->Head(); input.IsCallable({"Extend", "OrderedExtend"})) { + const auto& input = node->Head(); + if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) { + const auto& wideFromBlocks = input.Head(); + // Technically, the code below rewrites the following sequence + // (WideToBlocks (ToFlow (WideFromBlocks (<input>))))) + // into (ReplicateScalars (<input>)), but ToFlow/FromFlow + // wrappers will be removed when all other nodes in block + // pipeline start using WideStream instead of the WideFlow. + // Hence, the logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideFromBlocks.Content(); + // If tail is FromFlow, its input is WideFlow and can be + // used intact; Otherwise the input is WideStream, so the + // new input should be converted to WideFlow. + const auto tail = wideFromBlocks.HeadPtr(); + const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() + : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); + return ctx.NewCallable(node->Pos(), "ReplicateScalars", { flowInput }); + } + + if (input.IsCallable({"Extend", "OrderedExtend"})) { YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << input.Content(); TExprNodeList newChildren; newChildren.reserve(input.ChildrenSize()); @@ -157,23 +162,35 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - - if (node->Head().IsCallable("WideToBlocks")) { - YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content(); - return node->Head().HeadPtr(); - } - - if (node->Head().IsCallable("ReplicateScalars")) { - YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content(); - return ctx.ChangeChild(*node, 0, node->Head().HeadPtr()); + const auto& input = node->Head(); + if (input.IsCallable("FromFlow") && input.Head().IsCallable("WideToBlocks")) { + const auto& wideToBlocks = input.Head(); + // Technically, the code below rewrites the following sequence + // (WideFromBlocks (FromFlow (WideToBlocks (<input>)))) + // into (FromFlow (<input>)) (to match the ToFlow parent), + // but ToFlow/FromFlow wrappers will be removed when all + // other nodes in block pipeline start using WideStream + // instead of the WideFlow. Hence, the logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideToBlocks.Content(); + return ctx.NewCallable(node->Pos(), "FromFlow", {wideToBlocks.HeadPtr()}); + } + + if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) { + const auto& replicateScalars = input.Head(); + // Technically, the code below rewrites the following sequence + // (WideFromBlocks (FromFlow (ReplicateScalars (<input>)))) + // into (WideFromBlocks (FromFlow (<input>))), but ToFlow/FromFlow + // wrappers will be removed when all other nodes in block + // pipeline start using WideStream instead of the WideFlow. + // Hence, the logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << replicateScalars.Content() << " as input of " << node->Content(); + return ctx.Builder(node->Pos()) + .Callable(node->Content()) + .Callable(0, input.Content()) + .Add(0, replicateScalars.HeadPtr()) + .Seal() + .Seal() + .Build(); } return node; @@ -6101,34 +6118,41 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext& Y_ENSURE(node->IsCallable("WideMap")); const auto lambda = node->TailPtr(); // Swap trivial WideMap and WideFromBlocks. - if (node->Head().IsCallable("WideFromBlocks")) { + const auto& input = node->Head(); + if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) { if (auto newLambda = RebuildArgumentsOnlyLambdaForBlocks(*lambda, ctx, types)) { - YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Head().Content() << " with " << node->Content(); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - + const auto& wideFromBlocks = input.Head(); + // Technically, the code below rewrites the following sequence + // (WideMap (ToFlow (WideFromBlocks (FromFlow (<input>))))) + // into (ToFlow (WideFromBlocks (FromFlow (WideMap (<input>)). + // Hence, the logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << wideFromBlocks.Content() << " with " << node->Content(); + // If tail is FromFlow, its input is WideFlow and can be + // used intact; Otherwise the input is WideStream, so the + // new input should be converted to WideFlow. + const auto tail = wideFromBlocks.HeadPtr(); + const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() + : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); return ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Callable(0, "WideMap") - .Add(0, node->Head().HeadPtr()) - .Add(1, newLambda) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, "WideMap") + .Add(0, flowInput) + .Add(1, newLambda) + .Seal() + .Seal() .Seal() .Seal() .Build(); } } - if (!CanRewriteToBlocksWithInput(node->Head(), types)) { + if (!CanRewriteToBlocksWithInput(input, types)) { return node; } - auto multiInputType = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); + auto multiInputType = input.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); ui32 newNodes; TNodeMap<size_t> rewritePositions; TExprNode::TPtr blockLambda; @@ -6144,22 +6168,17 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext& YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes << ", extra columns: " << rewritePositions.size(); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - auto ret = ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Callable(0, "WideMap") - .Callable(0, "WideToBlocks") - .Add(0, node->HeadPtr()) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, "WideMap") + .Callable(0, "WideToBlocks") + .Add(0, node->HeadPtr()) + .Seal() + .Add(1, blockLambda) + .Seal() .Seal() - .Add(1, blockLambda) .Seal() .Seal() .Build(); @@ -6220,18 +6239,13 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes << ", extra columns: " << rewritePositions.size(); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Add(0, result) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Add(0, result) + .Seal() + .Seal() .Seal() .Build(); } @@ -6239,19 +6253,14 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte if (!newNodes) { return node; } - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - auto filtered = ctx.Builder(node->Pos()) .Callable("WideFilter") - .Callable(0, "WideFromBlocks") - .Add(0, blockMapped) + .Callable(0, "ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Add(0, blockMapped) + .Seal() + .Seal() .Seal() .Add(1, restLambda) .Seal() @@ -6320,22 +6329,17 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte TStringBuf newName = node->Content() == "Skip" ? "WideSkipBlocks" : "WideTakeBlocks"; YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Callable(0, newName) - .Callable(0, "WideToBlocks") - .Add(0, node->HeadPtr()) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, newName) + .Callable(0, "WideToBlocks") + .Add(0, node->HeadPtr()) + .Seal() + .Add(1, node->ChildPtr(1)) + .Seal() .Seal() - .Add(1, node->ChildPtr(1)) .Seal() .Seal() .Build(); @@ -6375,18 +6379,13 @@ TExprNode::TPtr OptimizeTopOrSortBlocks(const TExprNode::TPtr& node, TExprContex YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; auto children = node->ChildrenList(); children[0] = ctx.NewCallable(node->Pos(), "WideToBlocks", { children[0] }); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children))) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children))) + .Seal() + .Seal() .Seal() .Build(); } @@ -6606,17 +6605,45 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx) .Add(1, DropUnusedArgs(node->Tail(), actualUnused, ctx)) .Seal().Build(); } - } else if (input.IsCallable({"WideFromBlocks", "WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) { + } else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) { + const auto& wideFromBlocks = input.Head(); + // WideFromBlocks uses WideStream instead of WideFlow, + // so it's wrapped with ToFlow/FromFlow. Hence, to drop + // unused fields for particular WideFromBlocks node, + // the optimizer has to rewrite FromFlow child, but + // logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << wideFromBlocks.Content() << " with " << unused.size() << " unused fields."; + const auto tail = wideFromBlocks.HeadPtr(); + const auto width = tail->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize(); + const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() + : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); + return ctx.Builder(node->Pos()) + .Callable(node->Content()) + .Callable(0, "ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, "WideMap") + .Add(0, flowInput) + .Lambda(1) + .Params("items", width) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (auto i = 0U, j = 0U; i < width; ++i) { + if (unused.cend() == std::find(unused.cbegin(), unused.cend(), i)) { + parent.Arg(j++, "items", i); + } + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Add(1, DropUnusedArgs(node->Tail(), unused, ctx)) + .Seal() + .Build(); + } else if (input.IsCallable({"WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) { YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content() << " with " << unused.size() << " unused fields."; - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(node->Pos()) .Callable(node->Content()) .Add(0, ctx.ChangeChild(input, 0U, MakeWideMapForDropUnused(input.HeadPtr(), unused, ctx))) diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp index 4f3a092dde..807620be9a 100644 --- a/yql/essentials/core/type_ann/type_ann_blocks.cpp +++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp @@ -966,28 +966,19 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { Y_UNUSED(output); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } TTypeAnnotationNode::TListType retMultiType; - if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) { + if (!EnsureWideStreamBlockType(input->Head(), retMultiType, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } YQL_ENSURE(!retMultiType.empty()); retMultiType.pop_back(); auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); - input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType)); return IGraphTransformer::TStatus::Ok; } diff --git a/yql/essentials/core/yql_aggregate_expander.cpp b/yql/essentials/core/yql_aggregate_expander.cpp index 0ebfaa988b..bad2c22071 100644 --- a/yql/essentials/core/yql_aggregate_expander.cpp +++ b/yql/essentials/core/yql_aggregate_expander.cpp @@ -708,18 +708,9 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { TExprNode::TPtr aggWideFlow; if (hashed) { - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - aggWideFlow = Ctx.Builder(Node->Pos()) - .Callable("WideFromBlocks") - .Callable(0, "ToFlow") + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") .Callable(0, "BlockCombineHashed") .Callable(0, "FromFlow") .Add(0, blocks) @@ -2939,15 +2930,15 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() { .Build(); } - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - - auto aggWideFlow = Ctx.NewCallable(Node->Pos(), "WideFromBlocks", { aggBlocks }); + auto aggWideFlow = Ctx.Builder(Node->Pos()) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Add(0, aggBlocks) + .Seal() + .Seal() + .Seal() + .Build(); auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx); auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow }); auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { streamArg }), std::move(root)); diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h index 508c9e4258..9345a0104e 100644 --- a/yql/essentials/core/yql_expr_type_annotation.h +++ b/yql/essentials/core/yql_expr_type_annotation.h @@ -355,7 +355,7 @@ TStringBuf NormalizeCallableName(TStringBuf name); void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx); namespace NBlockStreamIO { - constexpr bool WideFromBlocks = false; + constexpr bool WideFromBlocks = true; } // namespace NBlockStreamIO } diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp index fef2712c4f..266e909b98 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_agg.cpp @@ -442,18 +442,6 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) { return GetSparseBitmapPopCount(src, len); } -TArrayRef<TType *const> GetWideComponents(TType* type) { - if (type->IsFlow()) { - const auto outputFlowType = AS_TYPE(TFlowType, type); - return GetWideComponents(outputFlowType); - } - if (type->IsStream()) { - const auto outputStreamType = AS_TYPE(TStreamType, type); - return GetWideComponents(outputStreamType); - } - MKQL_ENSURE(false, "Expect either flow or stream"); -} - size_t CalcMaxBlockLenForOutput(TType* out) { const auto wideComponents = GetWideComponents(out); MKQL_ENSURE(wideComponents.size() > 0, "Expecting at least one output column"); diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp index 5947cb7a16..c05ccb9ca1 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp @@ -517,10 +517,54 @@ private: TType* ItemType_; }; -class TWideFromBlocksWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper> { -using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksWrapper>; +struct TWideFromBlocksState : public TComputationValue<TWideFromBlocksState> { + size_t Count_ = 0; + size_t Index_ = 0; + size_t Current_ = 0; + NUdf::TUnboxedValue* Pointer_ = nullptr; + TUnboxedValueVector Values_; + std::vector<std::unique_ptr<IBlockReader>> Readers_; + std::vector<std::unique_ptr<IBlockItemConverter>> Converters_; + const std::vector<arrow::ValueDescr> ValuesDescr_; + + TWideFromBlocksState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types) + : TComputationValue(memInfo) + , Values_(types.size() + 1) + , ValuesDescr_(ToValueDescr(types)) + { + Pointer_ = Values_.data(); + + const auto& pgBuilder = ctx.Builder->GetPgBuilder(); + for (size_t i = 0; i < types.size(); ++i) { + const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType(); + Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); + Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder)); + } + } + + void ClearValues() { + Values_.assign(Values_.size(), NUdf::TUnboxedValuePod()); + } + + NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const { + TBlockItem item; + const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum(); + ARROW_DEBUG_CHECK_DATUM_TYPES(ValuesDescr_[idx], datum.descr()); + if (datum.is_scalar()) { + item = Readers_[idx]->GetScalarItem(*datum.scalar()); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + item = Readers_[idx]->GetItem(*datum.array(), Current_); + } + return Converters_[idx]->MakeValue(item, holderFactory); + } +}; + +class TWideFromBlocksFlowWrapper : public TStatefulWideFlowCodegeneratorNode<TWideFromBlocksFlowWrapper> { +using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideFromBlocksFlowWrapper>; +using TState = TWideFromBlocksState; public: - TWideFromBlocksWrapper(TComputationMutables& mutables, + TWideFromBlocksFlowWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TVector<TType*>&& types) : TBaseComputation(mutables, flow, EValueRepresentation::Boxed) @@ -589,7 +633,7 @@ public: const auto ptrType = PointerType::getUnqual(StructType::get(context)); const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block); - const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideFromBlocksWrapper::MakeState)); + const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TWideFromBlocksFlowWrapper::MakeState)); const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false); const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block); CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block); @@ -690,48 +734,6 @@ public: } #endif private: - struct TState : public TComputationValue<TState> { - size_t Count_ = 0; - size_t Index_ = 0; - size_t Current_ = 0; - NUdf::TUnboxedValue* Pointer_ = nullptr; - TUnboxedValueVector Values_; - std::vector<std::unique_ptr<IBlockReader>> Readers_; - std::vector<std::unique_ptr<IBlockItemConverter>> Converters_; - const std::vector<arrow::ValueDescr> ValuesDescr_; - - TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types) - : TComputationValue(memInfo) - , Values_(types.size() + 1) - , ValuesDescr_(ToValueDescr(types)) - { - Pointer_ = Values_.data(); - - const auto& pgBuilder = ctx.Builder->GetPgBuilder(); - for (size_t i = 0; i < types.size(); ++i) { - const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType(); - Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); - Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder)); - } - } - - void ClearValues() { - Values_.assign(Values_.size(), NUdf::TUnboxedValuePod()); - } - - NUdf::TUnboxedValuePod Get(const THolderFactory& holderFactory, size_t idx) const { - TBlockItem item; - const auto& datum = TArrowBlock::From(Values_[idx]).GetDatum(); - ARROW_DEBUG_CHECK_DATUM_TYPES(ValuesDescr_[idx], datum.descr()); - if (datum.is_scalar()) { - item = Readers_[idx]->GetScalarItem(*datum.scalar()); - } else { - MKQL_ENSURE(datum.is_array(), "Expecting array"); - item = Readers_[idx]->GetItem(*datum.array(), Current_); - } - return Converters_[idx]->MakeValue(item, holderFactory); - } - }; #ifndef MKQL_DISABLE_CODEGEN class TLLVMFieldsStructureState: public TLLVMFieldsStructure<TComputationValue<TState>> { private: @@ -804,6 +806,74 @@ private: const size_t WideFieldsIndex_; }; +class TWideFromBlocksStreamWrapper : public TMutableComputationNode<TWideFromBlocksStreamWrapper> +{ +using TBaseComputation = TMutableComputationNode<TWideFromBlocksStreamWrapper>; +using TState = TWideFromBlocksState; +public: + TWideFromBlocksStreamWrapper(TComputationMutables& mutables, + IComputationNode* stream, + TVector<TType*>&& types) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , Stream_(stream) + , Types_(std::move(types)) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const + { + const auto state = ctx.HolderFactory.Create<TState>(ctx, Types_); + return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, + std::move(state), + std::move(Stream_->GetValue(ctx))); + } + +private: + class TStreamValue : public TComputationValue<TStreamValue> { + using TBase = TComputationValue<TStreamValue>; + public: + TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, + NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream) + : TBase(memInfo) + , BlockState_(blockState) + , Stream_(stream) + , HolderFactory_(holderFactory) + {} + + private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get()); + auto* inputFields = blockState.Pointer_; + const size_t inputWidth = blockState.Values_.size(); + + if (blockState.Index_ == blockState.Count_) do { + if (const auto result = Stream_.WideFetch(inputFields, inputWidth); result != NUdf::EFetchStatus::Ok) + return result; + + blockState.Index_ = 0; + blockState.Count_ = GetBlockCount(blockState.Values_.back()); + } while (!blockState.Count_); + + blockState.Current_ = blockState.Index_++; + for (size_t i = 0; i < width; i++) { + output[i] = blockState.Get(HolderFactory_, i); + } + + return NUdf::EFetchStatus::Ok; + } + + NUdf::TUnboxedValue BlockState_; + NUdf::TUnboxedValue Stream_; + const THolderFactory& HolderFactory_; + }; + + void RegisterDependencies() const final { + this->DependsOn(Stream_); + } + + IComputationNode* const Stream_; + const TVector<TType*> Types_; +}; + class TPrecomputedArrowNode : public IArrowKernelComputationNode { public: TPrecomputedArrowNode(const arrow::Datum& datum, TStringBuf kernelName) @@ -1209,17 +1279,29 @@ IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFact IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto wideComponents = GetWideComponents(flowType); + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), + "Expected either WideStream or WideFlow as an input"); + const auto yieldsStream = callable.GetType()->GetReturnType()->IsStream(); + MKQL_ENSURE(yieldsStream == inputType->IsStream(), + "Expected both input and output have to be either WideStream or WideFlow"); + + const auto wideComponents = GetWideComponents(inputType); MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column"); TVector<TType*> items; for (ui32 i = 0; i < wideComponents.size() - 1; ++i) { items.push_back(AS_TYPE(TBlockType, wideComponents[i])); } - const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + const auto wideFlowOrStream = LocateNode(ctx.NodeLocator, callable, 0); + if (yieldsStream) { + const auto wideStream = wideFlowOrStream; + return new TWideFromBlocksStreamWrapper(ctx.Mutables, wideStream, std::move(items)); + } + // FIXME: Drop the branch below, when the time comes. + const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(wideFlowOrStream); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(items)); + return new TWideFromBlocksFlowWrapper(ctx.Mutables, wideFlow, std::move(items)); } IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp index 93ad801338..5aaf9da44b 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_compress_ut.cpp @@ -86,7 +86,7 @@ void DoNestedTuplesCompressTest() { node = pb.BlockExpandChunked(node); node = pb.WideSkipBlocks(node, pb.template NewDataLiteral<ui64>(19)); node = pb.BlockCompress(node, 2); - node = pb.WideFromBlocks(node); + node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node))); node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(resultTupleType, {items[0], items[1]}); @@ -186,7 +186,8 @@ Y_UNIT_TEST_LLVM(CompressBasic) { const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }); - const auto compressedFlow = pb.WideFromBlocks(pb.BlockCompress(pb.WideToBlocks(wideFlow), 0)); + const auto compressedBlocks = pb.BlockCompress(pb.WideToBlocks(wideFlow), 0); + const auto compressedFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(compressedBlocks))); const auto narrowFlow = pb.NarrowMap(compressedFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple({items[0], items[1]}); }); diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp index 67cf2acc49..fe65e76d1e 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_exists_ut.cpp @@ -66,7 +66,7 @@ void DoBlockExistsOffset(size_t length, size_t offset) { items[4], }; }); - node = pb.WideFromBlocks(node); + node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node))); node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(outputTupleType, {items[0], items[1], items[2], items[3]}); }); diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp index 09db19aebe..0f4e6a0681 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp @@ -131,7 +131,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) { const auto flow = MakeFlow(setup); const auto part = pb.WideSkipBlocks(flow, pb.NewDataLiteral<ui64>(7)); - const auto plain = pb.WideFromBlocks(part); + const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part))); const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.Add(items[0], items[1]); @@ -163,7 +163,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) { const auto flow = MakeFlow(setup); const auto part = pb.WideTakeBlocks(flow, pb.NewDataLiteral<ui64>(4)); - const auto plain = pb.WideFromBlocks(part); + const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part))); const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.Add(items[0], items[1]); @@ -198,7 +198,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideTakeSkipBlocks) { const auto flow = MakeFlow(setup); const auto part = pb.WideTakeBlocks(pb.WideSkipBlocks(flow, pb.NewDataLiteral<ui64>(3)), pb.NewDataLiteral<ui64>(5)); - const auto plain = pb.WideFromBlocks(part); + const auto plain = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(part))); const auto singleValueFlow = pb.NarrowMap(plain, [&](TRuntimeNode::TList items) -> TRuntimeNode { // 0, 0; diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp index 2f523353cc..8303c709dc 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_top_sort_ut.cpp @@ -48,9 +48,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}})), + pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}}); + const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); @@ -111,9 +113,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}})), + pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}}); + const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); @@ -180,9 +184,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - pb.NewDataLiteral<ui64>(3ULL), {{1U, pb.NewDataLiteral<bool>(true)}})), + pb.NewDataLiteral<ui64>(3ULL), {{1U, pb.NewDataLiteral<bool>(true)}}); + const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); @@ -240,9 +246,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto topBlocks = pb.WideTopBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - pb.NewDataLiteral<ui64>(2ULL), {{1U, pb.NewDataLiteral<bool>(false)}})), + pb.NewDataLiteral<ui64>(2ULL), {{1U, pb.NewDataLiteral<bool>(false)}}); + const auto topFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(topFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); @@ -297,9 +305,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}, {1U, pb.NewDataLiteral<bool>(false)}})), + pb.NewDataLiteral<ui64>(4ULL), {{0U, pb.NewDataLiteral<bool>(true)}, {1U, pb.NewDataLiteral<bool>(false)}}); + const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); @@ -360,9 +370,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}, {1U, pb.NewDataLiteral<bool>(true)}})), + pb.NewDataLiteral<ui64>(6ULL), {{0U, pb.NewDataLiteral<bool>(false)}, {1U, pb.NewDataLiteral<bool>(true)}}); + const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); @@ -429,9 +441,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - pb.NewDataLiteral<ui64>(4ULL), {{1U, pb.NewDataLiteral<bool>(true)}, {0U, pb.NewDataLiteral<bool>(false)}})), + pb.NewDataLiteral<ui64>(4ULL), {{1U, pb.NewDataLiteral<bool>(true)}, {0U, pb.NewDataLiteral<bool>(false)}}); + const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); @@ -492,9 +506,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockTopTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto topSortBlocks = pb.WideTopSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - pb.NewDataLiteral<ui64>(6ULL), {{1U, pb.NewDataLiteral<bool>(false)}, {0U, pb.NewDataLiteral<bool>(true)}})), + pb.NewDataLiteral<ui64>(6ULL), {{1U, pb.NewDataLiteral<bool>(false)}, {0U, pb.NewDataLiteral<bool>(true)}}); + const auto topSortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(topSortBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(topSortFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); @@ -565,9 +581,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockSortTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9}); - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideFromBlocks(pb.WideSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), + const auto sortBlocks = pb.WideSortBlocks(pb.WideToBlocks(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; })), - {{0U, pb.NewDataLiteral<bool>(true)}})), + {{0U, pb.NewDataLiteral<bool>(true)}}); + const auto sortFlow = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(sortBlocks))); + const auto pgmReturn = pb.Collect(pb.NarrowMap(sortFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(tupleType, items); } )); diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp index ae4c3fce0b..f5c6f86884 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp @@ -196,10 +196,10 @@ void TestChunked(bool withBlockExpand) { node = pb.BlockExpandChunked(node); // WideTakeBlocks won't work on chunked blocks node = pb.WideTakeBlocks(node, pb.NewDataLiteral<ui64>(19)); - node = pb.WideFromBlocks(node); + node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node))); } else { // WideFromBlocks should support chunked blocks - node = pb.WideFromBlocks(node); + node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node))); node = pb.Take(node, pb.NewDataLiteral<ui64>(19)); } node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode { @@ -541,7 +541,7 @@ Y_UNIT_TEST_LLVM(TestWideFromBlocks) { const auto blocksFlow = pb.ToBlocks(flow); const auto wideFlow = pb.ExpandMap(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item, pb.AsScalar(pb.NewDataLiteral<ui64>(3ULL))}; }); - const auto wideFlow2 = pb.WideFromBlocks(wideFlow); + const auto wideFlow2 = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(wideFlow))); const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }); const auto pgmReturn = pb.Collect(narrowFlow); @@ -581,7 +581,7 @@ Y_UNIT_TEST_LLVM(TestWideToAndFromBlocks) { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }); const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); - const auto wideFlow2 = pb.WideFromBlocks(wideBlocksFlow); + const auto wideFlow2 = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(wideBlocksFlow))); const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode { return items[1]; }); diff --git a/yql/essentials/minikql/mkql_node.cpp b/yql/essentials/minikql/mkql_node.cpp index 434171131b..82772bb923 100644 --- a/yql/essentials/minikql/mkql_node.cpp +++ b/yql/essentials/minikql/mkql_node.cpp @@ -2522,5 +2522,17 @@ TArrayRef<TType* const> GetWideComponents(const TStreamType* type) { return AS_TYPE(TMultiType, type->GetItemType())->GetElements(); } +TArrayRef<TType *const> GetWideComponents(const TType* type) { + if (type->IsFlow()) { + const auto outputFlowType = AS_TYPE(TFlowType, type); + return GetWideComponents(outputFlowType); + } + if (type->IsStream()) { + const auto outputStreamType = AS_TYPE(TStreamType, type); + return GetWideComponents(outputStreamType); + } + MKQL_ENSURE(false, "Expect either flow or stream"); +} + } } diff --git a/yql/essentials/minikql/mkql_node.h b/yql/essentials/minikql/mkql_node.h index 6ac9c222d8..ab87008f24 100644 --- a/yql/essentials/minikql/mkql_node.h +++ b/yql/essentials/minikql/mkql_node.h @@ -1581,6 +1581,7 @@ EValueRepresentation GetValueRepresentation(NUdf::TDataTypeId typeId); TArrayRef<TType* const> GetWideComponents(const TFlowType* type); TArrayRef<TType* const> GetWideComponents(const TStreamType* type); +TArrayRef<TType *const> GetWideComponents(const TType* type); inline ui32 GetWideComponentsCount(const TFlowType* type) { return (ui32)GetWideComponents(type).size(); @@ -1590,6 +1591,10 @@ inline ui32 GetWideComponentsCount(const TStreamType* type) { return (ui32)GetWideComponents(type).size(); } +inline ui32 GetWideComponentsCount(const TType* type) { + return (ui32)GetWideComponents(type).size(); +} + template <TType::EKind SingularKind> TSingularType<SingularKind>* TSingularType<SingularKind>::Create(TTypeType* type, const TTypeEnvironment& env) { return ::new(env.Allocate<TSingularType<SingularKind>>()) TSingularType<SingularKind>(type); diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 82c1f604ba..6e832cf509 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -1503,12 +1503,28 @@ TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) { return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode flow) { - auto outputItems = ValidateBlockFlowType(flow.GetStaticType()); +TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode stream) { + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected WideStream as input type"); + if constexpr (RuntimeVersion < 54U) { + // Preserve the old behaviour for ABI compatibility. + // Emit (FromFlow (WideFromBlocks (ToFlow (<stream>)))) to + // process the flow in favor to the given stream following + // the older MKQL ABI. + // FIXME: Drop the branch below, when the time comes. + const auto inputFlow = ToFlow(stream); + auto outputItems = ValidateBlockFlowType(inputFlow.GetStaticType()); + outputItems.pop_back(); + TType* outputMultiType = NewMultiType(outputItems); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType)); + callableBuilder.Add(inputFlow); + const auto outputFlow = TRuntimeNode(callableBuilder.Build(), false); + return FromFlow(outputFlow); + } + auto outputItems = ValidateBlockStreamType(stream.GetStaticType()); outputItems.pop_back(); TType* outputMultiType = NewMultiType(outputItems); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType)); - callableBuilder.Add(flow); + TCallableBuilder callableBuilder(Env, __func__, NewStreamType(outputMultiType)); + callableBuilder.Add(stream); return TRuntimeNode(callableBuilder.Build(), false); } diff --git a/yql/essentials/public/purecalc/common/transformations/utils.cpp b/yql/essentials/public/purecalc/common/transformations/utils.cpp index 6f63be0208..8c5192da26 100644 --- a/yql/essentials/public/purecalc/common/transformations/utils.cpp +++ b/yql/essentials/public/purecalc/common/transformations/utils.cpp @@ -13,41 +13,36 @@ TExprNode::TPtr NYql::NPureCalc::NodeFromBlocks( ) { const auto items = structType->GetItems(); Y_ENSURE(items.size() > 0); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(pos) .Lambda() .Param("stream") .Callable(0, "FromFlow") .Callable(0, "NarrowMap") - .Callable(0, "WideFromBlocks") - .Callable(0, "ExpandMap") - .Callable(0, "ToFlow") - .Arg(0, "stream") - .Seal() - .Lambda(1) - .Param("item") - .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& { - ui32 i = 0; - for (const auto& item : items) { - lambda.Callable(i++, "Member") - .Arg(0, "item") - .Atom(1, item->GetName()) - .Seal(); - } - lambda.Callable(i, "Member") - .Arg(0, "item") - .Atom(1, PurecalcBlockColumnLength) - .Seal(); - return lambda; - }) + .Callable(0, "ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, "ExpandMap") + .Callable(0, "ToFlow") + .Arg(0, "stream") + .Seal() + .Lambda(1) + .Param("item") + .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& { + ui32 i = 0; + for (const auto& item : items) { + lambda.Callable(i++, "Member") + .Arg(0, "item") + .Atom(1, item->GetName()) + .Seal(); + } + lambda.Callable(i, "Member") + .Arg(0, "item") + .Atom(1, PurecalcBlockColumnLength) + .Seal(); + return lambda; + }) + .Seal() + .Seal() .Seal() .Seal() .Seal() diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp index db7fdd7430..beb330b74b 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp @@ -643,7 +643,9 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { values = NCommon::MkqlBuildExpr(ytMap.Mapper().Body().Ref(), innerCtx); if (IsWideBlockType(lambdaOutputType)) { - values = ctx.ProgramBuilder.WideFromBlocks(values); + values = ctx.ProgramBuilder.ToFlow( + ctx.ProgramBuilder.WideFromBlocks( + ctx.ProgramBuilder.FromFlow(values))); } if (ETypeAnnotationKind::Multi == lambdaOutputType.GetKind()) diff --git a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp index 9e4d509d8e..f92f152c31 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp @@ -41,21 +41,16 @@ private: auto settings = RemoveSetting(map.Settings().Ref(), EYtSettingType::BlockInputReady, ctx); settings = AddSetting(*settings, EYtSettingType::BlockInputApplied, TExprNode::TPtr(), ctx); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - auto mapperLambda = Build<TCoLambda>(ctx, map.Mapper().Pos()) .Args({"flow"}) .Body<TExprApplier>() .Apply(map.Mapper()) - .With<TCoWideFromBlocks>(0) - .Input("flow") + .With<TCoToFlow>(0) + .Input<TCoWideFromBlocks>() + .Input<TCoFromFlow>() + .Input("flow") + .Build() + .Build() .Build() .Build() .Done() diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp index 9f4c2aa4a1..5d9949a257 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -1636,8 +1636,8 @@ TExprNode::TPtr BuildBlockMapJoin(TExprNode::TPtr leftFlow, TExprNode::TPtr righ return ctx.Builder(pos) .Callable("NarrowMap") - .Callable(0, "WideFromBlocks") - .Callable(0, "ToFlow") + .Callable(0, "ToFlow") + .Callable(0, "WideFromBlocks") .Callable(0, "BlockMapJoinCore") .Callable(0, "FromFlow") .Callable(0, "WideToBlocks") |