summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <[email protected]>2023-10-31 07:40:55 +0300
committeraneporada <[email protected]>2023-10-31 08:09:38 +0300
commitbd8dc153a915ae55fc38b45d2d682460306e81f1 (patch)
tree9e38d2cbb23913c67d611dc8b2ec1096bab9adf4
parentf17b3be239426a47af2ecc5e1122dde81ba15db8 (diff)
Do not use raw Extend over wide block flows
-rw-r--r--ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp82
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.cpp46
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.h1
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp2
4 files changed, 129 insertions, 2 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 8c4abc9a237..6d4f26fa675 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -139,12 +139,64 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext&
for (auto& child : input.ChildrenList()) {
newChildren.emplace_back(ctx.ChangeChild(*node, 0, std::move(child)));
}
- return ctx.ChangeChildren(input, std::move(newChildren));
+ return ctx.NewCallable(input.Pos(), input.IsCallable("Extend") ? "BlockExtend" : "BlockOrderedExtend", std::move(newChildren));
}
return node;
}
+TExprNode::TPtr ExpandBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
+ Y_UNUSED(types);
+ YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << node->Content();
+
+ TExprNodeList newChildren;
+ newChildren.reserve(node->ChildrenSize());
+ bool seenScalars = false;
+ for (auto& child : node->ChildrenList()) {
+ const auto& items = child->GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
+ const ui32 width = items.size();
+ YQL_ENSURE(width > 0);
+ if (AllOf(items.begin(), items.end() - 1, [](const auto& item) { return item->IsBlock(); })) {
+ newChildren.push_back(child);
+ continue;
+ }
+
+ seenScalars = true;
+
+ TExprNodeList args;
+ TExprNodeList bodyItems;
+
+ args.reserve(width);
+ bodyItems.reserve(width);
+ auto lastColumn = ctx.NewArgument(child->Pos(), "height");
+ for (ui32 i = 0; i < width; ++i) {
+ auto arg = (i + 1 == width) ? lastColumn : ctx.NewArgument(child->Pos(), "arg");
+ args.push_back(arg);
+
+ if (i + 1 == width || items[i]->IsBlock()) {
+ bodyItems.push_back(arg);
+ } else {
+ YQL_ENSURE(items[i]->IsScalar());
+ bodyItems.push_back(ctx.NewCallable(child->Pos(), "ReplicateScalar", { arg, lastColumn}));
+ }
+ }
+
+ newChildren.push_back(ctx.Builder(child->Pos())
+ .Callable("WideMap")
+ .Add(0, child)
+ .Add(1, ctx.NewLambda(child->Pos(), ctx.NewArguments(child->Pos(), std::move(args)), std::move(bodyItems)))
+ .Seal()
+ .Build()
+ );
+ }
+
+ const TStringBuf newName = node->IsCallable("BlockOrdredExtend") ? "OrdredExtend" : "Extend";
+ if (!seenScalars) {
+ return ctx.RenameNode(*node, newName);
+ }
+ return ctx.NewCallable(node->Pos(), newName, std::move(newChildren));
+}
+
TExprNode::TPtr SplitEquiJoinToPairsRecursive(const TExprNode& node, const TExprNode& joinTree, TExprContext& ctx,
std::vector<std::string_view>& outLabels, const TExprNode::TPtr& settings) {
const auto leftSubtree = joinTree.Child(1);
@@ -3772,12 +3824,18 @@ TExprNode::TPtr OptimizeExpandMap(const TExprNode::TPtr& node, TExprContext& ctx
if (const auto& input = node->Head(); input.IsCallable({"Extend", "OrderedExtend"})) {
YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << input.Content();
+ bool isWideBlockFlow = AllOf(node->GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(),
+ [](const auto& itemType) { return itemType->IsBlockOrScalar(); });
+ TString newName = ToString(input.Content());
+ if (isWideBlockFlow) {
+ newName = "Block" + newName;
+ }
TExprNodeList newChildren;
newChildren.reserve(input.ChildrenSize());
for (const auto& child : input.ChildrenList()) {
newChildren.emplace_back(ctx.ChangeChildren(*node, { child, ctx.DeepCopyLambda(node->Tail())}));
}
- return ctx.ChangeChildren(input, std::move(newChildren));
+ return ctx.NewCallable(input.Pos(), newName, std::move(newChildren));
}
/* TODO
@@ -7522,6 +7580,11 @@ struct TPeepHoleRules {
{"WideSort", &OptimizeTopOrSortBlocks},
};
+ const TExtPeepHoleOptimizerMap BlockStageExtFinalRules = {
+ {"BlockExtend", &ExpandBlockExtend},
+ {"BlockOrderedExtend", &ExpandBlockExtend},
+ };
+
static const TPeepHoleRules& Instance() {
return *Singleton<TPeepHoleRules>();
}
@@ -7613,6 +7676,21 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
"PeepHoleBlock",
issueCode);
+ pipeline.Add(
+ CreateFunctorTransformer(
+ [&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
+ if (types.UseBlocks) {
+ const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtFinalRules;
+ return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
+ } else {
+ output = input;
+ return IGraphTransformer::TStatus::Ok;
+ }
+ }
+ ),
+ "PeepHoleFinalBlock",
+ issueCode);
+
if (peepholeSettings.FinalConfig) {
peepholeSettings.FinalConfig->AfterOptimize(&pipeline);
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index 5cba38ac71f..be0b2b7edc3 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -1019,5 +1019,51 @@ IGraphTransformer::TStatus BlockPgCallWrapper(const TExprNode::TPtr& input, TExp
return IGraphTransformer::TStatus::Ok;
}
+IGraphTransformer::TStatus BlockExtendWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TTypeAnnotationNode::TListType commonItemTypes;
+ for (size_t idx = 0; idx < input->ChildrenSize(); ++idx) {
+ auto child = input->Child(idx);
+ TTypeAnnotationNode::TListType currentItemTypes;
+ if (!EnsureWideFlowBlockType(*child, idx ? currentItemTypes : commonItemTypes, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (idx == 0) {
+ continue;
+ }
+
+ if (currentItemTypes.size() != commonItemTypes.size()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()),
+ TStringBuilder() << "Expected same width ( " << commonItemTypes.size() << ") on all inputs, but got: " << *child->GetTypeAnn() << " on input #" << idx));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+
+ for (size_t i = 0; i < currentItemTypes.size(); ++i) {
+ if (!IsSameAnnotation(*currentItemTypes[i], *commonItemTypes[i])) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()),
+ TStringBuilder() << "Expected item type " << *commonItemTypes[i] << " at column #" << i << " on input #" << idx << ", but got : " << *currentItemTypes[i]));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ }
+
+ TTypeAnnotationNode::TListType resultItemTypes;
+ for (size_t i = 0; i < commonItemTypes.size(); ++i) {
+ if (i + 1 == commonItemTypes.size()) {
+ resultItemTypes.emplace_back(ctx.Expr.MakeType<TScalarExprType>(commonItemTypes[i]));
+ } else {
+ resultItemTypes.emplace_back(ctx.Expr.MakeType<TBlockExprType>(commonItemTypes[i]));
+ }
+ }
+ input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(ctx.Expr.MakeType<TMultiExprType>(std::move(resultItemTypes))));
+ return IGraphTransformer::TStatus::Ok;
+}
+
} // namespace NTypeAnnImpl
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h
index c6da40f5396..bf8da3edaeb 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.h
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h
@@ -32,6 +32,7 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus WideSortBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockPgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockPgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus BlockExtendWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
} // namespace NTypeAnnImpl
} // namespace NYql
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 76e0758eb37..ca926a82f04 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -12100,6 +12100,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["WideTopBlocks"] = &WideTopBlocksWrapper;
Functions["WideTopSortBlocks"] = &WideTopBlocksWrapper;
Functions["WideSortBlocks"] = &WideSortBlocksWrapper;
+ Functions["BlockExtend"] = &BlockExtendWrapper;
+ Functions["BlockOrderedExtend"] = &BlockExtendWrapper;
Functions["BlockCoalesce"] = &BlockCoalesceWrapper;
Functions["BlockAnd"] = &BlockLogicalWrapper;