summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <[email protected]>2023-01-03 12:39:43 +0300
committeraneporada <[email protected]>2023-01-03 12:39:43 +0300
commit8ee4eaa91898ce3adcdf302ba33311fc9e627282 (patch)
tree554f3337b05d9d555c169ef87164fd627ded8ebd
parent3b9d6412ef442facb9ef6eb1e7aa279d4279145a (diff)
Rework chunked blocks handling - do not use TChunkedBlockExpr
-rw-r--r--ydb/library/yql/ast/yql_expr.h4
-rw-r--r--ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp46
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.cpp30
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_wide.cpp7
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp1
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.cpp32
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.h8
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp7
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp13
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h145
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp13
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp105
12 files changed, 215 insertions, 196 deletions
diff --git a/ydb/library/yql/ast/yql_expr.h b/ydb/library/yql/ast/yql_expr.h
index 6c77bbb30f9..9ce91c7f218 100644
--- a/ydb/library/yql/ast/yql_expr.h
+++ b/ydb/library/yql/ast/yql_expr.h
@@ -236,9 +236,9 @@ public:
return kind == ETypeAnnotationKind::Optional || kind == ETypeAnnotationKind::Null || kind == ETypeAnnotationKind::Pg;
}
- bool IsAnyBlockOrScalar() const {
+ bool IsBlockOrScalar() const {
auto kind = GetKind();
- return kind == ETypeAnnotationKind::Block || kind == ETypeAnnotationKind::ChunkedBlock || kind == ETypeAnnotationKind::Scalar;
+ return kind == ETypeAnnotationKind::Block || kind == ETypeAnnotationKind::Scalar;
}
bool HasFixedSizeRepr() const {
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 125986d05f1..db7aedc5857 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -4700,7 +4700,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
TVector<const TTypeAnnotationNode*> allInputTypes;
for (const auto& i : multiInputType->GetItems()) {
- if (i->IsAnyBlockOrScalar()) {
+ if (i->IsBlockOrScalar()) {
return false;
}
@@ -4800,24 +4800,19 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
}
TVector<const TTypeAnnotationNode*> argTypes;
+ bool hasBlockArg = false;
for (ui32 i = isUdf ? 1 : 0; i < node->ChildrenSize(); ++i) {
auto child = node->Child(i);
if (child->IsComplete()) {
argTypes.push_back(ctx.MakeType<TScalarExprType>(child->GetTypeAnn()));
- } else if (child->GetTypeAnn()->HasFixedSizeRepr()) {
- argTypes.push_back(ctx.MakeType<TBlockExprType>(child->GetTypeAnn()));
} else {
- argTypes.push_back(ctx.MakeType<TChunkedBlockExprType>(child->GetTypeAnn()));
+ hasBlockArg = true;
+ argTypes.push_back(ctx.MakeType<TBlockExprType>(child->GetTypeAnn()));
}
}
- const TTypeAnnotationNode* outType = node->GetTypeAnn();
- if (outType->HasFixedSizeRepr()) {
- outType = ctx.MakeType<TBlockExprType>(outType);
- } else {
- outType = ctx.MakeType<TChunkedBlockExprType>(outType);
- }
-
+ YQL_ENSURE(!node->IsComplete() && hasBlockArg);
+ const TTypeAnnotationNode* outType = ctx.MakeType<TBlockExprType>(node->GetTypeAnn());
if (isUdf) {
funcArgs.push_back(ctx.Builder(node->Head().Pos())
.Callable("Udf")
@@ -4989,16 +4984,15 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
<< ", extra columns: " << rewritePositions.size();
- // TODO: since arrow kernels handle chunked blocks automatically, we may need an optimizer that drops such BlockExpandChunked
auto ret = ctx.Builder(node->Pos())
.Callable("WideFromBlocks")
- .Callable(0, "WideMap")
- .Callable(0, "BlockExpandChunked")
+ .Callable(0, "BlockExpandChunked")
+ .Callable(0, "WideMap")
.Callable(0, "WideToBlocks")
.Add(0, node->HeadPtr())
.Seal()
+ .Add(1, blockLambda)
.Seal()
- .Add(1, blockLambda)
.Seal()
.Seal()
.Build();
@@ -5026,13 +5020,13 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
}
auto blockMapped = ctx.Builder(node->Pos())
- .Callable("WideMap")
- .Callable(0, "BlockExpandChunked")
+ .Callable("BlockExpandChunked")
+ .Callable(0, "WideMap")
.Callable(0, "WideToBlocks")
.Add(0, node->HeadPtr())
.Seal()
+ .Add(1, blockLambda)
.Seal()
- .Add(1, blockLambda)
.Seal()
.Build();
@@ -5044,9 +5038,7 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
YQL_ENSURE(it->second == multiInputType->GetSize(), "Block filter column must follow original input columns");
auto result = ctx.Builder(node->Pos())
.Callable("BlockCompress")
- .Callable(0, "BlockExpandChunked")
- .Add(0, blockMapped)
- .Seal()
+ .Add(0, blockMapped)
.Atom(1, ToString(it->second), TNodeFlags::Default)
.Seal()
.Build();
@@ -5054,9 +5046,7 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
if (node->ChildrenSize() == 3) {
result = ctx.Builder(node->Pos())
.Callable("WideTakeBlocks")
- .Callable(0, "BlockExpandChunked")
- .Add(0, result)
- .Seal()
+ .Add(0, result)
.Add(1, node->ChildPtr(2))
.Seal()
.Build();
@@ -5128,7 +5118,7 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
}
const auto& allTypes = flowItemType->Cast<TMultiExprType>()->GetItems();
- if (AnyOf(allTypes, [](const TTypeAnnotationNode* type) { return type->IsAnyBlockOrScalar(); })) {
+ if (AnyOf(allTypes, [](const TTypeAnnotationNode* type) { return type->IsBlockOrScalar(); })) {
return node;
}
@@ -5144,10 +5134,8 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
return ctx.Builder(node->Pos())
.Callable("WideFromBlocks")
.Callable(0, newName)
- .Callable(0, "BlockExpandChunked")
- .Callable(0, "WideToBlocks")
- .Add(0, node->HeadPtr())
- .Seal()
+ .Callable(0, "WideToBlocks")
+ .Add(0, node->HeadPtr())
.Seal()
.Add(1, node->ChildPtr(1))
.Seal()
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index 9db337ec7f9..8800570e233 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -83,27 +83,18 @@ IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& inpu
}
TTypeAnnotationNode::TListType blockItemTypes;
- const bool allowChunked = true;
- if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr, allowChunked)) {
+ if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
auto flowItemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
- bool hasChunked = false;
- for (auto& flowItemType : flowItemTypes) {
- if (flowItemType->GetKind() == ETypeAnnotationKind::ChunkedBlock) {
- hasChunked = true;
- flowItemType = ctx.Expr.MakeType<TBlockExprType>(flowItemType->Cast<TChunkedBlockExprType>()->GetItemType());
- }
- }
-
- if (!hasChunked) {
+ bool allScalars = AllOf(flowItemTypes, [](const TTypeAnnotationNode* item) { return item->GetKind() == ETypeAnnotationKind::Scalar; });
+ if (allScalars) {
output = input->HeadPtr();
return IGraphTransformer::TStatus::Repeat;
}
- auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(flowItemTypes);
- input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
+ input->SetTypeAnn(input->Head().GetTypeAnn());
return IGraphTransformer::TStatus::Ok;
}
@@ -215,13 +206,12 @@ IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprN
}
auto returnType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- const bool allowChunked = true;
- if (!EnsureBlockOrScalarType(input->Child(1)->Pos(), *returnType, ctx.Expr, allowChunked)) {
+ if (!EnsureBlockOrScalarType(input->Child(1)->Pos(), *returnType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
for (ui32 i = 2; i < input->ChildrenSize(); ++i) {
- if (!EnsureBlockOrScalarType(*input->Child(i), ctx.Expr, allowChunked)) {
+ if (!EnsureBlockOrScalarType(*input->Child(i), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
}
@@ -265,7 +255,7 @@ IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TEx
if (isScalar) {
input->SetTypeAnn(ctx.Expr.MakeType<TScalarExprType>(outputType));
} else {
- input->SetTypeAnn(MakeBlockType(*outputType, ctx.Expr));
+ input->SetTypeAnn(ctx.Expr.MakeType<TBlockExprType>(outputType));
}
return IGraphTransformer::TStatus::Ok;
@@ -429,7 +419,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu
}
for (auto& t : retMultiType) {
- t = MakeBlockType(*t, ctx.Expr);
+ t = ctx.Expr.MakeType<TBlockExprType>(t);
}
retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
@@ -446,7 +436,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
}
TTypeAnnotationNode::TListType blockItemTypes;
- if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr, false, !many)) {
+ if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr, !many)) {
return IGraphTransformer::TStatus::Error;
}
@@ -460,7 +450,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
}
for (auto& t : retMultiType) {
- t = MakeBlockType(*t, ctx.Expr);
+ t = ctx.Expr.MakeType<TBlockExprType>(t);
}
if (many) {
diff --git a/ydb/library/yql/core/type_ann/type_ann_wide.cpp b/ydb/library/yql/core/type_ann/type_ann_wide.cpp
index ef2429c79e8..4ea0c30d011 100644
--- a/ydb/library/yql/core/type_ann/type_ann_wide.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_wide.cpp
@@ -638,7 +638,7 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
TTypeAnnotationNode::TListType retMultiType;
for (const auto& type : multiType->GetItems()) {
- if (type->IsAnyBlockOrScalar()) {
+ if (type->IsBlockOrScalar()) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar"));
return IGraphTransformer::TStatus::Error;
}
@@ -647,7 +647,7 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
return IGraphTransformer::TStatus::Error;
}
- retMultiType.push_back(MakeBlockType(*type, ctx.Expr));
+ retMultiType.push_back(ctx.Expr.MakeType<TBlockExprType>(type));
}
retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
@@ -663,8 +663,7 @@ IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, T
}
TTypeAnnotationNode::TListType retMultiType;
- const bool allowChunked = true;
- if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr, allowChunked)) {
+ if (!EnsureWideFlowBlockType(input->Head(), retMultiType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 46b3a86cf6a..3ed00c85eba 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -614,7 +614,6 @@ TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& strea
auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots));
auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda });
auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow });
- blocks = Ctx.NewCallable(Node->Pos(), "BlockExpandChunked", { blocks });
return blocks;
}
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 0436652dfc8..c3a4e1e2a7a 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -2697,7 +2697,7 @@ bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& typ
return true;
}
-bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowChunked, bool allowScalar) {
+bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) {
if (!EnsureWideFlowType(node, ctx)) {
return false;
}
@@ -2711,7 +2711,7 @@ bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListTy
bool isScalar;
for (ui32 i = 0; i < items.size(); ++i) {
const auto& type = items[i];
- if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx, allowChunked)) {
+ if (!EnsureBlockOrScalarType(node.Pos(), *type, ctx)) {
return false;
}
@@ -5333,21 +5333,18 @@ bool HasContextFuncs(const TExprNode& input) {
return needCtx;
}
-bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx, bool allowChunked) {
+bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx) {
if (HasError(node.GetTypeAnn(), ctx) || !node.GetTypeAnn()) {
YQL_ENSURE(node.Type() == TExprNode::Lambda);
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Expected block or scalar type, but got lambda"));
return false;
}
- return EnsureBlockOrScalarType(node.Pos(), *node.GetTypeAnn(), ctx, allowChunked);
+ return EnsureBlockOrScalarType(node.Pos(), *node.GetTypeAnn(), ctx);
}
-bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx, bool allowChunked) {
- bool valid = allowChunked ? type.IsAnyBlockOrScalar() :
- (type.GetKind() == ETypeAnnotationKind::Block ||
- type.GetKind() == ETypeAnnotationKind::Scalar);
- if (HasError(&type, ctx) || !valid) {
+bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx) {
+ if (HasError(&type, ctx) || !type.IsBlockOrScalar()) {
ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected block or scalar type, but got: " << type));
return false;
}
@@ -5356,25 +5353,14 @@ bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode
}
const TTypeAnnotationNode* GetBlockItemType(const TTypeAnnotationNode& type, bool& isScalar) {
- YQL_ENSURE(type.IsAnyBlockOrScalar());
+ YQL_ENSURE(type.IsBlockOrScalar());
const auto kind = type.GetKind();
if (kind == ETypeAnnotationKind::Block) {
isScalar = false;
return type.Cast<TBlockExprType>()->GetItemType();
- } else if (kind == ETypeAnnotationKind::ChunkedBlock) {
- isScalar = false;
- return type.Cast<TChunkedBlockExprType>()->GetItemType();
- } else {
- isScalar = true;
- return type.Cast<TScalarExprType>()->GetItemType();
- }
-}
-
-const TTypeAnnotationNode* MakeBlockType(const TTypeAnnotationNode& blockItemType, TExprContext& ctx, bool withChunked) {
- if (withChunked && !blockItemType.HasFixedSizeRepr()) {
- return ctx.MakeType<TChunkedBlockExprType>(&blockItemType);
}
- return ctx.MakeType<TBlockExprType>(&blockItemType);
+ isScalar = true;
+ return type.Cast<TScalarExprType>()->GetItemType();
}
const TTypeAnnotationNode* AggApplySerializedStateType(const TExprNode::TPtr& input, TExprContext& ctx) {
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index 7f55dd94ae9..0d01545625d 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -118,8 +118,7 @@ bool EnsureFlowType(const TExprNode& node, TExprContext& ctx);
bool EnsureFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureWideFlowType(const TExprNode& node, TExprContext& ctx);
bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
-bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx,
- bool allowChunked = false, bool allowScalar = true);
+bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx);
bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureType(const TExprNode& node, TExprContext& ctx);
@@ -298,10 +297,9 @@ bool IsCallableTypeHasStreams(const TCallableExprType* callableType);
bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx);
bool HasContextFuncs(const TExprNode& input);
-bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx, bool allowChunked = false);
-bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx, bool allowChunked = false);
+bool EnsureBlockOrScalarType(const TExprNode& node, TExprContext& ctx);
+bool EnsureBlockOrScalarType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
const TTypeAnnotationNode* GetBlockItemType(const TTypeAnnotationNode& type, bool& isScalar);
-const TTypeAnnotationNode* MakeBlockType(const TTypeAnnotationNode& blockItemType, TExprContext& ctx, bool withChunked = true);
const TTypeAnnotationNode* AggApplySerializedStateType(const TExprNode::TPtr& input, TExprContext& ctx);
bool GetSumResultType(const TPositionHandle& pos, const TTypeAnnotationNode& inputType, const TTypeAnnotationNode*& retType, TExprContext& ctx);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index 9d5e76dcb98..0e877ee21c0 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -1,6 +1,7 @@
#include "mkql_block_agg.h"
#include "mkql_block_agg_factory.h"
#include "mkql_block_builder.h"
+#include "mkql_block_impl.h"
#include "mkql_rh_hash.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
@@ -841,10 +842,10 @@ TStringBuf GetKeyView(const TSSOKey& key) {
}
template <typename TKey, typename TAggregator, typename TFixedAggState, bool UseSet, bool UseFilter, bool Finalize, bool Many, typename TDerived>
-class THashedWrapperBase : public TStatefulWideFlowComputationNode<TDerived> {
+class THashedWrapperBase : public TStatefulWideFlowBlockComputationNode<TDerived> {
public:
using TSelf = THashedWrapperBase<TKey, TAggregator, TFixedAggState, UseSet, UseFilter, Finalize, Many, TDerived>;
- using TBase = TStatefulWideFlowComputationNode<TDerived>;
+ using TBase = TStatefulWideFlowBlockComputationNode<TDerived>;
static constexpr bool UseArena = !InlineAggState && std::is_same<TFixedAggState, TStateArena>::value;
@@ -857,7 +858,7 @@ public:
TVector<TAggParams<TAggregator>>&& aggsParams,
ui32 streamIndex,
TVector<TVector<ui32>>&& streams)
- : TBase(mutables, flow, EValueRepresentation::Any)
+ : TBase(mutables, flow, keys.size() + aggsParams.size() + 1)
, Flow_(flow)
, FilterColumn_(filterColumn)
, Width_(width)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
index f142718bcb6..68801afd6d5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
@@ -1,5 +1,6 @@
#include "mkql_block_compress.h"
#include "mkql_block_builder.h"
+#include "mkql_block_impl.h"
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
@@ -13,10 +14,10 @@ namespace NMiniKQL {
namespace {
-class TCompressWithScalarBitmap : public TStatefulWideFlowComputationNode<TCompressWithScalarBitmap> {
+class TCompressWithScalarBitmap : public TStatefulWideFlowBlockComputationNode<TCompressWithScalarBitmap> {
public:
TCompressWithScalarBitmap(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width)
- : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ : TStatefulWideFlowBlockComputationNode(mutables, flow, width - 1)
, Flow_(flow)
, BitmapIndex_(bitmapIndex)
, Width_(width)
@@ -89,10 +90,10 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) {
return GetSparseBitmapPopCount(src, len);
}
-class TCompressScalars : public TStatefulWideFlowComputationNode<TCompressScalars> {
+class TCompressScalars : public TStatefulWideFlowBlockComputationNode<TCompressScalars> {
public:
TCompressScalars(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width)
- : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ : TStatefulWideFlowBlockComputationNode(mutables, flow, width - 1)
, Flow_(flow)
, BitmapIndex_(bitmapIndex)
, Width_(width)
@@ -156,10 +157,10 @@ private:
const ui32 Width_;
};
-class TCompressBlocks : public TStatefulWideFlowComputationNode<TCompressBlocks> {
+class TCompressBlocks : public TStatefulWideFlowBlockComputationNode<TCompressBlocks> {
public:
TCompressBlocks(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, TVector<TBlockType*>&& types)
- : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ : TStatefulWideFlowBlockComputationNode(mutables, flow, types.size() - 1)
, Flow_(flow)
, BitmapIndex_(bitmapIndex)
, Types_(std::move(types))
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h
new file mode 100644
index 00000000000..58885a2b1e1
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h
@@ -0,0 +1,145 @@
+#pragma once
+
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+
+#include <ydb/library/yql/minikql/arrow/arrow_util.h>
+
+#include <arrow/array.h>
+
+namespace NKikimr::NMiniKQL {
+
+template <typename TDerived>
+class TStatefulWideFlowBlockComputationNode: public TWideFlowBaseComputationNode<TDerived>
+{
+protected:
+ TStatefulWideFlowBlockComputationNode(TComputationMutables& mutables, const IComputationNode* source, ui32 width)
+ : TWideFlowBaseComputationNode<TDerived>(source)
+ , StateIndex(mutables.CurValueIndex++)
+ , StateKind(EValueRepresentation::Any)
+ , Width(width)
+ {
+ MKQL_ENSURE(width > 0, "Wide flow blocks should have at least one column (block length)");
+ }
+
+private:
+ struct TState : public TComputationValue<TState> {
+ using TBase = TComputationValue<TState>;
+
+ TVector<NUdf::TUnboxedValue> Values;
+ TVector<NUdf::TUnboxedValue*> ValuePointers;
+ NUdf::TUnboxedValue ChildState;
+ TVector<TDeque<std::shared_ptr<arrow::ArrayData>>> Arrays;
+ ui64 Count = 0;
+
+ TState(TMemoryUsageInfo* memInfo, size_t width, NUdf::TUnboxedValue*const* values, TComputationContext& ctx)
+ : TBase(memInfo)
+ , Values(width)
+ , ValuePointers(width)
+ , Arrays(width - 1)
+ {
+ for (size_t i = 0; i < width - 1; ++i) {
+ ValuePointers[i] = values[i] ? &Values[i] : nullptr;
+ }
+ ValuePointers.back() = &Values.back();
+ }
+ };
+
+ TState& GetState(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ auto& state = ctx.MutableValues[GetIndex()];
+ if (!state.HasValue()) {
+ state = ctx.HolderFactory.Create<TState>(Width, output, ctx);
+ }
+ return *static_cast<TState*>(state.AsBoxed().Get());
+ }
+
+ EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const final {
+ TState& s = GetState(ctx, output);
+ const TDerived& child = static_cast<const TDerived&>(*this);
+ while (s.Count == 0) {
+ auto result = static_cast<const TDerived*>(this)->DoCalculate(s.ChildState, ctx, s.ValuePointers.data());
+ if (result != EFetchResult::One) {
+ return result;
+ }
+
+ auto& counterDatum = TArrowBlock::From(s.Values.back()).GetDatum();
+ MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
+ s.Count = counterDatum.template scalar_as<arrow::UInt64Scalar>().value;
+ if (!s.Count) {
+ continue;
+ }
+ for (size_t i = 0; i < Width - 1; ++i) {
+ s.Arrays[i].clear();
+ if (!output[i]) {
+ continue;
+ }
+ auto& datum = TArrowBlock::From(s.Values[i]).GetDatum();
+ if (datum.is_scalar()) {
+ continue;
+ }
+ MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
+ if (datum.is_array()) {
+ s.Arrays[i].push_back(datum.array());
+ } else {
+ for (auto& chunk : datum.chunks()) {
+ s.Arrays[i].push_back(chunk->data());
+ }
+ }
+ }
+ }
+ ui64 sliceSize = s.Count;
+ for (size_t i = 0; i < s.Arrays.size(); ++i) {
+ const auto& arr = s.Arrays[i];
+ if (arr.empty()) {
+ continue;
+ }
+
+ MKQL_ENSURE(arr.front()->length <= s.Count, "Unexpected array length at column #" << i);
+ sliceSize = std::min<ui64>(sliceSize, arr.front()->length);
+ }
+
+ for (size_t i = 0; i < s.Arrays.size(); ++i) {
+ if (!output[i]) {
+ continue;
+ }
+ if (s.Arrays[i].empty()) {
+ *(output[i]) = s.Values[i];
+ continue;
+ }
+
+ auto& array = s.Arrays[i].front();
+ if (array->length == sliceSize) {
+ *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(array));
+ s.Arrays[i].pop_front();
+ } else {
+ *(output[i]) = ctx.HolderFactory.CreateArrowBlock(Chop(array, sliceSize));
+ }
+ }
+
+ if (output[Width - 1]) {
+ *(output[Width - 1]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
+ }
+ s.Count -= sliceSize;
+ return EFetchResult::One;
+ }
+
+ ui32 GetIndex() const final {
+ return StateIndex;
+ }
+
+ void CollectDependentIndexes(const IComputationNode* owner, IComputationNode::TIndexesMap& dependencies) const final {
+ if (this == owner)
+ return;
+
+ const auto ins = dependencies.emplace(StateIndex, StateKind);
+ if (ins.second && this->Dependence) {
+ this->Dependence->CollectDependentIndexes(owner, dependencies);
+ }
+ }
+
+ const ui32 StateIndex;
+ const EValueRepresentation StateKind;
+ const ui32 Width;
+};
+
+} //namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
index e0266ea2b18..b60617c24af 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
@@ -1,4 +1,5 @@
#include "mkql_block_skiptake.h"
+#include "mkql_block_impl.h"
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
@@ -12,12 +13,12 @@ namespace NMiniKQL {
namespace {
-class TWideSkipBlocksWrapper: public TStatefulWideFlowComputationNode<TWideSkipBlocksWrapper> {
- typedef TStatefulWideFlowComputationNode<TWideSkipBlocksWrapper> TBaseComputation;
+class TWideSkipBlocksWrapper: public TStatefulWideFlowBlockComputationNode<TWideSkipBlocksWrapper> {
+ typedef TStatefulWideFlowBlockComputationNode<TWideSkipBlocksWrapper> TBaseComputation;
public:
TWideSkipBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, size_t width)
- : TBaseComputation(mutables, flow, EValueRepresentation::Any)
+ : TBaseComputation(mutables, flow, width)
, Flow(flow)
, Count(count)
, Width(width)
@@ -77,12 +78,12 @@ private:
const size_t Width;
};
-class TWideTakeBlocksWrapper: public TStatefulWideFlowComputationNode<TWideTakeBlocksWrapper> {
- typedef TStatefulWideFlowComputationNode<TWideTakeBlocksWrapper> TBaseComputation;
+class TWideTakeBlocksWrapper: public TStatefulWideFlowBlockComputationNode<TWideTakeBlocksWrapper> {
+ typedef TStatefulWideFlowBlockComputationNode<TWideTakeBlocksWrapper> TBaseComputation;
public:
TWideTakeBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, IComputationNode* count, size_t width)
- : TBaseComputation(mutables, flow, EValueRepresentation::Any)
+ : TBaseComputation(mutables, flow, width)
, Flow(flow)
, Count(count)
, Width(width)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 55dc51100ff..488dd2af044 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -1,6 +1,7 @@
#include "mkql_blocks.h"
#include "mkql_block_builder.h"
#include "mkql_block_reader.h"
+#include "mkql_block_impl.h"
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
@@ -55,12 +56,12 @@ private:
TType* ItemType_;
};
-class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> {
+class TWideToBlocksWrapper : public TStatefulWideFlowBlockComputationNode<TWideToBlocksWrapper> {
public:
TWideToBlocksWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
TVector<TType*>&& types)
- : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ : TStatefulWideFlowBlockComputationNode(mutables, flow, types.size() + 1)
, Flow_(flow)
, Types_(std::move(types))
, Width_(Types_.size())
@@ -410,116 +411,27 @@ private:
TType* Type_;
};
-class TBlockExpandChunkedWrapper : public TStatefulWideFlowComputationNode<TBlockExpandChunkedWrapper> {
+class TBlockExpandChunkedWrapper : public TStatefulWideFlowBlockComputationNode<TBlockExpandChunkedWrapper> {
public:
TBlockExpandChunkedWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 width)
- : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ : TStatefulWideFlowBlockComputationNode(mutables, flow, width)
, Flow_(flow)
- , Width_(width)
{
}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx,
NUdf::TUnboxedValue*const* output) const
{
- auto& s = GetState(state, ctx);
- while (s.Count_ == 0) {
- auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
- if (result != EFetchResult::One) {
- return result;
- }
-
- s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
- for (size_t i = 0; i < Width_; ++i) {
- s.Arrays_[i].clear();
- if (!output[i]) {
- continue;
- }
- auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum();
- if (datum.is_scalar()) {
- continue;
- }
- Y_VERIFY(datum.is_arraylike());
- if (datum.is_array()) {
- s.Arrays_[i].push_back(datum.array());
- } else {
- for (auto& chunk : datum.chunks()) {
- s.Arrays_[i].push_back(chunk->data());
- }
- }
- }
- }
-
- ui64 sliceSize = s.Count_;
- for (size_t i = 0; i < Width_; ++i) {
- if (s.Arrays_[i].empty()) {
- continue;
- }
-
- Y_VERIFY(s.Arrays_[i].front()->length <= s.Count_);
- sliceSize = std::min<ui64>(sliceSize, s.Arrays_[i].front()->length);
- }
-
- for (size_t i = 0; i < Width_; ++i) {
- if (!output[i]) {
- continue;
- }
-
- if (s.Arrays_[i].empty()) {
- *(output[i]) = s.Values_[i];
- continue;
- }
-
- auto& array = s.Arrays_[i].front();
- Y_VERIFY(array->length >= sliceSize);
- if (array->length == sliceSize) {
- *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(array));
- s.Arrays_[i].pop_front();
- } else {
- *(output[i]) = ctx.HolderFactory.CreateArrowBlock(Chop(array, sliceSize));
- }
- }
-
- MKQL_ENSURE(output[Width_], "Block size is unused");
- *(output[Width_]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
- s.Count_ -= sliceSize;
- return EFetchResult::One;
+ Y_UNUSED(state);
+ return Flow_->FetchValues(ctx, output);
}
private:
- struct TState : public TComputationValue<TState> {
- TVector<NUdf::TUnboxedValue> Values_;
- TVector<NUdf::TUnboxedValue*> ValuePointers_;
- TVector<TDeque<std::shared_ptr<arrow::ArrayData>>> Arrays_;
- ui64 Count_ = 0;
-
- TState(TMemoryUsageInfo* memInfo, size_t width, TComputationContext& ctx)
- : TComputationValue(memInfo)
- , Values_(width + 1)
- , ValuePointers_(width + 1)
- , Arrays_(width)
- {
- for (size_t i = 0; i < width + 1; ++i) {
- ValuePointers_[i] = &Values_[i];
- }
- }
- };
-
-private:
void RegisterDependencies() const final {
FlowDependsOn(Flow_);
}
- TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
- if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Width_, ctx);
- }
- return *static_cast<TState*>(state.AsBoxed().Get());
- }
-
-private:
IComputationWideFlowNode* Flow_;
- const size_t Width_;
};
@@ -584,12 +496,11 @@ IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputation
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
- MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column");
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount() - 1);
+ return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount());
}
}