aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author aneporada <aneporada@ydb.tech> 2022-12-21 14:56:17 +0300
committer aneporada <aneporada@ydb.tech> 2022-12-21 14:56:17 +0300
commit 84ca810714ae4a9cb7c61bb07920b769c49ec31b (patch)
tree 44ea70b534898c86281d13930eea0e2043b2f595
parent 10c915d6deca5ce9d5a6902f69b719d89402ffa5 (diff)
download ydb-84ca810714ae4a9cb7c61bb07920b769c49ec31b.tar.gz
New BlockExpandChunked runtime node. Fix block builder for strings, add tests
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_blocks.cpp 30
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_blocks.h 1
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_core.cpp 5
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp 12
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp 132
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_blocks.h 1
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp 1
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp 98
-rw-r--r-- ydb/library/yql/minikql/mkql_program_builder.cpp 7
-rw-r--r-- ydb/library/yql/minikql/mkql_program_builder.h 1
10 files changed, 282 insertions, 6 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index e95cd3cd33..f220ce2a35 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -76,6 +76,36 @@ IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TE
return IGraphTransformer::TStatus::Ok;
}
+IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { // Type annotation for BlockExpandChunked: every ChunkedBlock item of the input wide flow becomes a Block item of the same item type.
+ if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { // exactly one argument is expected: the input flow
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TTypeAnnotationNode::TListType blockItemTypes; // filled by the check below; not read afterwards in this function
+ const bool allowChunked = true; // chunked block items are acceptable on input
+ if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr, allowChunked)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto flowItemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(); // item types of the input flow; chunked entries are replaced in the loop below
+ bool hasChunked = false;
+ for (auto& flowItemType : flowItemTypes) {
+ if (flowItemType->GetKind() == ETypeAnnotationKind::ChunkedBlock) {
+ hasChunked = true;
+ flowItemType = ctx.Expr.MakeType<TBlockExprType>(flowItemType->Cast<TChunkedBlockExprType>()->GetItemType()); // same item type, wrapped as a plain block
+ }
+ }
+
+ if (!hasChunked) { // nothing to expand: substitute the argument itself for this node
+ output = input->HeadPtr();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(flowItemTypes);
+ input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); // result: flow over the rewritten item list
+ return IGraphTransformer::TStatus::Ok;
+}
+
IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
if (!EnsureArgsCount(*input, 2U, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h
index e461142643..9e2364dacf 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.h
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h
@@ -10,6 +10,7 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus AsScalarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockLogicalWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 361f97dec5..91c59447cd 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -11783,8 +11783,10 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["WideFromBlocks"] = &WideFromBlocksWrapper;
Functions["WideSkipBlocks"] = &WideSkipTakeBlocksWrapper;
Functions["WideTakeBlocks"] = &WideSkipTakeBlocksWrapper;
- Functions["AsScalar"] = &AsScalarWrapper;
Functions["BlockCompress"] = &BlockCompressWrapper;
+ Functions["BlockExpandChunked"] = &BlockExpandChunkedWrapper;
+
+ Functions["AsScalar"] = &AsScalarWrapper;
Functions["BlockCoalesce"] = &BlockCoalesceWrapper;
Functions["BlockAnd"] = &BlockLogicalWrapper;
Functions["BlockOr"] = &BlockLogicalWrapper;
@@ -11792,6 +11794,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["BlockNot"] = &BlockLogicalWrapper;
ExtFunctions["BlockFunc"] = &BlockFuncWrapper;
ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper;
+
ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper;
ExtFunctions["BlockCombineHashed"] = &BlockCombineHashedWrapper;
ExtFunctions["BlockMergeFinalizeHashed"] = &BlockMergeFinalizeHashedWrapper;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
index 958f421f2b..d999ea1987 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
@@ -268,10 +268,14 @@ public:
const TStringBuf str = value.AsStringRef();
size_t currentLen = DataBuilder->length();
- Y_VERIFY(currentLen <= MaxBlockSizeInBytes);
- if (str.size() > (MaxBlockSizeInBytes - currentLen)) {
- MKQL_ENSURE(str.size() < std::numeric_limits<typename TStringType::offset_type>::max(), "Too big string for Arrow");
- FlushChunk(false);
+ // empty string can always be appended
+ if (!str.empty() && currentLen + str.size() > MaxBlockSizeInBytes) {
+ if (currentLen) {
+ FlushChunk(false);
+ }
+ if (str.size() > MaxBlockSizeInBytes) {
+ ARROW_OK(DataBuilder->Reserve(str.size()));
+ }
}
NullBitmapBuilder->UnsafeAppend(true);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index daa5613e68..b4d3b045ad 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -9,6 +9,8 @@
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <arrow/scalar.h>
+#include <arrow/array.h>
+#include <arrow/datum.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -239,7 +241,7 @@ public:
NUdf::TUnboxedValue*const* output) const
{
auto& s = GetState(state, ctx);
- if (s.Index_ == s.Count_) {
+ while (s.Index_ == s.Count_) {
auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
if (result != EFetchResult::One) {
return result;
@@ -408,6 +410,121 @@ private:
TType* Type_;
};
+class TBlockExpandChunkedWrapper : public TStatefulWideFlowComputationNode<TBlockExpandChunkedWrapper> { // Wide-flow node that splits each fetched row of blocks into one or more output rows whose array columns are single arrays.
+public:
+ TBlockExpandChunkedWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 width) // width = number of value columns; the flow carries one more trailing column with the block length
+ : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
+ , Flow_(flow)
+ , Width_(width)
+ {
+ }
+
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, // emits the next slice of the current input row, fetching a new input row when nothing is pending
+ NUdf::TUnboxedValue*const* output) const
+ {
+ auto& s = GetState(state, ctx);
+ while (s.Count_ == 0) { // no rows pending from the previous input row: fetch the next one
+ auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
+ if (result != EFetchResult::One) { // any result other than One is returned to the caller as is
+ return result;
+ }
+
+ s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; // the last column holds the row count as a UInt64 scalar
+ for (size_t i = 0; i < Width_; ++i) {
+ s.Arrays_[i].clear();
+ if (!output[i]) { // null output slot: chunks of this column are not queued
+ continue;
+ }
+ auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum();
+ if (datum.is_scalar()) { // scalars are not queued; the fetched value is forwarded unchanged below
+ continue;
+ }
+ Y_VERIFY(datum.is_arraylike());
+ if (datum.is_array()) { // single array: one queue entry
+ s.Arrays_[i].push_back(datum.array());
+ } else { // otherwise queue the data of every chunk in order
+ for (auto& chunk : datum.chunks()) {
+ s.Arrays_[i].push_back(chunk->data());
+ }
+ }
+ }
+ }
+
+ ui64 sliceSize = s.Count_; // rows to emit now: the shortest front chunk over all queued columns, at most the pending count
+ for (size_t i = 0; i < Width_; ++i) {
+ if (s.Arrays_[i].empty()) {
+ continue;
+ }
+
+ Y_VERIFY(s.Arrays_[i].front()->length <= s.Count_); // a queued chunk must not be longer than the rows still pending
+ sliceSize = std::min<ui64>(sliceSize, s.Arrays_[i].front()->length);
+ }
+
+ for (size_t i = 0; i < Width_; ++i) {
+ if (!output[i]) {
+ continue;
+ }
+
+ if (s.Arrays_[i].empty()) { // nothing queued for this column (e.g. a scalar): pass the fetched value through
+ *(output[i]) = s.Values_[i];
+ continue;
+ }
+
+ auto& array = s.Arrays_[i].front();
+ Y_VERIFY(array->length >= sliceSize);
+ if (array->length == sliceSize) { // the whole front chunk is consumed by this output row
+ *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(array));
+ s.Arrays_[i].pop_front();
+ } else { // emit the first sliceSize rows and keep the remainder at the front of the queue
+ auto sliced = array->Slice(0, sliceSize);
+ array = array->Slice(sliceSize, array->length - sliceSize);
+ *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(sliced));
+ }
+ }
+
+ MKQL_ENSURE(output[Width_], "Block size is unused"); // the block-length output slot must be non-null
+ *(output[Width_]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize))); // length of the emitted slice
+ s.Count_ -= sliceSize; // rows of the current input row that remain to be emitted
+ return EFetchResult::One;
+ }
+
+private:
+ struct TState : public TComputationValue<TState> { // per-graph state: the last fetched row plus the not yet emitted chunks
+ TVector<NUdf::TUnboxedValue> Values_; // last fetched input row: width value columns followed by the block length
+ TVector<NUdf::TUnboxedValue*> ValuePointers_; // pointers to the elements of Values_, handed to FetchValues
+ TVector<TDeque<std::shared_ptr<arrow::ArrayData>>> Arrays_; // per value column: queue of chunks not fully emitted yet
+ ui64 Count_ = 0; // rows of the current input row not emitted yet
+
+ TState(TMemoryUsageInfo* memInfo, size_t width, TComputationContext& ctx) // NOTE(review): ctx is not used in this constructor
+ : TComputationValue(memInfo)
+ , Values_(width + 1)
+ , ValuePointers_(width + 1)
+ , Arrays_(width)
+ {
+ for (size_t i = 0; i < width + 1; ++i) {
+ ValuePointers_[i] = &Values_[i];
+ }
+ }
+ };
+
+private:
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow_);
+ }
+
+ TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { // creates the state on first use, then returns the stored one
+ if (!state.HasValue()) {
+ state = ctx.HolderFactory.Create<TState>(Width_, ctx);
+ }
+ return *static_cast<TState*>(state.AsBoxed().Get());
+ }
+
+private:
+ IComputationWideFlowNode* Flow_; // input wide flow
+ const size_t Width_; // number of value columns, excluding the trailing block-length column
+};
+
+
} // namespace
IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
@@ -464,5 +581,18 @@ IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactor
return new TAsScalarWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), callable.GetInput(0).GetStaticType());
}
+IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputationNodeFactoryContext& ctx) { // Factory: validates the callable and creates a TBlockExpandChunkedWrapper over its single input.
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); // the input must be a flow ...
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); // ... whose item type is a tuple
+ MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column");
+
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+
+ return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount() - 1); // the width passed to the node excludes the last tuple element
+}
+
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h
index 2b5adef159..f0568a7570 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h
@@ -10,6 +10,7 @@ IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFa
IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputationNodeFactoryContext& ctx);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index a6d4882ac3..fc79af0349 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -281,6 +281,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"BlockXor", &WrapBlockXor},
{"BlockNot", &WrapBlockNot},
{"BlockCompress", &WrapBlockCompress},
+ {"BlockExpandChunked", &WrapBlockExpandChunked},
{"BlockCombineAll", &WrapBlockCombineAll},
{"BlockCombineHashed", &WrapBlockCombineHashed},
{"BlockMergeFinalizeHashed", &WrapBlockMergeFinalizeHashed},
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
index 5409bffa5c..f2ddd99866 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
@@ -86,6 +86,104 @@ Y_UNIT_TEST(TestWideToBlocks) {
UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
}
+namespace {
+void TestChunked(bool withBlockExpand) { // Round-trips 20 rows through WideToBlocks/WideFromBlocks, keeps the first 19 and checks every value; withBlockExpand selects the pipeline variant.
+ TSetup<false> setup;
+ auto& pb = *setup.PgmBuilder;
+
+ const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ const auto boolType = pb.NewDataType(NUdf::TDataType<bool>::Id);
+ const auto stringType = pb.NewDataType(NUdf::EDataSlot::String);
+ const auto utf8Type = pb.NewDataType(NUdf::EDataSlot::Utf8);
+
+ const auto tupleType = pb.NewTupleType({ui64Type, boolType, stringType, utf8Type});
+
+ TVector<TRuntimeNode> items;
+ const size_t bigStrSize = 1024 * 1024 + 100; // 1 MiB + 100 bytes
+ const size_t smallStrSize = 256 * 1024; // 256 KiB
+ for (size_t i = 0; i < 20; ++i) { // even rows carry the large strings, odd rows carry empty strings
+
+ if (i % 2 == 0) {
+ std::string big(bigStrSize, '0' + i); // fill character depends on the row index
+ std::string small(smallStrSize, 'A' + i);
+
+ items.push_back(pb.NewTuple(tupleType, { pb.NewDataLiteral<ui64>(i), pb.NewDataLiteral<bool>(true),
+ pb.NewDataLiteral<NUdf::EDataSlot::String>(big),
+ pb.NewDataLiteral<NUdf::EDataSlot::Utf8>(small),
+ }));
+ } else {
+ items.push_back(pb.NewTuple(tupleType, { pb.NewDataLiteral<ui64>(i), pb.NewDataLiteral<bool>(false),
+ pb.NewDataLiteral<NUdf::EDataSlot::String>(""),
+ pb.NewDataLiteral<NUdf::EDataSlot::Utf8>(""),
+ }));
+
+ }
+ }
+
+ const auto list = pb.NewList(tupleType, std::move(items));
+
+ auto node = pb.ToFlow(list);
+ node = pb.ExpandMap(node, [&](TRuntimeNode item) -> TRuntimeNode::TList { // tuple -> four wide columns
+ return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)};
+ });
+ node = pb.WideToBlocks(node);
+ if (withBlockExpand) {
+ node = pb.BlockExpandChunked(node);
+ // WideTakeBlocks won't work on chunked blocks
+ node = pb.WideTakeBlocks(node, pb.NewDataLiteral<ui64>(19));
+ node = pb.WideFromBlocks(node);
+ } else {
+ // WideFromBlocks should support chunked blocks
+ node = pb.WideFromBlocks(node);
+ node = pb.Take(node, pb.NewDataLiteral<ui64>(19));
+ }
+ node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode { // four wide columns -> tuple
+ return pb.NewTuple(tupleType, {items[0], items[1], items[2], items[3]});
+ });
+
+ const auto pgmReturn = pb.ForwardList(node);
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ for (size_t i = 0; i < 19; ++i) { // exactly 19 rows are expected, in input order
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ ui64 num = item.GetElement(0).Get<ui64>();
+ bool bl = item.GetElement(1).Get<bool>();
+ auto strVal = item.GetElement(2); // kept in named variables while the string views below refer to them
+ auto utf8Val = item.GetElement(3);
+ std::string_view str = strVal.AsStringRef();
+ std::string_view utf8 = utf8Val.AsStringRef();
+
+ UNIT_ASSERT_VALUES_EQUAL(num, i);
+ UNIT_ASSERT_VALUES_EQUAL(bl, i % 2 == 0);
+ if (i % 2 == 0) {
+ std::string big(bigStrSize, '0' + i); // same content as generated for this row above
+ std::string small(smallStrSize, 'A' + i);
+ UNIT_ASSERT_VALUES_EQUAL(str, big);
+ UNIT_ASSERT_VALUES_EQUAL(utf8, small);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(str.size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(utf8.size(), 0);
+ }
+ }
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(!iterator.Next(item)); // the 20th row must have been cut off
+ UNIT_ASSERT(!iterator.Next(item)); // a repeated call must report the end again
+
+}
+
+} // namespace
+
+Y_UNIT_TEST(TestBlockExpandChunked) { // variant with BlockExpandChunked followed by WideTakeBlocks
+ TestChunked(true);
+}
+
+Y_UNIT_TEST(TestWideFromBlocksForChunked) { // variant without BlockExpandChunked: WideFromBlocks is applied directly, then Take
+ TestChunked(false);
+}
+
Y_UNIT_TEST(TestScalar) {
const ui64 testValue = 42;
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 0f4ec2b575..6403fc25ec 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -1513,6 +1513,13 @@ TRuntimeNode TProgramBuilder::BlockCompress(TRuntimeNode flow, ui32 bitmapIndex)
return TRuntimeNode(callableBuilder.Build(), false);
}
+TRuntimeNode TProgramBuilder::BlockExpandChunked(TRuntimeNode flow) { // Builds a callable named after this method (via __func__) with `flow` as its only input.
+ ValidateBlockFlowType(flow.GetStaticType()); // the input type is validated before the callable is built
+ TCallableBuilder callableBuilder(Env, __func__, flow.GetStaticType()); // NOTE(review): presumably the third argument is the result type, i.e. the same as the input flow type - confirm
+ callableBuilder.Add(flow);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
TRuntimeNode TProgramBuilder::BlockCoalesce(TRuntimeNode first, TRuntimeNode second) {
auto firstType = AS_TYPE(TBlockType, first.GetStaticType());
auto secondType = AS_TYPE(TBlockType, second.GetStaticType());
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 81f4b13e6a..407db818da 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -247,6 +247,7 @@ public:
TRuntimeNode WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count);
TRuntimeNode AsScalar(TRuntimeNode value);
TRuntimeNode BlockCompress(TRuntimeNode flow, ui32 bitmapIndex);
+ TRuntimeNode BlockExpandChunked(TRuntimeNode flow);
TRuntimeNode BlockCoalesce(TRuntimeNode first, TRuntimeNode second);
//-- logical functions