author     aneporada <[email protected]>    2023-01-03 12:39:43 +0300
committer  aneporada <[email protected]>    2023-01-03 12:39:43 +0300
commit     8ee4eaa91898ce3adcdf302ba33311fc9e627282 (patch)
tree       554f3337b05d9d555c169ef87164fd627ded8ebd
parent     3b9d6412ef442facb9ef6eb1e7aa279d4279145a (diff)
Rework chunked block handling - do not use TChunkedBlockExprType
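With this change the ChunkedBlock kind no longer appears in type annotations: IsAnyBlockOrScalar() becomes IsBlockOrScalar(), the allowChunked/withChunked parameters and the MakeBlockType() helper are removed, and BlockExpandChunked keeps its input type unchanged. Chunked data is instead normalized at runtime: block computation nodes now derive from the new TStatefulWideFlowBlockComputationNode base class (mkql_block_impl.h), whose FetchValues() splits any chunked arrow::Datum produced by DoCalculate() into plain arrays of a common slice length before handing values downstream.

Below is a minimal standalone sketch of that slicing idea against the public Arrow C++ API. It is illustrative only: the helper names (TChunkQueue, EnqueueColumn, PopSlice) are hypothetical and do not appear in this commit; the real logic lives in FetchValues() in mkql_block_impl.h.

#include <arrow/api.h>

#include <algorithm>
#include <deque>
#include <memory>
#include <vector>

// One queue of plain ArrayData chunks per column; scalar columns keep an empty queue.
using TChunkQueue = std::deque<std::shared_ptr<arrow::ArrayData>>;

// Flatten a kernel output into plain chunks (chunked arrays are split, scalars skipped).
void EnqueueColumn(const arrow::Datum& datum, TChunkQueue& queue) {
    if (datum.is_array()) {
        queue.push_back(datum.array());
    } else if (datum.is_chunked_array()) {
        for (const auto& chunk : datum.chunked_array()->chunks()) {
            queue.push_back(chunk->data());
        }
    }
    // scalars are broadcast, so they stay valid for any slice length
}

// Emit the next block: every array column sliced to the shortest leading chunk.
// Returns the slice length, or 0 when all array columns are drained.
int64_t PopSlice(std::vector<TChunkQueue>& columns,
                 std::vector<std::shared_ptr<arrow::ArrayData>>& out) {
    int64_t sliceSize = -1;
    for (const auto& q : columns) {
        if (!q.empty()) {
            sliceSize = (sliceSize < 0) ? q.front()->length
                                        : std::min(sliceSize, q.front()->length);
        }
    }
    if (sliceSize <= 0) {
        return 0;
    }
    out.assign(columns.size(), nullptr);
    for (auto& q : columns) {
        const size_t i = &q - columns.data();
        if (q.empty()) {
            continue; // scalar column: the caller reuses the scalar datum as-is
        }
        if (q.front()->length == sliceSize) {
            out[i] = std::move(q.front()); // whole leading chunk fits the block
            q.pop_front();
        } else {
            out[i] = q.front()->Slice(0, sliceSize); // head of the chunk becomes the block...
            q.front() = q.front()->Slice(sliceSize, q.front()->length - sliceSize); // ...tail stays queued
        }
    }
    return sliceSize;
}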
-rw-r--r--  ydb/library/yql/ast/yql_expr.h                                    |   4
-rw-r--r--  ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp   |  46
-rw-r--r--  ydb/library/yql/core/type_ann/type_ann_blocks.cpp                 |  30
-rw-r--r--  ydb/library/yql/core/type_ann/type_ann_wide.cpp                   |   7
-rw-r--r--  ydb/library/yql/core/yql_aggregate_expander.cpp                   |   1
-rw-r--r--  ydb/library/yql/core/yql_expr_type_annotation.cpp                 |  32
-rw-r--r--  ydb/library/yql/core/yql_expr_type_annotation.h                   |   8
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp             |   7
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp        |  13
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h              | 145
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp        |  13
-rw-r--r--  ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp                | 105
12 files changed, 215 insertions, 196 deletions
diff --git a/ydb/library/yql/ast/yql_expr.h b/ydb/library/yql/ast/yql_expr.h
index 6c77bbb30f9..9ce91c7f218 100644
--- a/ydb/library/yql/ast/yql_expr.h
+++ b/ydb/library/yql/ast/yql_expr.h
@@ -236,9 +236,9 @@ public:
         return kind == ETypeAnnotationKind::Optional || kind == ETypeAnnotationKind::Null || kind == ETypeAnnotationKind::Pg;
     }
 
-    bool IsAnyBlockOrScalar() const {
+    bool IsBlockOrScalar() const {
         auto kind = GetKind();
-        return kind == ETypeAnnotationKind::Block || kind == ETypeAnnotationKind::ChunkedBlock || kind == ETypeAnnotationKind::Scalar;
+        return kind == ETypeAnnotationKind::Block || kind == ETypeAnnotationKind::Scalar;
     }
 
     bool HasFixedSizeRepr() const {
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 125986d05f1..db7aedc5857 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -4700,7 +4700,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
     TVector<const TTypeAnnotationNode*> allInputTypes;
     for (const auto& i : multiInputType->GetItems()) {
-        if (i->IsAnyBlockOrScalar()) {
+        if (i->IsBlockOrScalar()) {
             return false;
         }
 
@@ -4800,24 +4800,19 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
             }
 
             TVector<const TTypeAnnotationNode*> argTypes;
+            bool hasBlockArg = false;
             for (ui32 i = isUdf ? 1 : 0; i < node->ChildrenSize(); ++i) {
                 auto child = node->Child(i);
                 if (child->IsComplete()) {
                     argTypes.push_back(ctx.MakeType<TScalarExprType>(child->GetTypeAnn()));
-                } else if (child->GetTypeAnn()->HasFixedSizeRepr()) {
-                    argTypes.push_back(ctx.MakeType<TBlockExprType>(child->GetTypeAnn()));
                 } else {
-                    argTypes.push_back(ctx.MakeType<TChunkedBlockExprType>(child->GetTypeAnn()));
+                    hasBlockArg = true;
+                    argTypes.push_back(ctx.MakeType<TBlockExprType>(child->GetTypeAnn()));
                 }
             }
 
-            const TTypeAnnotationNode* outType = node->GetTypeAnn();
-            if (outType->HasFixedSizeRepr()) {
-                outType = ctx.MakeType<TBlockExprType>(outType);
-            } else {
-                outType = ctx.MakeType<TChunkedBlockExprType>(outType);
-            }
-
+            YQL_ENSURE(!node->IsComplete() && hasBlockArg);
+            const TTypeAnnotationNode* outType = ctx.MakeType<TBlockExprType>(node->GetTypeAnn());
             if (isUdf) {
                 funcArgs.push_back(ctx.Builder(node->Head().Pos())
                     .Callable("Udf")
@@ -4989,16 +4984,15 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
     YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
         << ", extra columns: " << rewritePositions.size();
 
-    // TODO: since arrow kernels handle chunked blocks automatically, we may need an optimizer that drops such BlockExpandChunked
     auto ret = ctx.Builder(node->Pos())
         .Callable("WideFromBlocks")
-            .Callable(0, "WideMap")
-                .Callable(0, "BlockExpandChunked")
+            .Callable(0, "BlockExpandChunked")
+                .Callable(0, "WideMap")
                     .Callable(0, "WideToBlocks")
                         .Add(0, node->HeadPtr())
                     .Seal()
+                    .Add(1, blockLambda)
                 .Seal()
-                .Add(1, blockLambda)
             .Seal()
         .Seal()
         .Build();
@@ -5026,13 +5020,13 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
     }
 
     auto blockMapped = ctx.Builder(node->Pos())
-        .Callable("WideMap")
-            .Callable(0, "BlockExpandChunked")
+        .Callable("BlockExpandChunked")
+            .Callable(0, "WideMap")
                 .Callable(0, "WideToBlocks")
                     .Add(0, node->HeadPtr())
                 .Seal()
+                .Add(1, blockLambda)
             .Seal()
-            .Add(1, blockLambda)
         .Seal()
         .Build();
 
@@ -5044,9 +5038,7 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
     YQL_ENSURE(it->second == multiInputType->GetSize(), "Block filter column must follow original input columns");
     auto result = ctx.Builder(node->Pos())
         .Callable("BlockCompress")
-            .Callable(0, "BlockExpandChunked")
-                .Add(0, blockMapped)
-            .Seal()
+            .Add(0, blockMapped)
             .Atom(1, ToString(it->second), TNodeFlags::Default)
         .Seal()
         .Build();
@@ -5054,9 +5046,7 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
     if (node->ChildrenSize() == 3) {
         result = ctx.Builder(node->Pos())
             .Callable("WideTakeBlocks")
-                .Callable(0, "BlockExpandChunked")
-                    .Add(0, result)
-                .Seal()
+                .Add(0, result)
                 .Add(1, node->ChildPtr(2))
             .Seal()
             .Build();
@@ -5128,7 +5118,7 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
     }
 
     const auto& allTypes = flowItemType->Cast<TMultiExprType>()->GetItems();
-    if (AnyOf(allTypes, [](const TTypeAnnotationNode* type) { return type->IsAnyBlockOrScalar(); })) {
+    if (AnyOf(allTypes, [](const TTypeAnnotationNode* type) { return type->IsBlockOrScalar(); })) {
         return node;
     }
 
@@ -5144,10 +5134,8 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
     return ctx.Builder(node->Pos())
         .Callable("WideFromBlocks")
            .Callable(0, newName)
-                .Callable(0, "BlockExpandChunked")
-                    .Callable(0, "WideToBlocks")
-                        .Add(0, node->HeadPtr())
-                    .Seal()
+                .Callable(0, "WideToBlocks")
+                    .Add(0, node->HeadPtr())
                 .Seal()
                 .Add(1, node->ChildPtr(1))
             .Seal()
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index 9db337ec7f9..8800570e233 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -83,27 +83,18 @@ IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& inpu
     }
 
     TTypeAnnotationNode::TListType blockItemTypes;
-    const bool allowChunked = true;
-    if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr, allowChunked)) {
+    if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
         return IGraphTransformer::TStatus::Error;
     }
 
     auto flowItemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
-    bool hasChunked = false;
-    for (auto& flowItemType : flowItemTypes) {
-        if (flowItemType->GetKind() == ETypeAnnotationKind::ChunkedBlock) {
-            hasChunked = true;
-            flowItemType = ctx.Expr.MakeType<TBlockExprType>(flowItemType->Cast<TChunkedBlockExprType>()->GetItemType());
-        }
-    }
-
-    if (!hasChunked) {
+    bool allScalars = AllOf(flowItemTypes, [](const TTypeAnnotationNode* item) { return item->GetKind() == ETypeAnnotationKind::Scalar; });
+    if (allScalars) {
         output = input->HeadPtr();
         return IGraphTransformer::TStatus::Repeat;
     }
 
-    auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(flowItemTypes);
-    input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+    input->SetTypeAnn(input->Head().GetTypeAnn());
     return IGraphTransformer::TStatus::Ok;
 }
 
@@ -215,13 +206,12 @@ IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprN
     }
 
     auto returnType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
-    const bool allowChunked = true;
-    if (!EnsureBlockOrScalarType(input->Child(1)->Pos(), *returnType, ctx.Expr, allowChunked)) {
+    if (!EnsureBlockOrScalarType(input->Child(1)->Pos(), *returnType, ctx.Expr)) {
         return IGraphTransformer::TStatus::Error;
     }
 
     for (ui32 i = 2; i < input->ChildrenSize(); ++i) {
-        if (!EnsureBlockOrScalarType(*input->Child(i), ctx.Expr, allowChunked)) {
+        if (!EnsureBlockOrScalarType(*input->Child(i), ctx.Expr)) {
            return IGraphTransformer::TStatus::Error;
         }
     }
@@ -265,7 +255,7 @@ IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TEx
     if (isScalar) {
         input->SetTypeAnn(ctx.Expr.MakeType<TScalarExprType>(outputType));
     } else {
-        input->SetTypeAnn(MakeBlockType(*outputType, ctx.Expr));
+        input->SetTypeAnn(ctx.Expr.MakeType<TBlockExprType>(outputType));
     }
 
     return IGraphTransformer::TStatus::Ok;
@@ -429,7 +419,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu
     }
 
     for (auto& t : retMultiType) {
-        t = MakeBlockType(*t, ctx.Expr);
+        t = ctx.Expr.MakeType<TBlockExprType>(t);
     }
 
     retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
@@ -446,7 +436,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
     }
 
     TTypeAnnotationNode::TListType blockItemTypes;
-    if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr, false, !many)) {
+    if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr, !many)) {
         return IGraphTransformer::TStatus::Error;
     }
 
@@ -460,7 +450,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
     }
 
     for (auto& t : retMultiType) {
-        t = MakeBlockType(*t, ctx.Expr);
+        t = ctx.Expr.MakeType<TBlockExprType>(t);
     }
 
     if (many) {
diff --git a/ydb/library/yql/core/type_ann/type_ann_wide.cpp b/ydb/library/yql/core/type_ann/type_ann_wide.cpp
index ef2429c79e8..4ea0c30d011 100644
--- a/ydb/library/yql/core/type_ann/type_ann_wide.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_wide.cpp
@@ -638,7 +638,7 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
     const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
     TTypeAnnotationNode::TListType retMultiType;
     for (const auto& type : multiType->GetItems()) {
-        if (type->IsAnyBlockOrScalar()) {
+        if (type->IsBlockOrScalar()) {
             ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar"));
             return IGraphTransformer::TStatus::Error;
         }
@@ -647,7 +647,7 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
             return IGraphTransformer::TStatus::Error;
         }
 
-        retMultiType.push_back(MakeBlockType(*type, ctx.Expr));
+        retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type));
     }
 
     retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
@@ -663,8 +663,7 @@ IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, T
     }
 
     TTypeAnnotationNode::TListType retMultiType;
-    const bool allowChunked = true;
-    if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr, allowChunked)) {
+    if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) {
         return IGraphTransformer::TStatus::Error;
     }
 
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 46b3a86cf6a..3ed00c85eba 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -614,7 +614,6 @@ TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& strea
 
     auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots));
     auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda });
     auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow });
-    blocks = Ctx.NewCallable(Node->Pos(), "BlockExpandChunked", { blocks });
     return blocks;
 }
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 0436652dfc8..c3a4e1e2a7a 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -2697,7 +2697,7 @@ bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& typ
     return true;
 }
 
-bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowChunked, bool allowScalar) {
+bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) {
     if (!EnsureWideFlowType(node, ctx)) {
         return false;
     }
@@ -2711,7 +2711,7 @@ bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListTy
     bool isScalar;
     for (ui32 i = 0; i < items.size(); ++i) {
         const auto& type = items[i];
-        if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx, allowChunked)) {
+        if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx)) {
             return false;
         }
 
@@ -5333,21 +5333,18 @@ bool HasContextFuncs(const TExprNode& input) {
     return needCtx;
 }
 
-bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx, bool allowChunked) {
+bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx) {
     if (HasError(node.GetTypeAnn(), ctx) || !node.GetTypeAnn()) {
         YQL_ENSURE(node.Type() == TExprNode::Lambda);
         ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected block or scalar type, but got lambda"));
         return false;
     }
 
-    return EnsureBlockOrScalarType(node.Pos(), *node.GetTypeAnn(), ctx, allowChunked);
+    return EnsureBlockOrScalarType(node.Pos(), *node.GetTypeAnn(), ctx);
 }
 
-bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx, bool allowChunked) {
-    bool valid = allowChunked ? type.IsAnyBlockOrScalar() :
-        (type.GetKind() == ETypeAnnotationKind::Block ||
-         type.GetKind() == ETypeAnnotationKind::Scalar);
-    if (HasError(&type, ctx) || !valid) {
+bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx) {
+    if (HasError(&type, ctx) || !type.IsBlockOrScalar()) {
         ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected block or scalar type, but got: " << type));
         return false;
     }
@@ -5356,25 +5353,14 @@ bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode
 }
 
 const TTypeAnnotationNode* GetBlockItemType(const TTypeAnnotationNode& type, bool& isScalar) {
-    YQL_ENSURE(type.IsAnyBlockOrScalar());
+    YQL_ENSURE(type.IsBlockOrScalar());
     const auto kind = type.GetKind();
     if (kind == ETypeAnnotationKind::Block) {
         isScalar = false;
         return type.Cast<TBlockExprType>()->GetItemType();
-    } else if (kind == ETypeAnnotationKind::ChunkedBlock) {
-        isScalar = false;
-        return type.Cast<TChunkedBlockExprType>()->GetItemType();
-    } else {
-        isScalar = true;
-        return type.Cast<TScalarExprType>()->GetItemType();
-    }
-}
-
-const TTypeAnnotationNode* MakeBlockType(const TTypeAnnotationNode& blockItemType, TExprContext& ctx, bool withChunked) {
-    if (withChunked && !blockItemType.HasFixedSizeRepr()) {
-        return ctx.MakeType<TChunkedBlockExprType>(&blockItemType);
     }
-    return ctx.MakeType<TBlockExprType>(&blockItemType);
+    isScalar = true;
+    return type.Cast<TScalarExprType>()->GetItemType();
 }
 
 const TTypeAnnotationNode* AggApplySerializedStateType(const TExprNode::TPtr& input, TExprContext& ctx) {
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index 7f55dd94ae9..0d01545625d 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -118,8 +118,7 @@ bool EnsureFlowType(const TExprNode& node, TExprContext& ctx);
 bool EnsureFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
 bool EnsureWideFlowType(const TExprNode& node, TExprContext& ctx);
 bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
-bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx,
-    bool allowChunked = false, bool allowScalar = true);
+bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
 bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx);
 bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
 bool EnsureType(const TExprNode& node, TExprContext& ctx);
@@ -298,10 +297,9 @@ bool IsCallableTypeHasStreams(const TCallableExprType* callableType);
 bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx);
 bool HasContextFuncs(const TExprNode& input);
 
-bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx, bool allowChunked = false);
-bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx, bool allowChunked = false);
+bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx);
+bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
 const TTypeAnnotationNode* GetBlockItemType(const TTypeAnnotationNode& type, bool& isScalar);
-const TTypeAnnotationNode* MakeBlockType(const TTypeAnnotationNode& blockItemType, TExprContext& ctx, bool withChunked = true);
 const TTypeAnnotationNode* AggApplySerializedStateType(const TExprNode::TPtr& input, TExprContext& ctx);
 
 bool GetSumResultType(const TPositionHandle& pos, const TTypeAnnotationNode& inputType, const TTypeAnnotationNode*& retType, TExprContext& ctx);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index 9d5e76dcb98..0e877ee21c0 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -1,6 +1,7 @@
 #include "mkql_block_agg.h"
 #include "mkql_block_agg_factory.h"
 #include "mkql_block_builder.h"
+#include "mkql_block_impl.h"
 #include "mkql_rh_hash.h"
 
 #include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
@@ -841,10 +842,10 @@ TStringBuf GetKeyView(const TSSOKey& key) {
 }
 
 template <typename TKey, typename TAggregator, typename TFixedAggState, bool UseSet, bool UseFilter, bool Finalize, bool Many, typename TDerived>
-class THashedWrapperBase : public TStatefulWideFlowComputationNode<TDerived> {
+class THashedWrapperBase : public TStatefulWideFlowBlockComputationNode<TDerived> {
 public:
     using TSelf = THashedWrapperBase<TKey, TAggregator, TFixedAggState, UseSet, UseFilter, Finalize, Many, TDerived>;
-    using TBase = TStatefulWideFlowComputationNode<TDerived>;
+    using TBase = TStatefulWideFlowBlockComputationNode<TDerived>;
 
     static constexpr bool UseArena = !InlineAggState && std::is_same<TFixedAggState, TStateArena>::value;
 
@@ -857,7 +858,7 @@ public:
         TVector<TAggParams<TAggregator>>&& aggsParams,
         ui32 streamIndex,
         TVector<TVector<ui32>>&& streams)
-        : TBase(mutables, flow, EValueRepresentation::Any)
+        : TBase(mutables, flow, keys.size() + aggsParams.size() + 1)
         , Flow_(flow)
         , FilterColumn_(filterColumn)
         , Width_(width)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
index f142718bcb6..68801afd6d5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
@@ -1,5 +1,6 @@
 #include "mkql_block_compress.h"
 #include "mkql_block_builder.h"
+#include "mkql_block_impl.h"
 
 #include <ydb/library/yql/minikql/arrow/arrow_util.h>
 #include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
@@ -13,10 +14,10 @@ namespace NMiniKQL {
 
 namespace {
 
-class TCompressWithScalarBitmap : public TStatefulWideFlowComputationNode<TCompressWithScalarBitmap> {
+class TCompressWithScalarBitmap : public TStatefulWideFlowBlockComputationNode<TCompressWithScalarBitmap> {
 public:
     TCompressWithScalarBitmap(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width)
-        : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+        : TStatefulWideFlowBlockComputationNode(mutables, flow, width - 1)
         , Flow_(flow)
         , BitmapIndex_(bitmapIndex)
         , Width_(width)
@@ -89,10 +90,10 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) {
     return GetSparseBitmapPopCount(src, len);
 }
 
-class TCompressScalars : public TStatefulWideFlowComputationNode<TCompressScalars> {
+class TCompressScalars : public TStatefulWideFlowBlockComputationNode<TCompressScalars> {
 public:
     TCompressScalars(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width)
-        : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+        : TStatefulWideFlowBlockComputationNode(mutables, flow, width - 1)
         , Flow_(flow)
         , BitmapIndex_(bitmapIndex)
         , Width_(width)
@@ -156,10 +157,10 @@ private:
     const ui32 Width_;
 };
 
-class TCompressBlocks : public TStatefulWideFlowComputationNode<TCompressBlocks> {
+class TCompressBlocks : public TStatefulWideFlowBlockComputationNode<TCompressBlocks> {
 public:
     TCompressBlocks(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, TVector<TBlockType*>&& types)
-        : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+        : TStatefulWideFlowBlockComputationNode(mutables, flow, types.size() - 1)
         , Flow_(flow)
         , BitmapIndex_(bitmapIndex)
         , Types_(std::move(types))
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h
new file mode 100644
index 00000000000..58885a2b1e1
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h
@@ -0,0 +1,145 @@
+#pragma once
+
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+
+#include <ydb/library/yql/minikql/arrow/arrow_util.h>
+
+#include <arrow/array.h>
+
+namespace NKikimr::NMiniKQL {
+
+template <typename TDerived>
+class TStatefulWideFlowBlockComputationNode: public TWideFlowBaseComputationNode<TDerived>
+{
+protected:
+    TStatefulWideFlowBlockComputationNode(TComputationMutables& mutables, const IComputationNode* source, ui32 width)
+        : TWideFlowBaseComputationNode<TDerived>(source)
+        , StateIndex(mutables.CurValueIndex++)
+        , StateKind(EValueRepresentation::Any)
+        , Width(width)
+    {
+        MKQL_ENSURE(width > 0, "Wide flow blocks should have at least one column (block length)");
+    }
+
+private:
+    struct TState : public TComputationValue<TState> {
+        using TBase = TComputationValue<TState>;
+
+        TVector<NUdf::TUnboxedValue> Values;
+        TVector<NUdf::TUnboxedValue*> ValuePointers;
+        NUdf::TUnboxedValue ChildState;
+        TVector<TDeque<std::shared_ptr<arrow::ArrayData>>> Arrays;
+        ui64 Count = 0;
+
+        TState(TMemoryUsageInfo* memInfo, size_t width, NUdf::TUnboxedValue*const* values, TComputationContext& ctx)
+            : TBase(memInfo)
+            , Values(width)
+            , ValuePointers(width)
+            , Arrays(width - 1)
+        {
+            for (size_t i = 0; i < width - 1; ++i) {
+                ValuePointers[i] = values[i] ? &Values[i] : nullptr;
+            }
+            ValuePointers.back() = &Values.back();
+        }
+    };
+
+    TState& GetState(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+        auto& state = ctx.MutableValues[GetIndex()];
+        if (!state.HasValue()) {
+            state = ctx.HolderFactory.Create<TState>(Width, output, ctx);
+        }
+        return *static_cast<TState*>(state.AsBoxed().Get());
+    }
+
+    EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const final {
+        TState& s = GetState(ctx, output);
+        const TDerived& child = static_cast<const TDerived&>(*this);
+        while (s.Count == 0) {
+            auto result = static_cast<const TDerived*>(this)->DoCalculate(s.ChildState, ctx, s.ValuePointers.data());
+            if (result != EFetchResult::One) {
+                return result;
+            }
+
+            auto& counterDatum = TArrowBlock::From(s.Values.back()).GetDatum();
+            MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
+            s.Count = counterDatum.template scalar_as<arrow::UInt64Scalar>().value;
+            if (!s.Count) {
+                continue;
+            }
+            for (size_t i = 0; i < Width - 1; ++i) {
+                s.Arrays[i].clear();
+                if (!output[i]) {
+                    continue;
+                }
+                auto& datum = TArrowBlock::From(s.Values[i]).GetDatum();
+                if (datum.is_scalar()) {
+                    continue;
+                }
+                MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
+                if (datum.is_array()) {
+                    s.Arrays[i].push_back(datum.array());
+                } else {
+                    for (auto& chunk : datum.chunks()) {
+                        s.Arrays[i].push_back(chunk->data());
+                    }
+                }
+            }
+        }
+        ui64 sliceSize = s.Count;
+        for (size_t i = 0; i < s.Arrays.size(); ++i) {
+            const auto& arr = s.Arrays[i];
+            if (arr.empty()) {
+                continue;
+            }
+
+            MKQL_ENSURE(arr.front()->length <= s.Count, "Unexpected array length at column #" << i);
+            sliceSize = std::min<ui64>(sliceSize, arr.front()->length);
+        }
+
+        for (size_t i = 0; i < s.Arrays.size(); ++i) {
+            if (!output[i]) {
+                continue;
+            }
+            if (s.Arrays[i].empty()) {
+                *(output[i]) = s.Values[i];
+                continue;
+            }
+
+            auto& array = s.Arrays[i].front();
+            if (array->length == sliceSize) {
+                *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(array));
+                s.Arrays[i].pop_front();
+            } else {
+                *(output[i]) = ctx.HolderFactory.CreateArrowBlock(Chop(array, sliceSize));
+            }
+        }
+
+        if (output[Width - 1]) {
+            *(output[Width - 1]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
+        }
+        s.Count -= sliceSize;
+        return EFetchResult::One;
+    }
+
+    ui32 GetIndex() const final {
+        return StateIndex;
+    }
+
+    void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
+        if (this == owner)
+            return;
+
+        const auto ins = dependencies.emplace(StateIndex, StateKind);
+        if (ins.second && this->Dependence) {
+            this->Dependence->CollectDependentIndexes(owner, dependencies);
+        }
+    }
+
+    const ui32 StateIndex;
+    const EValueRepresentation StateKind;
+    const ui32 Width;
+};
+
+} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
index e0266ea2b18..b60617c24af 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
@@ -1,4 +1,5 @@
 #include "mkql_block_skiptake.h"
+#include "mkql_block_impl.h"
 
 #include <ydb/library/yql/minikql/arrow/arrow_defs.h>
 #include <ydb/library/yql/minikql/arrow/arrow_util.h>
@@ -12,12 +13,12 @@ namespace NMiniKQL {
 
 namespace {
 
-class TWideSkipBlocksWrapper: public TStatefulWideFlowComputationNode<TWideSkipBlocksWrapper> {
-    typedef TStatefulWideFlowComputationNode<TWideSkipBlocksWrapper> TBaseComputation;
+class TWideSkipBlocksWrapper: public TStatefulWideFlowBlockComputationNode<TWideSkipBlocksWrapper> {
+    typedef TStatefulWideFlowBlockComputationNode<TWideSkipBlocksWrapper> TBaseComputation;
 public:
     TWideSkipBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, size_t width)
-        : TBaseComputation(mutables, flow, EValueRepresentation::Any)
+        : TBaseComputation(mutables, flow, width)
         , Flow(flow)
         , Count(count)
         , Width(width)
@@ -77,12 +78,12 @@ private:
     const size_t Width;
 };
 
-class TWideTakeBlocksWrapper: public TStatefulWideFlowComputationNode<TWideTakeBlocksWrapper> {
-    typedef TStatefulWideFlowComputationNode<TWideTakeBlocksWrapper> TBaseComputation;
+class TWideTakeBlocksWrapper: public TStatefulWideFlowBlockComputationNode<TWideTakeBlocksWrapper> {
+    typedef TStatefulWideFlowBlockComputationNode<TWideTakeBlocksWrapper> TBaseComputation;
 public:
     TWideTakeBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, size_t width)
-        : TBaseComputation(mutables, flow, EValueRepresentation::Any)
+        : TBaseComputation(mutables, flow, width)
         , Flow(flow)
         , Count(count)
         , Width(width)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 55dc51100ff..488dd2af044 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -1,6 +1,7 @@
 #include "mkql_blocks.h"
 #include "mkql_block_builder.h"
 #include "mkql_block_reader.h"
+#include "mkql_block_impl.h"
 
 #include <ydb/library/yql/minikql/arrow/arrow_defs.h>
 #include <ydb/library/yql/minikql/arrow/arrow_util.h>
@@ -55,12 +56,12 @@ private:
     TType* ItemType_;
 };
 
-class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> {
+class TWideToBlocksWrapper : public TStatefulWideFlowBlockComputationNode<TWideToBlocksWrapper> {
 public:
     TWideToBlocksWrapper(TComputationMutables& mutables,
         IComputationWideFlowNode* flow,
         TVector<TType*>&& types)
-        : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+        : TStatefulWideFlowBlockComputationNode(mutables, flow, types.size() + 1)
         , Flow_(flow)
         , Types_(std::move(types))
         , Width_(Types_.size())
@@ -410,116 +411,27 @@ private:
     TType* Type_;
 };
 
-class TBlockExpandChunkedWrapper : public TStatefulWideFlowComputationNode<TBlockExpandChunkedWrapper> {
+class TBlockExpandChunkedWrapper : public TStatefulWideFlowBlockComputationNode<TBlockExpandChunkedWrapper> {
 public:
     TBlockExpandChunkedWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 width)
-        : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+        : TStatefulWideFlowBlockComputationNode(mutables, flow, width)
         , Flow_(flow)
-        , Width_(width)
     {
     }
 
    EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx,
         NUdf::TUnboxedValue*const* output) const
    {
-        auto& s = GetState(state, ctx);
-        while (s.Count_ == 0) {
-            auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
-            if (result != EFetchResult::One) {
-                return result;
-            }
-
-            s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
-            for (size_t i = 0; i < Width_; ++i) {
-                s.Arrays_[i].clear();
-                if (!output[i]) {
-                    continue;
-                }
-                auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum();
-                if (datum.is_scalar()) {
-                    continue;
-                }
-                Y_VERIFY(datum.is_arraylike());
-                if (datum.is_array()) {
-                    s.Arrays_[i].push_back(datum.array());
-                } else {
-                    for (auto& chunk : datum.chunks()) {
-                        s.Arrays_[i].push_back(chunk->data());
-                    }
-                }
-            }
-        }
-
-        ui64 sliceSize = s.Count_;
-        for (size_t i = 0; i < Width_; ++i) {
-            if (s.Arrays_[i].empty()) {
-                continue;
-            }
-
-            Y_VERIFY(s.Arrays_[i].front()->length <= s.Count_);
-            sliceSize = std::min<ui64>(sliceSize, s.Arrays_[i].front()->length);
-        }
-
-        for (size_t i = 0; i < Width_; ++i) {
-            if (!output[i]) {
-                continue;
-            }
-
-            if (s.Arrays_[i].empty()) {
-                *(output[i]) = s.Values_[i];
-                continue;
-            }
-
-            auto& array = s.Arrays_[i].front();
-            Y_VERIFY(array->length >= sliceSize);
-            if (array->length == sliceSize) {
-                *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(array));
-                s.Arrays_[i].pop_front();
-            } else {
-                *(output[i]) = ctx.HolderFactory.CreateArrowBlock(Chop(array, sliceSize));
-            }
-        }
-
-        MKQL_ENSURE(output[Width_], "Block size is unused");
-        *(output[Width_]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
-        s.Count_ -= sliceSize;
-        return EFetchResult::One;
+        Y_UNUSED(state);
+        return Flow_->FetchValues(ctx, output);
     }
 
 private:
-    struct TState : public TComputationValue<TState> {
-        TVector<NUdf::TUnboxedValue> Values_;
-        TVector<NUdf::TUnboxedValue*> ValuePointers_;
-        TVector<TDeque<std::shared_ptr<arrow::ArrayData>>> Arrays_;
-        ui64 Count_ = 0;
-
-        TState(TMemoryUsageInfo* memInfo, size_t width, TComputationContext& ctx)
-            : TComputationValue(memInfo)
-            , Values_(width + 1)
-            , ValuePointers_(width + 1)
-            , Arrays_(width)
-        {
-            for (size_t i = 0; i < width + 1; ++i) {
-                ValuePointers_[i] = &Values_[i];
-            }
-        }
-    };
-
-private:
     void RegisterDependencies() const final {
         FlowDependsOn(Flow_);
     }
 
-    TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
-        if (!state.HasValue()) {
-            state = ctx.HolderFactory.Create<TState>(Width_, ctx);
-        }
-        return *static_cast<TState*>(state.AsBoxed().Get());
-    }
-
-private:
     IComputationWideFlowNode* Flow_;
-    const size_t Width_;
 };
 
@@ -584,12 +496,11 @@ IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputation
     const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
     const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
-    MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column");
 
     auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
     MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
 
-    return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount() - 1);
+    return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount());
 }
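Note: every wrapper ported in this commit follows the same pattern when deriving from the new base class. A minimal sketch follows (the class name TMyPassThroughWrapper is hypothetical; it mirrors TBlockExpandChunkedWrapper above): the constructor forwards the full flow width, counting the trailing scalar UInt64 block-length column; DoCalculate() fills the width output slots and may freely emit chunked datums; the inherited FetchValues() performs the slicing.

// Hypothetical node built on the new base class (illustrative only, not part of this commit).
class TMyPassThroughWrapper : public TStatefulWideFlowBlockComputationNode<TMyPassThroughWrapper> {
public:
    TMyPassThroughWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 width)
        : TStatefulWideFlowBlockComputationNode(mutables, flow, width) // width includes the block-length column
        , Flow_(flow)
    {
    }

    // Invoked by the base class; chunked arrow::Datum values produced here are
    // split into uniform plain-array blocks by the inherited FetchValues().
    EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
        Y_UNUSED(state); // per-node state would live here if this wrapper needed any
        return Flow_->FetchValues(ctx, output);
    }

private:
    void RegisterDependencies() const final {
        FlowDependsOn(Flow_);
    }

    IComputationWideFlowNode* Flow_;
};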