aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimunkin <imunkin@yandex-team.com>2025-03-24 11:37:28 +0300
committerimunkin <imunkin@yandex-team.com>2025-03-24 11:52:50 +0300
commit71f6767025c7a8ac0fe7c9c45556faf1d7f7391c (patch)
tree75cbae5feb954fae8d1c7f5d380e42ea5abfb7b3
parent17bac99361ace1e8bcd148a599a7785624f7583e (diff)
downloadydb-71f6767025c7a8ac0fe7c9c45556faf1d7f7391c.tar.gz
Introduce a flag to control type annotation for ReplicateScalars
commit_hash:6c0ce61026840ced37221b08988bfcbe5df5f931
-rw-r--r--yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp114
-rw-r--r--yql/essentials/core/type_ann/type_ann_blocks.cpp9
-rw-r--r--yql/essentials/core/yql_expr_type_annotation.h4
3 files changed, 127 insertions, 0 deletions
diff --git a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
index 9cf52f84e7..1be5b5349c 100644
--- a/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -137,6 +137,15 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext&
const auto tail = input.HeadPtr();
const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
: ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
return ctx.Builder(node->Pos())
.Callable("FromFlow")
.Callable(0, "ReplicateScalars")
@@ -190,6 +199,14 @@ TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext
return input.HeadPtr();
}
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) {
const auto& replicateScalars = input.Head();
// Technically, the code below rewrites the following sequence
@@ -213,6 +230,15 @@ TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext
TExprNode::TPtr OptimizeWideTakeSkipBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (node->Head().IsCallable("ReplicateScalars")) {
YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << node->Head().Content();
return ctx.SwapWithHead(*node);
@@ -223,6 +249,15 @@ TExprNode::TPtr OptimizeWideTakeSkipBlocks(const TExprNode::TPtr& node, TExprCon
TExprNode::TPtr OptimizeBlockCompress(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (node->Head().IsCallable("ReplicateScalars")) {
YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << node->Head().Content();
if (node->Head().ChildrenSize() == 1) {
@@ -250,6 +285,15 @@ TExprNode::TPtr OptimizeBlockCompress(const TExprNode::TPtr& node, TExprContext&
TExprNode::TPtr OptimizeBlocksTopOrSort(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (node->Head().IsCallable("ReplicateScalars")) {
YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << node->Head().Content();
return ctx.SwapWithHead(*node);
@@ -263,6 +307,15 @@ TExprNode::TPtr OptimizeBlockExtend(const TExprNode::TPtr& node, TExprContext& c
TExprNodeList inputs = node->ChildrenList();
bool hasReplicateScalars = false;
for (auto& input : inputs) {
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (input->IsCallable("ReplicateScalars")) {
hasReplicateScalars = true;
input = input->HeadPtr();
@@ -279,6 +332,15 @@ TExprNode::TPtr OptimizeBlockExtend(const TExprNode::TPtr& node, TExprContext& c
TExprNode::TPtr OptimizeReplicateScalars(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (node->Head().IsCallable("ReplicateScalars")) {
if (node->ChildrenSize() == 1) {
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content();
@@ -322,6 +384,15 @@ TExprNode::TPtr ExpandBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx
const bool hasScalars = AnyOf(items.begin(), items.end() - 1, [](const auto& item) { return item->IsScalar(); });
seenScalars = seenScalars || hasScalars;
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
newChildren.push_back(ctx.WrapByCallableIf(hasScalars, "ReplicateScalars", std::move(child)));
}
@@ -334,6 +405,15 @@ TExprNode::TPtr ExpandBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx
TExprNode::TPtr ExpandReplicateScalars(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << node->Content();
const auto& items = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
const ui32 width = items.size();
@@ -6686,6 +6766,14 @@ TExprNode::TPtr OptimizeBlockCombine(const TExprNode::TPtr& node, TExprContext&
return UpdateBlockCombineColumns(node, filterIndex, argIndices, ctx);
}
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (node->Head().IsCallable("ReplicateScalars")) {
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content();
return ctx.ChangeChild(*node, 0, node->Head().HeadPtr());
@@ -6696,6 +6784,15 @@ TExprNode::TPtr OptimizeBlockCombine(const TExprNode::TPtr& node, TExprContext&
TExprNode::TPtr OptimizeBlockMerge(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (node->Head().IsCallable("ReplicateScalars")) {
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content();
return ctx.ChangeChild(*node, 0, node->Head().HeadPtr());
@@ -6768,6 +6865,14 @@ TExprNode::TPtr SwapReplicateScalarsWithWideMap(const TExprNode::TPtr& wideMap,
}
}
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
return ctx.Builder(wideMap->Pos())
.Callable("ReplicateScalars")
.Callable(0, "WideMap")
@@ -6949,6 +7054,15 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx)
.Add(1, DropUnusedArgs(node->Tail(), unused, ctx))
.Seal().Build();
} else if (node->IsCallable("WideMap") && input.IsCallable("ReplicateScalars")) {
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content();
return SwapReplicateScalarsWithWideMap(node, ctx);
}
diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp
index 496fe493f4..c674d61ed5 100644
--- a/yql/essentials/core/type_ann/type_ann_blocks.cpp
+++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp
@@ -76,6 +76,15 @@ IGraphTransformer::TStatus ReplicateScalarWrapper(const TExprNode::TPtr& input,
}
IGraphTransformer::TStatus ReplicateScalarsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+
+ // Static assert to ensure backward compatible change: if the
+ // constant below is true, both input and output types of
+ // ReplicateScalars callable have to be WideStream; otherwise,
+ // both input and output types have to be WideFlow.
+ // FIXME: When all spots using ReplicateScalars are adjusted
+ // to work with WideStream, drop the assertion below.
+ static_assert(!NYql::NBlockStreamIO::ReplicateScalars);
+
if (!EnsureMinArgsCount(*input, 1, ctx.Expr) || !EnsureMaxArgsCount(*input, 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h
index 3d2ad4199b..13907df704 100644
--- a/yql/essentials/core/yql_expr_type_annotation.h
+++ b/yql/essentials/core/yql_expr_type_annotation.h
@@ -354,4 +354,8 @@ TStringBuf NormalizeCallableName(TStringBuf name);
void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx);
+namespace NBlockStreamIO {
+ constexpr bool ReplicateScalars = false;
+} // namespace NBlockStreamIO
+
}