diff options
author | vvvv <vvvv@ydb.tech> | 2023-02-09 15:43:53 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-02-09 15:43:53 +0300 |
commit | 397118ba29266e7a2e0344ccdd5d00e7ccb48434 (patch) | |
tree | 684e24969e1c1f41e5ebf2b64a36f226d2fbce2a | |
parent | 9a7a7e29c66796feedb8db5bb17bc8795433c371 (diff) | |
download | ydb-397118ba29266e7a2e0344ccdd5d00e7ccb48434.tar.gz |
initial implementation of block top [sort]
16 files changed, 640 insertions, 117 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index f1eee57a5c9..8f143c35e2d 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -5314,6 +5314,47 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte .Build(); } +TExprNode::TPtr OptimizeTopBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { + if (!types.ArrowResolver) { + return node; + } + + if (node->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Flow) { + return node; + } + + auto flowItemType = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType(); + if (flowItemType->GetKind() != ETypeAnnotationKind::Multi) { + return node; + } + + const auto& allTypes = flowItemType->Cast<TMultiExprType>()->GetItems(); + if (AnyOf(allTypes, [](const TTypeAnnotationNode* type) { return type->IsBlockOrScalar(); })) { + return node; + } + + auto resolveStatus = types.ArrowResolver->AreTypesSupported(ctx.GetPosition(node->Head().Pos()), + TVector<const TTypeAnnotationNode*>(allTypes.begin(), allTypes.end()), ctx); + YQL_ENSURE(resolveStatus != IArrowResolver::ERROR); + if (resolveStatus != IArrowResolver::OK) { + return node; + } + + TStringBuf newName = node->Content() == "WideTop" ? "WideTopBlocks" : "WideTopSortBlocks"; + YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName; + return ctx.Builder(node->Pos()) + .Callable("WideFromBlocks") + .Callable(0, newName) + .Callable(0, "WideToBlocks") + .Add(0, node->HeadPtr()) + .Seal() + .Add(1, node->ChildPtr(1)) + .Add(2, node->ChildPtr(2)) + .Seal() + .Seal() + .Build(); +} + TExprNode::TPtr UpdateBlockCombineColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) { auto combineChildren = node->ChildrenList(); combineChildren[0] = node->Head().HeadPtr(); @@ -7169,6 +7210,8 @@ struct TPeepHoleRules { {"Take", &OptimizeSkipTakeToBlocks}, {"BlockCombineAll", &OptimizeBlockCombine}, {"BlockCombineHashed", &OptimizeBlockCombine}, + {"WideTop", &OptimizeTopBlocks}, + {"WideTopSort", &OptimizeTopBlocks}, }; TPeepHoleRules() diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index b2db1032cdb..4d398d996a1 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -1,5 +1,6 @@ #include "type_ann_blocks.h" #include "type_ann_list.h" +#include "type_ann_wide.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> @@ -545,5 +546,110 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureWideFlowType(input->Head(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); + TTypeAnnotationNode::TListType retMultiType; + for (const auto& type : multiType->GetItems()) { + if (type->IsBlockOrScalar()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar")); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsurePersistableType(input->Pos(), *type, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type)); + } + + retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); + auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); + input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + return IGraphTransformer::TStatus::Ok; +} + +IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType retMultiType; + if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + YQL_ENSURE(!retMultiType.empty()); + retMultiType.pop_back(); + auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); + input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + return IGraphTransformer::TStatus::Ok; +} + +IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + if (!EnsureArgsCount(*input, 2U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType blockItemTypes; + if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + output = input; + const TTypeAnnotationNode* expectedType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64); + auto convertStatus = TryConvertTo(input->ChildRef(1), *expectedType, ctx.Expr); + if (convertStatus.Level == IGraphTransformer::TStatus::Error) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), "Can not convert argument to Uint64")); + return IGraphTransformer::TStatus::Error; + } + + if (convertStatus.Level != IGraphTransformer::TStatus::Ok) { + return convertStatus; + } + + input->SetTypeAnn(input->Head().GetTypeAnn()); + return IGraphTransformer::TStatus::Ok; +} + +IGraphTransformer::TStatus WideTopBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + if (!EnsureArgsCount(*input, 3U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType blockItemTypes; + if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + output = input; + const TTypeAnnotationNode* expectedType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64); + auto convertStatus = TryConvertTo(input->ChildRef(1), *expectedType, ctx.Expr); + if (convertStatus.Level == IGraphTransformer::TStatus::Error) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), "Can not convert argument to Uint64")); + return IGraphTransformer::TStatus::Error; + } + + if (convertStatus.Level != IGraphTransformer::TStatus::Ok) { + return convertStatus; + } + + if (!ValidateWideTopKeys(input->Tail(), blockItemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + input->SetTypeAnn(input->Head().GetTypeAnn()); + return IGraphTransformer::TStatus::Ok; +} + } // namespace NTypeAnnImpl } diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h index 39b99c70795..377d6521b72 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.h +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h @@ -20,6 +20,10 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); + IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus WideTopBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); } // namespace NTypeAnnImpl } // namespace NYql diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index a35a98c7a45..299845b8ee4 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11859,6 +11859,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["WideTakeBlocks"] = &WideSkipTakeBlocksWrapper; Functions["BlockCompress"] = &BlockCompressWrapper; Functions["BlockExpandChunked"] = &BlockExpandChunkedWrapper; + Functions["WideTopBlocks"] = &WideTopBlocksWrapper; + Functions["WideTopSortBlocks"] = &WideTopBlocksWrapper; Functions["AsScalar"] = &AsScalarWrapper; Functions["BlockCoalesce"] = &BlockCoalesceWrapper; diff --git a/ydb/library/yql/core/type_ann/type_ann_wide.cpp b/ydb/library/yql/core/type_ann/type_ann_wide.cpp index 4c363c74879..7c4a2364976 100644 --- a/ydb/library/yql/core/type_ann/type_ann_wide.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_wide.cpp @@ -625,9 +625,8 @@ IGraphTransformer::TStatus NarrowFlatMapWrapper(const TExprNode::TPtr& input, TE return IGraphTransformer::TStatus::Ok; } -IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - Y_UNUSED(output); - if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { +IGraphTransformer::TStatus WideTopWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + if (!EnsureArgsCount(*input, 3U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -635,132 +634,66 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx return IGraphTransformer::TStatus::Error; } - const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>(); - TTypeAnnotationNode::TListType retMultiType; - for (const auto& type : multiType->GetItems()) { - if (type->IsBlockOrScalar()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar")); - return IGraphTransformer::TStatus::Error; - } - - if (!EnsurePersistableType(input->Pos(), *type, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type)); - } - - retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))); - auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); - input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); - return IGraphTransformer::TStatus::Ok; -} - -IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - Y_UNUSED(output); - if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - TTypeAnnotationNode::TListType retMultiType; - if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - YQL_ENSURE(!retMultiType.empty()); - retMultiType.pop_back(); - auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType); - input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); - return IGraphTransformer::TStatus::Ok; -} - -IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - if (!EnsureArgsCount(*input, 2U, ctx.Expr)) { + if (!EnsureSpecificDataType(*input->Child(1U), EDataSlot::Uint64, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - TTypeAnnotationNode::TListType blockItemTypes; - if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) { + const auto& types = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(); + if (!ValidateWideTopKeys(input->Tail(), types, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } output = input; - const TTypeAnnotationNode* expectedType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64); - auto convertStatus = TryConvertTo(input->ChildRef(1), *expectedType, ctx.Expr); - if (convertStatus.Level == IGraphTransformer::TStatus::Error) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), "Can not convert argument to Uint64")); - return IGraphTransformer::TStatus::Error; - } - - if (convertStatus.Level != IGraphTransformer::TStatus::Ok) { - return convertStatus; - } - input->SetTypeAnn(input->Head().GetTypeAnn()); return IGraphTransformer::TStatus::Ok; } -IGraphTransformer::TStatus WideTopWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - if (!EnsureArgsCount(*input, 3U, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureWideFlowType(input->Head(), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; +bool ValidateWideTopKeys(const TExprNode& keys, const TTypeAnnotationNode::TListType& types, TExprContext& ctx) { + if (!(EnsureTupleMinSize(keys, 1U, ctx) && EnsureTupleMaxSize(keys, types.size(), ctx))) { + return false; } - if (!EnsureSpecificDataType(*input->Child(1U), EDataSlot::Uint64, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - const auto& types = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(); - if (!(EnsureTupleMinSize(input->Tail(), 1U, ctx.Expr) && EnsureTupleMaxSize(input->Tail(), types.size(), ctx.Expr))) { - return IGraphTransformer::TStatus::Error; - } - - std::unordered_set<ui32> indexes(input->Tail().ChildrenSize()); - for (const auto& item : input->Tail().Children()) { - if (!EnsureTupleSize(*item, 2U, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; + std::unordered_set<ui32> indexes(keys.ChildrenSize()); + for (const auto& item : keys.Children()) { + if (!EnsureTupleSize(*item, 2U, ctx)) { + return false; } - if (!EnsureAtom(item->Head(), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; + if (!EnsureAtom(item->Head(), ctx)) { + return false; } if (ui32 index; TryFromString(item->Head().Content(), index)) { if (index >= types.size()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(item->Head().Pos()), + ctx.AddError(TIssue(ctx.GetPosition(item->Head().Pos()), TStringBuilder() << "Index too large: " << index)); - return IGraphTransformer::TStatus::Error; + return false; } else if (!indexes.emplace(index).second) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(item->Head().Pos()), + ctx.AddError(TIssue(ctx.GetPosition(item->Head().Pos()), TStringBuilder() << "Duplicate index: " << index)); - return IGraphTransformer::TStatus::Error; + return false; } const TDataExprType* type = nullptr; - if (bool opt; !EnsureDataOrOptionalOfData( item->Head().Pos(), types[index], opt, type, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; + if (bool opt; !EnsureDataOrOptionalOfData( item->Head().Pos(), types[index], opt, type, ctx)) { + return false; } - if (!EnsureComparableType(input->Pos(), *type, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; + if (!EnsureComparableType(item->Pos(), *type, ctx)) { + return false; } } else { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(item->Head().Pos()), + ctx.AddError(TIssue(ctx.GetPosition(item->Head().Pos()), TStringBuilder() << "Invalid index value: " << item->Head().Content())); - return IGraphTransformer::TStatus::Error; + return false; } - if (!EnsureSpecificDataType(item->Tail(), EDataSlot::Bool, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; + if (!EnsureSpecificDataType(item->Tail(), EDataSlot::Bool, ctx)) { + return false; } } - output = input; - input->SetTypeAnn(input->Head().GetTypeAnn()); - return IGraphTransformer::TStatus::Ok; + return true; } } // namespace NTypeAnnImpl diff --git a/ydb/library/yql/core/type_ann/type_ann_wide.h b/ydb/library/yql/core/type_ann/type_ann_wide.h index 5f66e3e58e7..0b662477558 100644 --- a/ydb/library/yql/core/type_ann/type_ann_wide.h +++ b/ydb/library/yql/core/type_ann/type_ann_wide.h @@ -8,6 +8,7 @@ namespace NYql { namespace NTypeAnnImpl { IGraphTransformer::TStatus ExpandMapWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + bool ValidateWideTopKeys(const TExprNode& keys, const TTypeAnnotationNode::TListType& types, TExprContext& ctx); IGraphTransformer::TStatus WideMapWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus WideFilterWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); @@ -21,11 +22,6 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus NarrowFlatMapWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus NarrowMultiMapWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); - IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); - IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); - - IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); - IGraphTransformer::TStatus WideTopWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); } // namespace NTypeAnnImpl } // namespace NYql diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp index 302c966edc5..f2ec21658ed 100644 --- a/ydb/library/yql/core/yql_expr_constraint.cpp +++ b/ydb/library/yql/core/yql_expr_constraint.cpp @@ -224,6 +224,10 @@ public: Functions["WithWorld"] = &TCallableConstraintTransformer::CopyAllFrom<0>; Functions["WideTop"] = &TCallableConstraintTransformer::WideTopWrap<false>; Functions["WideTopSort"] = &TCallableConstraintTransformer::WideTopWrap<true>; + Functions["WideTopBlocks"] = &TCallableConstraintTransformer::WideTopWrap<false>; + Functions["WideTopSortBlocks"] = &TCallableConstraintTransformer::WideTopWrap<true>; + Functions["WideToBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>; + Functions["WideFromBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>; } std::optional<IGraphTransformer::TStatus> ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt index 0584fa1f2e0..cbcaa7ca2ba 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin.txt @@ -52,6 +52,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_chain_map.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt index 870b944bfec..871ee8e99d0 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt @@ -53,6 +53,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_chain_map.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt index 870b944bfec..871ee8e99d0 100644 --- a/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt +++ b/ydb/library/yql/minikql/comp_nodes/CMakeLists.linux.txt @@ -53,6 +53,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_callable.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_chain_map.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp new file mode 100644 index 00000000000..a09c9f77fa9 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp @@ -0,0 +1,404 @@ +#include "mkql_block_top.h" +#include "mkql_block_impl.h" +#include "mkql_block_reader.h" +#include "mkql_block_builder.h" + +#include <ydb/library/yql/public/udf/arrow/block_item_comparator.h> + +#include <ydb/library/yql/minikql/arrow/arrow_defs.h> +#include <ydb/library/yql/minikql/arrow/arrow_util.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_node_builder.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +class TTopBlocksWrapper : public TStatefulWideFlowBlockComputationNode<TTopBlocksWrapper> { +public: + TTopBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TTupleType* tupleType, IComputationNode* count, + TVector<IComputationNode*>&& directions, TVector<ui32>&& keyIndicies, bool sort) + : TStatefulWideFlowBlockComputationNode(mutables, flow, tupleType->GetElementsCount()) + , Flow_(flow) + , Count_(count) + , Directions_(std::move(directions)) + , KeyIndicies_(std::move(keyIndicies)) + , Sort_(sort) + { + for (ui32 i = 0; i < tupleType->GetElementsCount() - 1; ++i) { + Columns_.push_back(AS_TYPE(TBlockType, tupleType->GetElementType(i))); + } + } + + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + Y_VERIFY(output[Columns_.size()]); + + auto& s = GetState(state, ctx); + if (s.IsFinished_) { + return EFetchResult::Finish; + } + + if (!s.PreparedCountAndDirections_) { + s.Count_ = Count_->GetValue(ctx).Get<ui64>(); + for (ui32 k = 0; k < KeyIndicies_.size(); ++k) { + s.Directions_[k] = Directions_[k]->GetValue(ctx).Get<bool>(); + } + + s.PreparedCountAndDirections_ = true; + } + + if (!s.Count_) { + s.IsFinished_ = true; + return EFetchResult::Finish; + } + + if (!s.PreparedBuilders_) { + s.AllocateBuilders(Columns_, ctx); + s.PreparedBuilders_ = true; + } + + for (;;) { + auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); + if (result == EFetchResult::Yield) { + return result; + } else if (result == EFetchResult::One) { + if (!s.ScalarsFilled_) { + for (ui32 i = 0; i < Columns_.size(); ++i) { + if (Columns_[i]->GetShape() == TBlockType::EShape::Scalar) { + s.ScalarValues_[i] = s.Values_[i]; + } + } + + s.ScalarsFilled_ = true; + } + + ui64 blockLen = TArrowBlock::From(s.Values_.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + + // shrink input block + TMaybe<TVector<ui64>> blockIndicies; + if (blockLen > s.Count_) { + blockIndicies.ConstructInPlace(); + blockIndicies->reserve(blockLen); + for (ui64 row = 0; row < blockLen; ++row) { + blockIndicies->emplace_back(row); + } + + TBlockLess cmp(KeyIndicies_, s, s.Values_); + std::nth_element(blockIndicies->begin(), blockIndicies->begin() + s.Count_, blockIndicies->end(), cmp); + } + + // copy all to builders + s.AddTop(Columns_, blockIndicies, blockLen); + if (s.BuilderLength_ + s.Count_ > s.BuilderMaxLength_) { + s.CompressBuilders(false, Columns_, KeyIndicies_, ctx); + } + + } else { + s.IsFinished_ = true; + if (!s.BuilderLength_) { + return EFetchResult::Finish; + } + + if (s.BuilderLength_ > s.Count_ || Sort_) { + s.CompressBuilders(Sort_, Columns_, KeyIndicies_, ctx); + } + + s.FillOutput(Columns_, output, ctx); + return EFetchResult::One; + } + } + } + +private: + void RegisterDependencies() const final { + if (const auto flow = FlowDependsOn(Flow_)) { + DependsOn(flow, Count_); + + for (auto dir : Directions_) { + DependsOn(flow, dir); + } + } + } + + class TState : public TComputationValue<TState> { + public: + bool IsFinished_ = false; + bool PreparedCountAndDirections_ = false; + ui64 Count_ = 0; + TVector<bool> Directions_; + bool ScalarsFilled_ = false; + TVector<NUdf::TUnboxedValue> ScalarValues_; + TVector<std::unique_ptr<IBlockReader>> LeftReaders_; + TVector<std::unique_ptr<IBlockReader>> RightReaders_; + bool PreparedBuilders_ = false; + TVector<std::unique_ptr<IArrayBuilder>> Builders_; + ui64 BuilderMaxLength_ = 0; + ui64 BuilderLength_ = 0; + TVector<std::unique_ptr<NUdf::IBlockItemComparator>> Comparators_; // by key columns only + + TVector<NUdf::TUnboxedValue> Values_; + TVector<NUdf::TUnboxedValue*> ValuePointers_; + + TVector<NUdf::TUnboxedValue> TmpValues_; + + TState(TMemoryUsageInfo* memInfo, const TVector<ui32>& keyIndicies, const TVector<TBlockType*>& columns) + : TComputationValue(memInfo) + { + Directions_.resize(keyIndicies.size()); + LeftReaders_.resize(columns.size()); + RightReaders_.resize(columns.size()); + Builders_.resize(columns.size()); + for (ui32 i = 0; i < columns.size(); ++i) { + if (columns[i]->GetShape() == TBlockType::EShape::Scalar) { + continue; + } + + LeftReaders_[i] = MakeBlockReader(TTypeInfoHelper(), columns[i]->GetItemType()); + RightReaders_[i] = MakeBlockReader(TTypeInfoHelper(), columns[i]->GetItemType()); + } + + Values_.resize(columns.size() + 1); + ValuePointers_.resize(columns.size() + 1); + for (ui32 i = 0; i <= columns.size(); ++i) { + ValuePointers_[i] = &Values_[i]; + } + + TmpValues_.resize(columns.size()); + + Comparators_.resize(keyIndicies.size()); + for (ui32 k = 0; k < keyIndicies.size(); ++k) { + Comparators_[k] = NUdf::MakeBlockItemComparator(TTypeInfoHelper(), columns[keyIndicies[k]]->GetItemType()); + } + } + + ui64 GetStorageLength() const { + Y_VERIFY(PreparedCountAndDirections_); + return 2 * Count_; + } + + void AllocateBuilders(const TVector<TBlockType*>& columns, TComputationContext& ctx) { + BuilderMaxLength_ = GetStorageLength(); + for (ui32 i = 0; i < columns.size(); ++i) { + if (columns[i]->GetShape() == TBlockType::EShape::Scalar) { + continue; + } + + BuilderMaxLength_ = Max(BuilderMaxLength_, CalcBlockLen(CalcMaxBlockItemSize(columns[i]->GetItemType()))); + }; + + for (ui32 i = 0; i < columns.size(); ++i) { + if (columns[i]->GetShape() == TBlockType::EShape::Scalar) { + continue; + } + + Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), columns[i]->GetItemType(), ctx.ArrowMemoryPool, BuilderMaxLength_); + } + } + + void CompressBuilders(bool sort, const TVector<TBlockType*>& columns, const TVector<ui32>& keyIndicies, TComputationContext& ctx) { + Y_VERIFY(ScalarsFilled_); + for (ui32 i = 0; i < columns.size(); ++i) { + if (columns[i]->GetShape() == TBlockType::EShape::Scalar) { + TmpValues_[i] = ScalarValues_[i]; + } else { + TmpValues_[i] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(Builders_[i]->Build(false))); + } + } + + TVector<ui64> blockIndicies; + blockIndicies.reserve(BuilderLength_); + for (ui64 row = 0; row < BuilderLength_; ++row) { + blockIndicies.push_back(row); + } + + ui64 blockLen = Min(BuilderLength_, Count_); + TBlockLess cmp(keyIndicies, *this, TmpValues_); + if (BuilderLength_ <= Count_) { + if (sort) { + std::sort(blockIndicies.begin(), blockIndicies.end(), cmp); + } + } else { + if (sort) { + std::partial_sort(blockIndicies.begin(), blockIndicies.begin() + blockLen, blockIndicies.end(), cmp); + } else { + std::nth_element(blockIndicies.begin(), blockIndicies.begin() + blockLen, blockIndicies.end(), cmp); + } + } + + for (ui32 i = 0; i < columns.size(); ++i) { + if (columns[i]->GetShape() == TBlockType::EShape::Scalar) { + continue; + } + + const auto& datum = TArrowBlock::From(TmpValues_[i]).GetDatum(); + const auto& array = *datum.array(); + for (ui64 j = 0; j < blockLen; ++j) { + Builders_[i]->Add(LeftReaders_[i]->GetItem(array, blockIndicies[j])); + } + } + + BuilderLength_ = blockLen; + + for (ui32 i = 0; i < columns.size(); ++i) { + TmpValues_[i] = {}; + } + } + + void FillOutput(const TVector<TBlockType*>& columns, NUdf::TUnboxedValue*const* output, TComputationContext& ctx) { + for (ui32 i = 0; i < columns.size(); ++i) { + if (!output[i]) { + continue; + } + + if (columns[i]->GetShape() == TBlockType::EShape::Scalar) { + *output[i] = ScalarValues_[i]; + } else { + *output[i] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(Builders_[i]->Build(true))); + } + } + + *output[columns.size()] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(BuilderLength_))); + } + + void AddTop(const TVector<TBlockType*>& columns, const TMaybe<TVector<ui64>>& blockIndicies, ui64 blockLen) { + for (ui32 i = 0; i < columns.size(); ++i) { + if (columns[i]->GetShape() == TBlockType::EShape::Scalar) { + continue; + } + + const auto& datum = TArrowBlock::From(Values_[i]).GetDatum(); + const auto& array = *datum.array(); + if (blockIndicies) { + for (ui64 j = 0; j < Count_; ++j) { + Builders_[i]->Add(LeftReaders_[i]->GetItem(array, (*blockIndicies)[j])); + } + } else { + for (ui64 row = 0; row < blockLen; ++row) { + Builders_[i]->Add(LeftReaders_[i]->GetItem(array, row)); + } + } + } + + if (blockIndicies) { + BuilderLength_ += Count_; + } else { + BuilderLength_ += blockLen; + } + } + }; + + TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { + if (!state.HasValue()) { + state = ctx.HolderFactory.Create<TState>(KeyIndicies_, Columns_); + } + + return *static_cast<TState*>(state.AsBoxed().Get()); + } + + class TBlockLess { + public: + TBlockLess(const TVector<ui32>& keyIndicies, const TState& state, const TVector<NUdf::TUnboxedValue>& values) + : KeyIndicies_(keyIndicies) + , State_(state) + , Values_(values) + {} + + bool operator()(ui64 lhs, ui64 rhs) const { + if (KeyIndicies_.size() == 1) { + auto i = KeyIndicies_[0]; + const auto& datum = TArrowBlock::From(Values_[i]).GetDatum(); + if (datum.is_scalar()) { + return false; + } + + const auto& array = *datum.array(); + auto leftItem = State_.LeftReaders_[i]->GetItem(array, lhs); + auto rightItem = State_.RightReaders_[i]->GetItem(array, rhs); + if (State_.Directions_[0]) { + return State_.Comparators_[0]->Less(leftItem, rightItem); + } else { + return State_.Comparators_[0]->Greater(leftItem, rightItem); + } + } else { + for (ui32 k = 0; k < KeyIndicies_.size(); ++k) { + auto i = KeyIndicies_[k]; + const auto& datum = TArrowBlock::From(Values_[i]).GetDatum(); + if (datum.is_scalar()) { + continue; + } + + const auto& array = *datum.array(); + auto leftItem = State_.LeftReaders_[i]->GetItem(array, lhs); + auto rightItem = State_.RightReaders_[i]->GetItem(array, rhs); + auto cmp = State_.Comparators_[k]->Compare(leftItem, rightItem); + if (cmp == 0) { + continue; + } + + if (State_.Directions_[k]) { + return cmp < 0; + } else { + return cmp > 0; + } + } + + return false; + } + } + + private: + const TVector<ui32>& KeyIndicies_; + const TState& State_; + const TVector<NUdf::TUnboxedValue>& Values_; + }; + + IComputationWideFlowNode* Flow_; + IComputationNode* Count_; + const TVector<IComputationNode*> Directions_; + const TVector<ui32> KeyIndicies_; + const bool Sort_; + TVector<TBlockType*> Columns_; +}; + +IComputationNode* WrapTop(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool sort) { + MKQL_ENSURE(callable.GetInputsCount() > 2U && !(callable.GetInputsCount() % 2U), "Expected more arguments."); + + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column"); + + auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + + const auto count = LocateNode(ctx.NodeLocator, callable, 1); + const auto countType = AS_TYPE(TDataType, callable.GetInput(1).GetStaticType()); + MKQL_ENSURE(countType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64"); + + TVector<IComputationNode*> directions; + TVector<ui32> keyIndicies; + for (ui32 i = 2; i < callable.GetInputsCount(); i += 2) { + ui32 keyIndex = AS_VALUE(TDataLiteral, callable.GetInput(i))->AsValue().Get<ui32>(); + MKQL_ENSURE(keyIndex + 1 < tupleType->GetElementsCount(), "Wrong key index"); + keyIndicies.push_back(keyIndex); + directions.push_back(LocateNode(ctx.NodeLocator, callable, i + 1)); + } + + return new TTopBlocksWrapper(ctx.Mutables, wideFlow, tupleType, count, std::move(directions), std::move(keyIndicies), sort); +} + +} //namespace + +IComputationNode* WrapWideTopBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + return WrapTop(callable, ctx, false); +} + +IComputationNode* WrapWideTopSortBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + return WrapTop(callable, ctx, true); +} + + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.h new file mode 100644 index 00000000000..80b6cd70c5f --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.h @@ -0,0 +1,11 @@ +#pragma once +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> + +namespace NKikimr { +namespace NMiniKQL { + +IComputationNode* WrapWideTopBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapWideTopSortBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index 38e76c0aa41..21400a0e143 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -13,6 +13,7 @@ #include "mkql_block_logical.h" #include "mkql_block_compress.h" #include "mkql_block_skiptake.h" +#include "mkql_block_top.h" #include "mkql_callable.h" #include "mkql_chain_map.h" #include "mkql_chain1_map.h" @@ -277,6 +278,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"WideFromBlocks", &WrapWideFromBlocks}, {"WideSkipBlocks", &WrapWideSkipBlocks}, {"WideTakeBlocks", &WrapWideTakeBlocks}, + {"WideTopBlocks", &WrapWideTopBlocks}, + {"WideTopSortBlocks", &WrapWideTopSortBlocks}, {"AsScalar", &WrapAsScalar}, {"BlockCoalesce", &WrapBlockCoalesce}, {"BlockIf", &WrapBlockIf}, diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 1f98018a678..4ebec923a98 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -1483,6 +1483,14 @@ TRuntimeNode TProgramBuilder::WideTakeBlocks(TRuntimeNode flow, TRuntimeNode cou return BuildWideSkipTakeBlocks(__func__, flow, count); } +TRuntimeNode TProgramBuilder::WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { + return BuildWideTop(__func__, flow, count, keys); +} + +TRuntimeNode TProgramBuilder::WideTopSortBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys) { + return BuildWideTop(__func__, flow, count, keys); +} + TRuntimeNode TProgramBuilder::AsScalar(TRuntimeNode value) { TCallableBuilder callableBuilder(Env, __func__, NewBlockType(value.GetStaticType(), TBlockType::EShape::Scalar)); callableBuilder.Add(value); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 7f861c60c08..45dd4d200d6 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -246,6 +246,8 @@ public: TRuntimeNode WideFromBlocks(TRuntimeNode flow); TRuntimeNode WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count); TRuntimeNode WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count); + TRuntimeNode WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys); + TRuntimeNode WideTopSortBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys); TRuntimeNode AsScalar(TRuntimeNode value); TRuntimeNode BlockCompress(TRuntimeNode flow, ui32 bitmapIndex); TRuntimeNode BlockExpandChunked(TRuntimeNode flow); diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 0230ba2659f..fb969e86349 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -25,6 +25,20 @@ using namespace NKikimr::NMiniKQL; namespace NYql { namespace NCommon { +TRuntimeNode WideTopImpl(const TExprNode& node, TMkqlBuildContext& ctx, + TRuntimeNode(TProgramBuilder::*func)(TRuntimeNode, TRuntimeNode, const std::vector<std::pair<ui32, TRuntimeNode>>&)) { + const auto flow = MkqlBuildExpr(node.Head(), ctx); + const auto count = MkqlBuildExpr(*node.Child(1U), ctx); + + std::vector<std::pair<ui32, TRuntimeNode>> directions; + directions.reserve(node.Tail().ChildrenSize()); + node.Tail().ForEachChild([&](const TExprNode& dir) { + directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx))); + }); + + return (ctx.ProgramBuilder.*func)(flow, count, directions); +} + TRuntimeNode CombineByKeyImpl(const TExprNode& node, TMkqlBuildContext& ctx) { NNodes::TCoCombineByKey combine(&node); const bool isStreamOrFlow = combine.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream || @@ -745,29 +759,19 @@ TMkqlCommonCallableCompiler::TShared::TShared() { }); AddCallable("WideTop", [](const TExprNode& node, TMkqlBuildContext& ctx) { - const auto flow = MkqlBuildExpr(node.Head(), ctx); - const auto count = MkqlBuildExpr(*node.Child(1U), ctx); - - std::vector<std::pair<ui32, TRuntimeNode>> directions; - directions.reserve(node.Tail().ChildrenSize()); - node.Tail().ForEachChild([&](const TExprNode& dir) { - directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx))); - }); - - return ctx.ProgramBuilder.WideTop(flow, count, directions); + return WideTopImpl(node, ctx, &TProgramBuilder::WideTop); }); AddCallable("WideTopSort", [](const TExprNode& node, TMkqlBuildContext& ctx) { - const auto flow = MkqlBuildExpr(node.Head(), ctx); - const auto count = MkqlBuildExpr(*node.Child(1U), ctx); + return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSort); + }); - std::vector<std::pair<ui32, TRuntimeNode>> directions; - directions.reserve(node.Tail().ChildrenSize()); - node.Tail().ForEachChild([&](const TExprNode& dir) { - directions.emplace_back(std::make_pair(::FromString<ui32>(dir.Head().Content()), MkqlBuildExpr(dir.Tail(), ctx))); - }); + AddCallable("WideTopBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) { + return WideTopImpl(node, ctx, &TProgramBuilder::WideTopBlocks); + }); - return ctx.ProgramBuilder.WideTopSort(flow, count, directions); + AddCallable("WideTopSortBlocks", [](const TExprNode& node, TMkqlBuildContext& ctx) { + return WideTopImpl(node, ctx, &TProgramBuilder::WideTopSortBlocks); }); AddCallable("Iterable", [](const TExprNode& node, TMkqlBuildContext& ctx) { |