diff options
author | imunkin <imunkin@yandex-team.com> | 2025-01-17 16:12:38 +0300 |
---|---|---|
committer | imunkin <imunkin@yandex-team.com> | 2025-01-17 16:29:14 +0300 |
commit | 5bf3fd8cf1463c43723f1c8ba9d2322073f93c04 (patch) | |
tree | 89f3800c888c589798de9e4f5a34384eda2f6808 /yql/essentials/core | |
parent | e2b9ea56b89acb80b934fcfa26a782bb92e1d39e (diff) | |
download | ydb-5bf3fd8cf1463c43723f1c8ba9d2322073f93c04.tar.gz |
YQL-19424: Use WideStream instead of WideFlow in WideFromBlocks computation node
commit_hash:21c84a9004cc57883d949b8fc637fc3ae7bfbda9
Diffstat (limited to 'yql/essentials/core')
-rw-r--r-- | yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp | 267 | ||||
-rw-r--r-- | yql/essentials/core/type_ann/type_ann_blocks.cpp | 13 | ||||
-rw-r--r-- | yql/essentials/core/yql_aggregate_expander.cpp | 31 | ||||
-rw-r--r-- | yql/essentials/core/yql_expr_type_annotation.h | 2 |
4 files changed, 161 insertions, 152 deletions
diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index f7ba75c96c..b02dc45e66 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -128,21 +128,26 @@ TExprNode::TPtr RebuildArgumentsOnlyLambdaForBlocks(const TExprNode& lambda, TEx TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - - if (node->Head().IsCallable("WideFromBlocks")) { - YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content(); - return ctx.NewCallable(node->Pos(), "ReplicateScalars", { node->Head().HeadPtr() }); - } - - if (const auto& input = node->Head(); input.IsCallable({"Extend", "OrderedExtend"})) { + const auto& input = node->Head(); + if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) { + const auto& wideFromBlocks = input.Head(); + // Technically, the code below rewrites the following sequence + // (WideToBlocks (ToFlow (WideFromBlocks (<input>))))) + // into (ReplicateScalars (<input>)), but ToFlow/FromFlow + // wrappers will be removed when all other nodes in block + // pipeline start using WideStream instead of the WideFlow. + // Hence, the logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideFromBlocks.Content(); + // If tail is FromFlow, its input is WideFlow and can be + // used intact; Otherwise the input is WideStream, so the + // new input should be converted to WideFlow. + const auto tail = wideFromBlocks.HeadPtr(); + const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() + : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); + return ctx.NewCallable(node->Pos(), "ReplicateScalars", { flowInput }); + } + + if (input.IsCallable({"Extend", "OrderedExtend"})) { YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << input.Content(); TExprNodeList newChildren; newChildren.reserve(input.ChildrenSize()); @@ -157,23 +162,35 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - - if (node->Head().IsCallable("WideToBlocks")) { - YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content(); - return node->Head().HeadPtr(); - } - - if (node->Head().IsCallable("ReplicateScalars")) { - YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content(); - return ctx.ChangeChild(*node, 0, node->Head().HeadPtr()); + const auto& input = node->Head(); + if (input.IsCallable("FromFlow") && input.Head().IsCallable("WideToBlocks")) { + const auto& wideToBlocks = input.Head(); + // Technically, the code below rewrites the following sequence + // (WideFromBlocks (FromFlow (WideToBlocks (<input>)))) + // into (FromFlow (<input>)) (to match the ToFlow parent), + // but ToFlow/FromFlow wrappers will be removed when all + // other nodes in block pipeline start using WideStream + // instead of the WideFlow. Hence, the logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideToBlocks.Content(); + return ctx.NewCallable(node->Pos(), "FromFlow", {wideToBlocks.HeadPtr()}); + } + + if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) { + const auto& replicateScalars = input.Head(); + // Technically, the code below rewrites the following sequence + // (WideFromBlocks (FromFlow (ReplicateScalars (<input>)))) + // into (WideFromBlocks (FromFlow (<input>))), but ToFlow/FromFlow + // wrappers will be removed when all other nodes in block + // pipeline start using WideStream instead of the WideFlow. + // Hence, the logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << replicateScalars.Content() << " as input of " << node->Content(); + return ctx.Builder(node->Pos()) + .Callable(node->Content()) + .Callable(0, input.Content()) + .Add(0, replicateScalars.HeadPtr()) + .Seal() + .Seal() + .Build(); } return node; @@ -6101,34 +6118,41 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext& Y_ENSURE(node->IsCallable("WideMap")); const auto lambda = node->TailPtr(); // Swap trivial WideMap and WideFromBlocks. - if (node->Head().IsCallable("WideFromBlocks")) { + const auto& input = node->Head(); + if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) { if (auto newLambda = RebuildArgumentsOnlyLambdaForBlocks(*lambda, ctx, types)) { - YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Head().Content() << " with " << node->Content(); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - + const auto& wideFromBlocks = input.Head(); + // Technically, the code below rewrites the following sequence + // (WideMap (ToFlow (WideFromBlocks (FromFlow (<input>))))) + // into (ToFlow (WideFromBlocks (FromFlow (WideMap (<input>)). + // Hence, the logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << wideFromBlocks.Content() << " with " << node->Content(); + // If tail is FromFlow, its input is WideFlow and can be + // used intact; Otherwise the input is WideStream, so the + // new input should be converted to WideFlow. + const auto tail = wideFromBlocks.HeadPtr(); + const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() + : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); return ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Callable(0, "WideMap") - .Add(0, node->Head().HeadPtr()) - .Add(1, newLambda) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, "WideMap") + .Add(0, flowInput) + .Add(1, newLambda) + .Seal() + .Seal() .Seal() .Seal() .Build(); } } - if (!CanRewriteToBlocksWithInput(node->Head(), types)) { + if (!CanRewriteToBlocksWithInput(input, types)) { return node; } - auto multiInputType = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); + auto multiInputType = input.GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); ui32 newNodes; TNodeMap<size_t> rewritePositions; TExprNode::TPtr blockLambda; @@ -6144,22 +6168,17 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext& YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes << ", extra columns: " << rewritePositions.size(); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - auto ret = ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Callable(0, "WideMap") - .Callable(0, "WideToBlocks") - .Add(0, node->HeadPtr()) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, "WideMap") + .Callable(0, "WideToBlocks") + .Add(0, node->HeadPtr()) + .Seal() + .Add(1, blockLambda) + .Seal() .Seal() - .Add(1, blockLambda) .Seal() .Seal() .Build(); @@ -6220,18 +6239,13 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes << ", extra columns: " << rewritePositions.size(); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Add(0, result) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Add(0, result) + .Seal() + .Seal() .Seal() .Build(); } @@ -6239,19 +6253,14 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte if (!newNodes) { return node; } - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - auto filtered = ctx.Builder(node->Pos()) .Callable("WideFilter") - .Callable(0, "WideFromBlocks") - .Add(0, blockMapped) + .Callable(0, "ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Add(0, blockMapped) + .Seal() + .Seal() .Seal() .Add(1, restLambda) .Seal() @@ -6320,22 +6329,17 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte TStringBuf newName = node->Content() == "Skip" ? "WideSkipBlocks" : "WideTakeBlocks"; YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Callable(0, newName) - .Callable(0, "WideToBlocks") - .Add(0, node->HeadPtr()) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, newName) + .Callable(0, "WideToBlocks") + .Add(0, node->HeadPtr()) + .Seal() + .Add(1, node->ChildPtr(1)) + .Seal() .Seal() - .Add(1, node->ChildPtr(1)) .Seal() .Seal() .Build(); @@ -6375,18 +6379,13 @@ TExprNode::TPtr OptimizeTopOrSortBlocks(const TExprNode::TPtr& node, TExprContex YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; auto children = node->ChildrenList(); children[0] = ctx.NewCallable(node->Pos(), "WideToBlocks", { children[0] }); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(node->Pos()) - .Callable("WideFromBlocks") - .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children))) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Add(0, ctx.NewCallable(node->Pos(), newName, std::move(children))) + .Seal() + .Seal() .Seal() .Build(); } @@ -6606,17 +6605,45 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx) .Add(1, DropUnusedArgs(node->Tail(), actualUnused, ctx)) .Seal().Build(); } - } else if (input.IsCallable({"WideFromBlocks", "WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) { + } else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) { + const auto& wideFromBlocks = input.Head(); + // WideFromBlocks uses WideStream instead of WideFlow, + // so it's wrapped with ToFlow/FromFlow. Hence, to drop + // unused fields for particular WideFromBlocks node, + // the optimizer has to rewrite FromFlow child, but + // logging is left intact. + YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << wideFromBlocks.Content() << " with " << unused.size() << " unused fields."; + const auto tail = wideFromBlocks.HeadPtr(); + const auto width = tail->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize(); + const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() + : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); + return ctx.Builder(node->Pos()) + .Callable(node->Content()) + .Callable(0, "ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Callable(0, "WideMap") + .Add(0, flowInput) + .Lambda(1) + .Params("items", width) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (auto i = 0U, j = 0U; i < width; ++i) { + if (unused.cend() == std::find(unused.cbegin(), unused.cend(), i)) { + parent.Arg(j++, "items", i); + } + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Add(1, DropUnusedArgs(node->Tail(), unused, ctx)) + .Seal() + .Build(); + } else if (input.IsCallable({"WideTakeBlocks", "WideSkipBlocks", "BlockExpandChunked"})) { YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content() << " with " << unused.size() << " unused fields."; - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - return ctx.Builder(node->Pos()) .Callable(node->Content()) .Add(0, ctx.ChangeChild(input, 0U, MakeWideMapForDropUnused(input.HeadPtr(), unused, ctx))) diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp index 4f3a092dde..807620be9a 100644 --- a/yql/essentials/core/type_ann/type_ann_blocks.cpp +++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp @@ -966,28 +966,19 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { Y_UNUSED(output); - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } TTypeAnnotationNode::TListType retMultiType; - if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) { + if (!EnsureWideStreamBlockType(input->Head(), retMultiType, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } YQL_ENSURE(!retMultiType.empty()); retMultiType.pop_back(); auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); - input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType)); return IGraphTransformer::TStatus::Ok; } diff --git a/yql/essentials/core/yql_aggregate_expander.cpp b/yql/essentials/core/yql_aggregate_expander.cpp index 0ebfaa988b..bad2c22071 100644 --- a/yql/essentials/core/yql_aggregate_expander.cpp +++ b/yql/essentials/core/yql_aggregate_expander.cpp @@ -708,18 +708,9 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { TExprNode::TPtr aggWideFlow; if (hashed) { - - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - aggWideFlow = Ctx.Builder(Node->Pos()) - .Callable("WideFromBlocks") - .Callable(0, "ToFlow") + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") .Callable(0, "BlockCombineHashed") .Callable(0, "FromFlow") .Add(0, blocks) @@ -2939,15 +2930,15 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() { .Build(); } - // Static assert to ensure backward compatible change: if the - // constant below is true, both input and output types of - // WideFromBlocks callable have to be WideStream; otherwise, - // both input and output types have to be WideFlow. - // FIXME: When all spots using WideFromBlocks are adjusted - // to work with WideStream, drop the assertion below. - static_assert(!NYql::NBlockStreamIO::WideFromBlocks); - - auto aggWideFlow = Ctx.NewCallable(Node->Pos(), "WideFromBlocks", { aggBlocks }); + auto aggWideFlow = Ctx.Builder(Node->Pos()) + .Callable("ToFlow") + .Callable(0, "WideFromBlocks") + .Callable(0, "FromFlow") + .Add(0, aggBlocks) + .Seal() + .Seal() + .Seal() + .Build(); auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx); auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow }); auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { streamArg }), std::move(root)); diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h index 508c9e4258..9345a0104e 100644 --- a/yql/essentials/core/yql_expr_type_annotation.h +++ b/yql/essentials/core/yql_expr_type_annotation.h @@ -355,7 +355,7 @@ TStringBuf NormalizeCallableName(TStringBuf name); void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx); namespace NBlockStreamIO { - constexpr bool WideFromBlocks = false; + constexpr bool WideFromBlocks = true; } // namespace NBlockStreamIO } |