diff options
author | imunkin <imunkin@yandex-team.com> | 2025-03-24 11:37:28 +0300 |
---|---|---|
committer | imunkin <imunkin@yandex-team.com> | 2025-03-24 11:52:50 +0300 |
commit | 71f6767025c7a8ac0fe7c9c45556faf1d7f7391c (patch) | |
tree | 75cbae5feb954fae8d1c7f5d380e42ea5abfb7b3 | |
parent | 17bac99361ace1e8bcd148a599a7785624f7583e (diff) | |
download | ydb-71f6767025c7a8ac0fe7c9c45556faf1d7f7391c.tar.gz |
Introduce a flag to control type annotation for ReplicateScalars
commit_hash:6c0ce61026840ced37221b08988bfcbe5df5f931
-rw-r--r-- | yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp | 114 | ||||
-rw-r--r-- | yql/essentials/core/type_ann/type_ann_blocks.cpp | 9 | ||||
-rw-r--r-- | yql/essentials/core/yql_expr_type_annotation.h | 4 |
3 files changed, 127 insertions, 0 deletions
diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp index 9cf52f84e7..1be5b5349c 100644 --- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -137,6 +137,15 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& const auto tail = input.HeadPtr(); const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr() : ctx.NewCallable(tail->Pos(), "ToFlow", { tail }); + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + return ctx.Builder(node->Pos()) .Callable("FromFlow") .Callable(0, "ReplicateScalars") @@ -190,6 +199,14 @@ TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext return input.HeadPtr(); } + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) { const auto& replicateScalars = input.Head(); // Technically, the code below rewrites the following sequence @@ -213,6 +230,15 @@ TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext TExprNode::TPtr OptimizeWideTakeSkipBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (node->Head().IsCallable("ReplicateScalars")) { YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << node->Head().Content(); return ctx.SwapWithHead(*node); @@ -223,6 +249,15 @@ TExprNode::TPtr OptimizeWideTakeSkipBlocks(const TExprNode::TPtr& node, TExprCon TExprNode::TPtr OptimizeBlockCompress(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (node->Head().IsCallable("ReplicateScalars")) { YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << node->Head().Content(); if (node->Head().ChildrenSize() == 1) { @@ -250,6 +285,15 @@ TExprNode::TPtr OptimizeBlockCompress(const TExprNode::TPtr& node, TExprContext& TExprNode::TPtr OptimizeBlocksTopOrSort(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (node->Head().IsCallable("ReplicateScalars")) { YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << node->Head().Content(); return ctx.SwapWithHead(*node); @@ -263,6 +307,15 @@ TExprNode::TPtr OptimizeBlockExtend(const TExprNode::TPtr& node, TExprContext& c TExprNodeList inputs = node->ChildrenList(); bool hasReplicateScalars = false; for (auto& input : inputs) { + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (input->IsCallable("ReplicateScalars")) { hasReplicateScalars = true; input = input->HeadPtr(); @@ -279,6 +332,15 @@ TExprNode::TPtr OptimizeBlockExtend(const TExprNode::TPtr& node, TExprContext& c TExprNode::TPtr OptimizeReplicateScalars(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (node->Head().IsCallable("ReplicateScalars")) { if (node->ChildrenSize() == 1) { YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content(); @@ -322,6 +384,15 @@ TExprNode::TPtr ExpandBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx const bool hasScalars = AnyOf(items.begin(), items.end() - 1, [](const auto& item) { return item->IsScalar(); }); seenScalars = seenScalars || hasScalars; + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + newChildren.push_back(ctx.WrapByCallableIf(hasScalars, "ReplicateScalars", std::move(child))); } @@ -334,6 +405,15 @@ TExprNode::TPtr ExpandBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx TExprNode::TPtr ExpandReplicateScalars(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << node->Content(); const auto& items = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(); const ui32 width = items.size(); @@ -6686,6 +6766,14 @@ TExprNode::TPtr OptimizeBlockCombine(const TExprNode::TPtr& node, TExprContext& return UpdateBlockCombineColumns(node, filterIndex, argIndices, ctx); } + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (node->Head().IsCallable("ReplicateScalars")) { YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content(); return ctx.ChangeChild(*node, 0, node->Head().HeadPtr()); @@ -6696,6 +6784,15 @@ TExprNode::TPtr OptimizeBlockCombine(const TExprNode::TPtr& node, TExprContext& TExprNode::TPtr OptimizeBlockMerge(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (node->Head().IsCallable("ReplicateScalars")) { YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content(); return ctx.ChangeChild(*node, 0, node->Head().HeadPtr()); @@ -6768,6 +6865,14 @@ TExprNode::TPtr SwapReplicateScalarsWithWideMap(const TExprNode::TPtr& wideMap, } } + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + return ctx.Builder(wideMap->Pos()) .Callable("ReplicateScalars") .Callable(0, "WideMap") @@ -6949,6 +7054,15 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx) .Add(1, DropUnusedArgs(node->Tail(), unused, ctx)) .Seal().Build(); } else if (node->IsCallable("WideMap") && input.IsCallable("ReplicateScalars")) { + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content(); return SwapReplicateScalarsWithWideMap(node, ctx); } diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp index 496fe493f4..c674d61ed5 100644 --- a/yql/essentials/core/type_ann/type_ann_blocks.cpp +++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp @@ -76,6 +76,15 @@ IGraphTransformer::TStatus ReplicateScalarWrapper(const TExprNode::TPtr& input, } IGraphTransformer::TStatus ReplicateScalarsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + + // Static assert to ensure backward compatible change: if the + // constant below is true, both input and output types of + // ReplicateScalars callable have to be WideStream; otherwise, + // both input and output types have to be WideFlow. + // FIXME: When all spots using ReplicateScalars are adjusted + // to work with WideStream, drop the assertion below. + static_assert(!NYql::NBlockStreamIO::ReplicateScalars); + if (!EnsureMinArgsCount(*input, 1, ctx.Expr) || !EnsureMaxArgsCount(*input, 2, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h index 3d2ad4199b..13907df704 100644 --- a/yql/essentials/core/yql_expr_type_annotation.h +++ b/yql/essentials/core/yql_expr_type_annotation.h @@ -354,4 +354,8 @@ TStringBuf NormalizeCallableName(TStringBuf name); void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx); +namespace NBlockStreamIO { + constexpr bool ReplicateScalars = false; +} // namespace NBlockStreamIO + } |