aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-02-09 15:43:53 +0300
committervvvv <vvvv@ydb.tech>2023-02-09 15:43:53 +0300
commit397118ba29266e7a2e0344ccdd5d00e7ccb48434 (patch)
tree684e24969e1c1f41e5ebf2b64a36f226d2fbce2a
parent9a7a7e29c66796feedb8db5bb17bc8795433c371 (diff)
downloadydb-397118ba29266e7a2e0344ccdd5d00e7ccb48434.tar.gz
initial implementation of block top [sort]
-rw-r--r--ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp43
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.cpp106
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.h4
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp2
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_wide.cpp121
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_wide.h6
-rw-r--r--ydb/library/yql/core/yql_expr_constraint.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp404
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_top.h11
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp3
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp8
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h2
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp40
16 files changed, 640 insertions, 117 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index f1eee57a5c9..8f143c35e2d 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -5314,6 +5314,47 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
.Build();
}
+TExprNode::TPtr OptimizeTopBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
+ if (!types.ArrowResolver) {
+ return node;
+ }
+
+ if (node->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Flow) {
+ return node;
+ }
+
+ auto flowItemType = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType();
+ if (flowItemType->GetKind() != ETypeAnnotationKind::Multi) {
+ return node;
+ }
+
+ const auto& allTypes = flowItemType->Cast<TMultiExprType>()->GetItems();
+ if (AnyOf(allTypes, [](const TTypeAnnotationNode* type) { return type->IsBlockOrScalar(); })) {
+ return node;
+ }
+
+ auto resolveStatus = types.ArrowResolver->AreTypesSupported(ctx.GetPosition(node->Head().Pos()),
+ TVector<const TTypeAnnotationNode*>(allTypes.begin(), allTypes.end()), ctx);
+ YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);
+ if (resolveStatus != IArrowResolver::OK) {
+ return node;
+ }
+
+ TStringBuf newName = node->Content() == "WideTop" ? "WideTopBlocks" : "WideTopSortBlocks";
+ YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
+ return ctx.Builder(node->Pos())
+ .Callable("WideFromBlocks")
+ .Callable(0, newName)
+ .Callable(0, "WideToBlocks")
+ .Add(0, node->HeadPtr())
+ .Seal()
+ .Add(1, node->ChildPtr(1))
+ .Add(2, node->ChildPtr(2))
+ .Seal()
+ .Seal()
+ .Build();
+}
+
TExprNode::TPtr UpdateBlockCombineColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) {
auto combineChildren = node->ChildrenList();
combineChildren[0] = node->Head().HeadPtr();
@@ -7169,6 +7210,8 @@ struct TPeepHoleRules {
{"Take", &OptimizeSkipTakeToBlocks},
{"BlockCombineAll", &OptimizeBlockCombine},
{"BlockCombineHashed", &OptimizeBlockCombine},
+ {"WideTop", &OptimizeTopBlocks},
+ {"WideTopSort", &OptimizeTopBlocks},
};
TPeepHoleRules()
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index b2db1032cdb..4d398d996a1 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -1,5 +1,6 @@
#include "type_ann_blocks.h"
#include "type_ann_list.h"
+#include "type_ann_wide.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
@@ -545,5 +546,110 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
return IGraphTransformer::TStatus::Ok;
}
+IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureWideFlowType(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
+ TTypeAnnotationNode::TListType retMultiType;
+ for (const auto& type : multiType->GetItems()) {
+ if (type->IsBlockOrScalar()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsurePersistableType(input->Pos(), *type, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type));
+ }
+
+ retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
+ auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
+ input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TTypeAnnotationNode::TListType retMultiType;
+ if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ YQL_ENSURE(!retMultiType.empty());
+ retMultiType.pop_back();
+ auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
+ input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 2U, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TTypeAnnotationNode::TListType blockItemTypes;
+ if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ output = input;
+ const TTypeAnnotationNode* expectedType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64);
+ auto convertStatus = TryConvertTo(input->ChildRef(1), *expectedType, ctx.Expr);
+ if (convertStatus.Level == IGraphTransformer::TStatus::Error) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), "Can not convert argument to Uint64"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertStatus.Level != IGraphTransformer::TStatus::Ok) {
+ return convertStatus;
+ }
+
+ input->SetTypeAnn(input->Head().GetTypeAnn());
+ return IGraphTransformer::TStatus::Ok;
+}
+
+IGraphTransformer::TStatus WideTopBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 3U, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TTypeAnnotationNode::TListType blockItemTypes;
+ if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ output = input;
+ const TTypeAnnotationNode* expectedType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64);
+ auto convertStatus = TryConvertTo(input->ChildRef(1), *expectedType, ctx.Expr);
+ if (convertStatus.Level == IGraphTransformer::TStatus::Error) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), "Can not convert argument to Uint64"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (convertStatus.Level != IGraphTransformer::TStatus::Ok) {
+ return convertStatus;
+ }
+
+ if (!ValidateWideTopKeys(input->Tail(), blockItemTypes, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(input->Head().GetTypeAnn());
+ return IGraphTransformer::TStatus::Ok;
+}
+
} // namespace NTypeAnnImpl
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h
index 39b99c70795..377d6521b72 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.h
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h
@@ -20,6 +20,10 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
+ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus WideTopBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
} // namespace NTypeAnnImpl
} // namespace NYql
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index a35a98c7a45..299845b8ee4 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -11859,6 +11859,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["WideTakeBlocks"] = &WideSkipTakeBlocksWrapper;
Functions["BlockCompress"] = &BlockCompressWrapper;
Functions["BlockExpandChunked"] = &BlockExpandChunkedWrapper;
+ Functions["WideTopBlocks"] = &WideTopBlocksWrapper;
+ Functions["WideTopSortBlocks"] = &WideTopBlocksWrapper;
Functions["AsScalar"] = &AsScalarWrapper;
Functions["BlockCoalesce"] = &BlockCoalesceWrapper;
diff --git a/ydb/library/yql/core/type_ann/type_ann_wide.cpp b/ydb/library/yql/core/type_ann/type_ann_wide.cpp
index 4c363c74879..7c4a2364976 100644
--- a/ydb/library/yql/core/type_ann/type_ann_wide.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_wide.cpp
@@ -625,9 +625,8 @@ IGraphTransformer::TStatus NarrowFlatMapWrapper(const TExprNode::TPtr& input, TE
return IGraphTransformer::TStatus::Ok;
}
-IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
+IGraphTransformer::TStatus WideTopWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 3U, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -635,132 +634,66 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
return IGraphTransformer::TStatus::Error;
}
- const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
- TTypeAnnotationNode::TListType retMultiType;
- for (const auto& type : multiType->GetItems()) {
- if (type->IsBlockOrScalar()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsurePersistableType(input->Pos(), *type, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type));
- }
-
- retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
- auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
- input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
- return IGraphTransformer::TStatus::Ok;
-}
-
-IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- TTypeAnnotationNode::TListType retMultiType;
- if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- YQL_ENSURE(!retMultiType.empty());
- retMultiType.pop_back();
- auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
- input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
- return IGraphTransformer::TStatus::Ok;
-}
-
-IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- if (!EnsureArgsCount(*input, 2U, ctx.Expr)) {
+ if (!EnsureSpecificDataType(*input->Child(1U), EDataSlot::Uint64, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
- TTypeAnnotationNode::TListType blockItemTypes;
- if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
+ const auto& types = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
+ if (!ValidateWideTopKeys(input->Tail(), types, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
output = input;
- const TTypeAnnotationNode* expectedType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64);
- auto convertStatus = TryConvertTo(input->ChildRef(1), *expectedType, ctx.Expr);
- if (convertStatus.Level == IGraphTransformer::TStatus::Error) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), "Can not convert argument to Uint64"));
- return IGraphTransformer::TStatus::Error;
- }
-
- if (convertStatus.Level != IGraphTransformer::TStatus::Ok) {
- return convertStatus;
- }
-
input->SetTypeAnn(input->Head().GetTypeAnn());
return IGraphTransformer::TStatus::Ok;
}
-IGraphTransformer::TStatus WideTopWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- if (!EnsureArgsCount(*input, 3U, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureWideFlowType(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
+bool ValidateWideTopKeys(const TExprNode& keys, const TTypeAnnotationNode::TListType& types, TExprContext& ctx) {
+ if (!(EnsureTupleMinSize(keys, 1U, ctx) && EnsureTupleMaxSize(keys, types.size(), ctx))) {
+ return false;
}
- if (!EnsureSpecificDataType(*input->Child(1U), EDataSlot::Uint64, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- const auto& types = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
- if (!(EnsureTupleMinSize(input->Tail(), 1U, ctx.Expr) && EnsureTupleMaxSize(input->Tail(), types.size(), ctx.Expr))) {
- return IGraphTransformer::TStatus::Error;
- }
-
- std::unordered_set<ui32> indexes(input->Tail().ChildrenSize());
- for (const auto& item : input->Tail().Children()) {
- if (!EnsureTupleSize(*item, 2U, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
+ std::unordered_set<ui32> indexes(keys.ChildrenSize());
+ for (const auto& item : keys.Children()) {
+ if (!EnsureTupleSize(*item, 2U, ctx)) {
+ return false;
}
- if (!EnsureAtom(item->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
+ if (!EnsureAtom(item->Head(), ctx)) {
+ return false;
}
if (ui32 index; TryFromString(item->Head().Content(), index)) {
if (index >= types.size()) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(item->Head().Pos()),
+ ctx.AddError(TIssue(ctx.GetPosition(item->Head().Pos()),
TStringBuilder() << "Index too large: " << index));
- return IGraphTransformer::TStatus::Error;
+ return false;
} else if (!indexes.emplace(index).second) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(item->Head().Pos()),
+ ctx.AddError(TIssue(ctx.GetPosition(item->Head().Pos()),
TStringBuilder() << "Duplicate index: " << index));
- return IGraphTransformer::TStatus::Error;
+ return false;
}
const TDataExprType* type = nullptr;
- if (bool opt; !EnsureDataOrOptionalOfData( item->Head().Pos(), types[index], opt, type, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
+ if (bool opt; !EnsureDataOrOptionalOfData( item->Head().Pos(), types[index], opt, type, ctx)) {
+ return false;
}
- if (!EnsureComparableType(input->Pos(), *type, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
+ if (!EnsureComparableType(item->Pos(), *type, ctx)) {
+ return false;
}
} else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(item->Head().Pos()),
+ ctx.AddError(TIssue(ctx.GetPosition(item->Head().Pos()),
TStringBuilder() << "Invalid index value: " << item->Head().Content()));
- return IGraphTransformer::TStatus::Error;
+ return false;
}
- if (!EnsureSpecificDataType(item->Tail(), EDataSlot::Bool, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
+ if (!EnsureSpecificDataType(item->Tail(), EDataSlot::Bool, ctx)) {
+ return false;
}
}
- output = input;
- input->SetTypeAnn(input->Head().GetTypeAnn());
- return IGraphTransformer::TStatus::Ok;
+ return true;
}
} // namespace NTypeAnnImpl
diff --git a/ydb/library/yql/core/type_ann/type_ann_wide.h b/ydb/library/yql/core/type_ann/type_ann_wide.h
index 5f66e3e58e7..0b662477558 100644
--- a/ydb/library/yql/core/type_ann/type_ann_wide.h
+++ b/ydb/library/yql/core/type_ann/type_ann_wide.h
@@ -8,6 +8,7 @@
namespace NYql {
namespace NTypeAnnImpl {
IGraphTransformer::TStatus ExpandMapWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ bool ValidateWideTopKeys(const TExprNode& keys, const TTypeAnnotationNode::TListType& types, TExprContext& ctx);
IGraphTransformer::TStatus WideMapWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus WideFilterWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
@@ -21,11 +22,6 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus NarrowFlatMapWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus NarrowMultiMapWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
- IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
- IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
-
- IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
-
IGraphTransformer::TStatus WideTopWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
} // namespace NTypeAnnImpl
} // namespace NYql
diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp
index 302c966edc5..f2ec21658ed 100644
--- a/ydb/library/yql/core/yql_expr_constraint.cpp
+++ b/ydb/library/yql/core/yql_expr_constraint.cpp
@@ -224,6 +224,10 @@ public:
Functions["WithWorld"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["WideTop"] = &TCallableConstraintTransformer::WideTopWrap<false>;
Functions["WideTopSort"] = &TCallableConstraintTransformer::WideTopWrap<true>;
+ Functions["WideTopBlocks"] = &TCallableConstraintTransformer::WideTopWrap<false>;
+ Functions["WideTopSortBlocks"] = &TCallableConstraintTransformer::WideTopWrap<true>;
+ Functions["WideToBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
+ Functions["WideFromBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
}
std::optional<IGraphTransformer::TStatus> ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
index 0584fa1f2e0..cbcaa7ca2ba 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt
@@ -52,6 +52,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_chain_map.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
index 870b944bfec..871ee8e99d0 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt
@@ -53,6 +53,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_chain_map.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
index 870b944bfec..871ee8e99d0 100644
--- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
+++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt
@@ -53,6 +53,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_chain_map.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
new file mode 100644
index 00000000000..a09c9f77fa9
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
@@ -0,0 +1,404 @@
+#include "mkql_block_top.h"
+#include "mkql_block_impl.h"
+#include "mkql_block_reader.h"
+#include "mkql_block_builder.h"
+
+#include <ydb/library/yql/public/udf/arrow/block_item_comparator.h>
+
+#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
+#include <ydb/library/yql/minikql/arrow/arrow_util.h>
+#include <ydb/library/yql/minikql/mkql_type_builder.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/mkql_node_builder.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+class TTopBlocksWrapper : public TStatefulWideFlowBlockComputationNode<TTopBlocksWrapper> {
+public:
+ TTopBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TTupleType* tupleType, IComputationNode* count,
+ TVector<IComputationNode*>&& directions, TVector<ui32>&& keyIndicies, bool sort)
+ : TStatefulWideFlowBlockComputationNode(mutables, flow, tupleType->GetElementsCount())
+ , Flow_(flow)
+ , Count_(count)
+ , Directions_(std::move(directions))
+ , KeyIndicies_(std::move(keyIndicies))
+ , Sort_(sort)
+ {
+ for (ui32 i = 0; i < tupleType->GetElementsCount() - 1; ++i) {
+ Columns_.push_back(AS_TYPE(TBlockType, tupleType->GetElementType(i)));
+ }
+ }
+
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ Y_VERIFY(output[Columns_.size()]);
+
+ auto& s = GetState(state, ctx);
+ if (s.IsFinished_) {
+ return EFetchResult::Finish;
+ }
+
+ if (!s.PreparedCountAndDirections_) {
+ s.Count_ = Count_->GetValue(ctx).Get<ui64>();
+ for (ui32 k = 0; k < KeyIndicies_.size(); ++k) {
+ s.Directions_[k] = Directions_[k]->GetValue(ctx).Get<bool>();
+ }
+
+ s.PreparedCountAndDirections_ = true;
+ }
+
+ if (!s.Count_) {
+ s.IsFinished_ = true;
+ return EFetchResult::Finish;
+ }
+
+ if (!s.PreparedBuilders_) {
+ s.AllocateBuilders(Columns_, ctx);
+ s.PreparedBuilders_ = true;
+ }
+
+ for (;;) {
+ auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
+ if (result == EFetchResult::Yield) {
+ return result;
+ } else if (result == EFetchResult::One) {
+ if (!s.ScalarsFilled_) {
+ for (ui32 i = 0; i < Columns_.size(); ++i) {
+ if (Columns_[i]->GetShape() == TBlockType::EShape::Scalar) {
+ s.ScalarValues_[i] = s.Values_[i];
+ }
+ }
+
+ s.ScalarsFilled_ = true;
+ }
+
+ ui64 blockLen = TArrowBlock::From(s.Values_.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+
+ // shrink input block
+ TMaybe<TVector<ui64>> blockIndicies;
+ if (blockLen > s.Count_) {
+ blockIndicies.ConstructInPlace();
+ blockIndicies->reserve(blockLen);
+ for (ui64 row = 0; row < blockLen; ++row) {
+ blockIndicies->emplace_back(row);
+ }
+
+ TBlockLess cmp(KeyIndicies_, s, s.Values_);
+ std::nth_element(blockIndicies->begin(), blockIndicies->begin() + s.Count_, blockIndicies->end(), cmp);
+ }
+
+ // copy all to builders
+ s.AddTop(Columns_, blockIndicies, blockLen);
+ if (s.BuilderLength_ + s.Count_ > s.BuilderMaxLength_) {
+ s.CompressBuilders(false, Columns_, KeyIndicies_, ctx);
+ }
+
+ } else {
+ s.IsFinished_ = true;
+ if (!s.BuilderLength_) {
+ return EFetchResult::Finish;
+ }
+
+ if (s.BuilderLength_ > s.Count_ || Sort_) {
+ s.CompressBuilders(Sort_, Columns_, KeyIndicies_, ctx);
+ }
+
+ s.FillOutput(Columns_, output, ctx);
+ return EFetchResult::One;
+ }
+ }
+ }
+
+private:
+ void RegisterDependencies() const final {
+ if (const auto flow = FlowDependsOn(Flow_)) {
+ DependsOn(flow, Count_);
+
+ for (auto dir : Directions_) {
+ DependsOn(flow, dir);
+ }
+ }
+ }
+
+ class TState : public TComputationValue<TState> {
+ public:
+ bool IsFinished_ = false;
+ bool PreparedCountAndDirections_ = false;
+ ui64 Count_ = 0;
+ TVector<bool> Directions_;
+ bool ScalarsFilled_ = false;
+ TVector<NUdf::TUnboxedValue> ScalarValues_;
+ TVector<std::unique_ptr<IBlockReader>> LeftReaders_;
+ TVector<std::unique_ptr<IBlockReader>> RightReaders_;
+ bool PreparedBuilders_ = false;
+ TVector<std::unique_ptr<IArrayBuilder>> Builders_;
+ ui64 BuilderMaxLength_ = 0;
+ ui64 BuilderLength_ = 0;
+ TVector<std::unique_ptr<NUdf::IBlockItemComparator>> Comparators_; // by key columns only
+
+ TVector<NUdf::TUnboxedValue> Values_;
+ TVector<NUdf::TUnboxedValue*> ValuePointers_;
+
+ TVector<NUdf::TUnboxedValue> TmpValues_;
+
+ TState(TMemoryUsageInfo* memInfo, const TVector<ui32>& keyIndicies, const TVector<TBlockType*>& columns)
+ : TComputationValue(memInfo)
+ {
+ Directions_.resize(keyIndicies.size());
+ LeftReaders_.resize(columns.size());
+ RightReaders_.resize(columns.size());
+ Builders_.resize(columns.size());
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ if (columns[i]->GetShape() == TBlockType::EShape::Scalar) {
+ continue;
+ }
+
+ LeftReaders_[i] = MakeBlockReader(TTypeInfoHelper(), columns[i]->GetItemType());
+ RightReaders_[i] = MakeBlockReader(TTypeInfoHelper(), columns[i]->GetItemType());
+ }
+
+ Values_.resize(columns.size() + 1);
+ ValuePointers_.resize(columns.size() + 1);
+ for (ui32 i = 0; i <= columns.size(); ++i) {
+ ValuePointers_[i] = &Values_[i];
+ }
+
+ TmpValues_.resize(columns.size());
+
+ Comparators_.resize(keyIndicies.size());
+ for (ui32 k = 0; k < keyIndicies.size(); ++k) {
+ Comparators_[k] = NUdf::MakeBlockItemComparator(TTypeInfoHelper(), columns[keyIndicies[k]]->GetItemType());
+ }
+ }
+
+ ui64 GetStorageLength() const {
+ Y_VERIFY(PreparedCountAndDirections_);
+ return 2 * Count_;
+ }
+
+ void AllocateBuilders(const TVector<TBlockType*>& columns, TComputationContext& ctx) {
+ BuilderMaxLength_ = GetStorageLength();
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ if (columns[i]->GetShape() == TBlockType::EShape::Scalar) {
+ continue;
+ }
+
+ BuilderMaxLength_ = Max(BuilderMaxLength_, CalcBlockLen(CalcMaxBlockItemSize(columns[i]->GetItemType())));
+ };
+
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ if (columns[i]->GetShape() == TBlockType::EShape::Scalar) {
+ continue;
+ }
+
+ Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), columns[i]->GetItemType(), ctx.ArrowMemoryPool, BuilderMaxLength_);
+ }
+ }
+
+ void CompressBuilders(bool sort, const TVector<TBlockType*>& columns, const TVector<ui32>& keyIndicies, TComputationContext& ctx) {
+ Y_VERIFY(ScalarsFilled_);
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ if (columns[i]->GetShape() == TBlockType::EShape::Scalar) {
+ TmpValues_[i] = ScalarValues_[i];
+ } else {
+ TmpValues_[i] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(Builders_[i]->Build(false)));
+ }
+ }
+
+ TVector<ui64> blockIndicies;
+ blockIndicies.reserve(BuilderLength_);
+ for (ui64 row = 0; row < BuilderLength_; ++row) {
+ blockIndicies.push_back(row);
+ }
+
+ ui64 blockLen = Min(BuilderLength_, Count_);
+ TBlockLess cmp(keyIndicies, *this, TmpValues_);
+ if (BuilderLength_ <= Count_) {
+ if (sort) {
+ std::sort(blockIndicies.begin(), blockIndicies.end(), cmp);
+ }
+ } else {
+ if (sort) {
+ std::partial_sort(blockIndicies.begin(), blockIndicies.begin() + blockLen, blockIndicies.end(), cmp);
+ } else {
+ std::nth_element(blockIndicies.begin(), blockIndicies.begin() + blockLen, blockIndicies.end(), cmp);
+ }
+ }
+
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ if (columns[i]->GetShape() == TBlockType::EShape::Scalar) {
+ continue;
+ }
+
+ const auto& datum = TArrowBlock::From(TmpValues_[i]).GetDatum();
+ const auto& array = *datum.array();
+ for (ui64 j = 0; j < blockLen; ++j) {
+ Builders_[i]->Add(LeftReaders_[i]->GetItem(array, blockIndicies[j]));
+ }
+ }
+
+ BuilderLength_ = blockLen;
+
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ TmpValues_[i] = {};
+ }
+ }
+
+ void FillOutput(const TVector<TBlockType*>& columns, NUdf::TUnboxedValue*const* output, TComputationContext& ctx) {
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ if (!output[i]) {
+ continue;
+ }
+
+ if (columns[i]->GetShape() == TBlockType::EShape::Scalar) {
+ *output[i] = ScalarValues_[i];
+ } else {
+ *output[i] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(Builders_[i]->Build(true)));
+ }
+ }
+
+ *output[columns.size()] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(BuilderLength_)));
+ }
+
+ void AddTop(const TVector<TBlockType*>& columns, const TMaybe<TVector<ui64>>& blockIndicies, ui64 blockLen) {
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ if (columns[i]->GetShape() == TBlockType::EShape::Scalar) {
+ continue;
+ }
+
+ const auto& datum = TArrowBlock::From(Values_[i]).GetDatum();
+ const auto& array = *datum.array();
+ if (blockIndicies) {
+ for (ui64 j = 0; j < Count_; ++j) {
+ Builders_[i]->Add(LeftReaders_[i]->GetItem(array, (*blockIndicies)[j]));
+ }
+ } else {
+ for (ui64 row = 0; row < blockLen; ++row) {
+ Builders_[i]->Add(LeftReaders_[i]->GetItem(array, row));
+ }
+ }
+ }
+
+ if (blockIndicies) {
+ BuilderLength_ += Count_;
+ } else {
+ BuilderLength_ += blockLen;
+ }
+ }
+ };
+
+ TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
+ if (!state.HasValue()) {
+ state = ctx.HolderFactory.Create<TState>(KeyIndicies_, Columns_);
+ }
+
+ return *static_cast<TState*>(state.AsBoxed().Get());
+ }
+
+ class TBlockLess {
+ public:
+ TBlockLess(const TVector<ui32>& keyIndicies, const TState& state, const TVector<NUdf::TUnboxedValue>& values)
+ : KeyIndicies_(keyIndicies)
+ , State_(state)
+ , Values_(values)
+ {}
+
+ bool operator()(ui64 lhs, ui64 rhs) const {
+ if (KeyIndicies_.size() == 1) {
+ auto i = KeyIndicies_[0];
+ const auto& datum = TArrowBlock::From(Values_[i]).GetDatum();
+ if (datum.is_scalar()) {
+ return false;
+ }
+
+ const auto& array = *datum.array();
+ auto leftItem = State_.LeftReaders_[i]->GetItem(array, lhs);
+ auto rightItem = State_.RightReaders_[i]->GetItem(array, rhs);
+ if (State_.Directions_[0]) {
+ return State_.Comparators_[0]->Less(leftItem, rightItem);
+ } else {
+ return State_.Comparators_[0]->Greater(leftItem, rightItem);
+ }
+ } else {
+ for (ui32 k = 0; k < KeyIndicies_.size(); ++k) {
+ auto i = KeyIndicies_[k];
+ const auto& datum = TArrowBlock::From(Values_[i]).GetDatum();
+ if (datum.is_scalar()) {
+ continue;
+ }
+
+ const auto& array = *datum.array();
+ auto leftItem = State_.LeftReaders_[i]->GetItem(array, lhs);
+ auto rightItem = State_.RightReaders_[i]->GetItem(array, rhs);
+ auto cmp = State_.Comparators_[k]->Compare(leftItem, rightItem);
+ if (cmp == 0) {
+ continue;
+ }
+
+ if (State_.Directions_[k]) {
+ return cmp < 0;
+ } else {
+ return cmp > 0;
+ }
+ }
+
+ return false;
+ }
+ }
+
+ private:
+ const TVector<ui32>& KeyIndicies_;
+ const TState& State_;
+ const TVector<NUdf::TUnboxedValue>& Values_;
+ };
+
+ IComputationWideFlowNode* Flow_;
+ IComputationNode* Count_;
+ const TVector<IComputationNode*> Directions_;
+ const TVector<ui32> KeyIndicies_;
+ const bool Sort_;
+ TVector<TBlockType*> Columns_;
+};
+
+IComputationNode* WrapTop(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool sort) {
+ MKQL_ENSURE(callable.GetInputsCount() > 2U && !(callable.GetInputsCount() % 2U), "Expected more arguments.");
+
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column");
+
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+
+ const auto count = LocateNode(ctx.NodeLocator, callable, 1);
+ const auto countType = AS_TYPE(TDataType, callable.GetInput(1).GetStaticType());
+ MKQL_ENSURE(countType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64");
+
+ TVector<IComputationNode*> directions;
+ TVector<ui32> keyIndicies;
+ for (ui32 i = 2; i < callable.GetInputsCount(); i += 2) {
+ ui32 keyIndex = AS_VALUE(TDataLiteral, callable.GetInput(i))->AsValue().Get<ui32>();
+ MKQL_ENSURE(keyIndex + 1 < tupleType->GetElementsCount(), "Wrong key index");
+ keyIndicies.push_back(keyIndex);
+ directions.push_back(LocateNode(ctx.NodeLocator, callable, i + 1));
+ }
+
+ return new TTopBlocksWrapper(ctx.Mutables, wideFlow, tupleType, count, std::move(directions), std::move(keyIndicies), sort);
+}
+
+} //namespace
+
+IComputationNode* WrapWideTopBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ return WrapTop(callable, ctx, false);
+}
+
+IComputationNode* WrapWideTopSortBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ return WrapTop(callable, ctx, true);
+}
+
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.h
new file mode 100644
index 00000000000..80b6cd70c5f
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.h
@@ -0,0 +1,11 @@
+#pragma once
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+IComputationNode* WrapWideTopBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapWideTopSortBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index 38e76c0aa41..21400a0e143 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -13,6 +13,7 @@
#include "mkql_block_logical.h"
#include "mkql_block_compress.h"
#include "mkql_block_skiptake.h"
+#include "mkql_block_top.h"
#include "mkql_callable.h"
#include "mkql_chain_map.h"
#include "mkql_chain1_map.h"
@@ -277,6 +278,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"WideFromBlocks", &WrapWideFromBlocks},
{"WideSkipBlocks", &WrapWideSkipBlocks},
{"WideTakeBlocks", &WrapWideTakeBlocks},
+ {"WideTopBlocks", &WrapWideTopBlocks},
+ {"WideTopSortBlocks", &WrapWideTopSortBlocks},
{"AsScalar", &WrapAsScalar},
{"BlockCoalesce", &WrapBlockCoalesce},
{"BlockIf", &WrapBlockIf},
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 1f98018a678..4ebec923a98 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -1483,6 +1483,14 @@ TRuntimeNode TProgramBuilder::WideTakeBlocks(TRuntimeNode flow, TRuntimeNode cou
return BuildWideSkipTakeBlocks(__func__, flow, count);
}
+TRuntimeNode TProgramBuilder::WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) {
+ return BuildWideTop(__func__, flow, count, keys);
+}
+
+TRuntimeNode TProgramBuilder::WideTopSortBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) {
+ return BuildWideTop(__func__, flow, count, keys);
+}
+
TRuntimeNode TProgramBuilder::AsScalar(TRuntimeNode value) {
TCallableBuilder callableBuilder(Env, __func__, NewBlockType(value.GetStaticType(), TBlockType::EShape::Scalar));
callableBuilder.Add(value);
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 7f861c60c08..45dd4d200d6 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -246,6 +246,8 @@ public:
TRuntimeNode WideFromBlocks(TRuntimeNode flow);
TRuntimeNode WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count);
TRuntimeNode WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count);
+ TRuntimeNode WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys);
+ TRuntimeNode WideTopSortBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys);
TRuntimeNode AsScalar(TRuntimeNode value);
TRuntimeNode BlockCompress(TRuntimeNode flow, ui32 bitmapIndex);
TRuntimeNode BlockExpandChunked(TRuntimeNode flow);
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 0230ba2659f..fb969e86349 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -25,6 +25,20 @@ using namespace NKikimr::NMiniKQL;
namespace NYql {
namespace NCommon {
+TRuntimeNode WideTopImpl(const TExprNode& node, TMkqlBuildContext& ctx,
+ TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) {
+ const auto flow = MkqlBuildExpr(node.Head(), ctx);
+ const auto count = MkqlBuildExpr(*node.Child(1U), ctx);
+
+ std::vector<std::pair<ui32, TRuntimeNode>> directions;
+ directions.reserve(node.Tail().ChildrenSize());
+ node.Tail().ForEachChild([&](const TExprNode& dir) {
+ directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
+ });
+
+ return (ctx.ProgramBuilder.*func)(flow, count, directions);
+}
+
TRuntimeNode CombineByKeyImpl(const TExprNode& node, TMkqlBuildContext& ctx) {
NNodes::TCoCombineByKey combine(&node);
const bool isStreamOrFlow = combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream ||
@@ -745,29 +759,19 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
});
AddCallable("WideTop", [](const TExprNode& node, TMkqlBuildContext& ctx) {
- const auto flow = MkqlBuildExpr(node.Head(), ctx);
- const auto count = MkqlBuildExpr(*node.Child(1U), ctx);
-
- std::vector<std::pair<ui32, TRuntimeNode>> directions;
- directions.reserve(node.Tail().ChildrenSize());
- node.Tail().ForEachChild([&](const TExprNode& dir) {
- directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
- });
-
- return ctx.ProgramBuilder.WideTop(flow, count, directions);
+ return WideTopImpl(node, ctx, &TProgramBuilder::WideTop);
});
AddCallable("WideTopSort", [](const TExprNode& node, TMkqlBuildContext& ctx) {
- const auto flow = MkqlBuildExpr(node.Head(), ctx);
- const auto count = MkqlBuildExpr(*node.Child(1U), ctx);
+ return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSort);
+ });
- std::vector<std::pair<ui32, TRuntimeNode>> directions;
- directions.reserve(node.Tail().ChildrenSize());
- node.Tail().ForEachChild([&](const TExprNode& dir) {
- directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx)));
- });
+ AddCallable("WideTopBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ return WideTopImpl(node, ctx, &TProgramBuilder::WideTopBlocks);
+ });
- return ctx.ProgramBuilder.WideTopSort(flow, count, directions);
+ AddCallable("WideTopSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+ return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSortBlocks);
});
AddCallable("Iterable", [](const TExprNode& node, TMkqlBuildContext& ctx) {