diff options
author | aneporada <aneporada@ydb.tech> | 2022-12-21 14:56:17 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2022-12-21 14:56:17 +0300 |
commit | 84ca810714ae4a9cb7c61bb07920b769c49ec31b (patch) | |
tree | 44ea70b534898c86281d13930eea0e2043b2f595 | |
parent | 10c915d6deca5ce9d5a6902f69b719d89402ffa5 (diff) | |
download | ydb-84ca810714ae4a9cb7c61bb07920b769c49ec31b.tar.gz |
New BlockExpandChunked runtime node. Fix block builder for strings, add tests
10 files changed, 282 insertions, 6 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index e95cd3cd33..f220ce2a35 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -76,6 +76,36 @@ IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TE return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType blockItemTypes; + const bool allowChunked = true; + if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr, allowChunked)) { + return IGraphTransformer::TStatus::Error; + } + + auto flowItemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems(); + bool hasChunked = false; + for (auto& flowItemType : flowItemTypes) { + if (flowItemType->GetKind() == ETypeAnnotationKind::ChunkedBlock) { + hasChunked = true; + flowItemType = ctx.Expr.MakeType<TBlockExprType>(flowItemType->Cast<TChunkedBlockExprType>()->GetItemType()); + } + } + + if (!hasChunked) { + output = input->HeadPtr(); + return IGraphTransformer::TStatus::Repeat; + } + + auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(flowItemTypes); + input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType)); + return IGraphTransformer::TStatus::Ok; +} + IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { if (!EnsureArgsCount(*input, 2U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h index e461142643..9e2364dacf 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.h +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h @@ -10,6 +10,7 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus AsScalarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockLogicalWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 361f97dec5..91c59447cd 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11783,8 +11783,10 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["WideFromBlocks"] = &WideFromBlocksWrapper; Functions["WideSkipBlocks"] = &WideSkipTakeBlocksWrapper; Functions["WideTakeBlocks"] = &WideSkipTakeBlocksWrapper; - Functions["AsScalar"] = &AsScalarWrapper; Functions["BlockCompress"] = &BlockCompressWrapper; + Functions["BlockExpandChunked"] = &BlockExpandChunkedWrapper; + + Functions["AsScalar"] = &AsScalarWrapper; Functions["BlockCoalesce"] = &BlockCoalesceWrapper; Functions["BlockAnd"] = &BlockLogicalWrapper; Functions["BlockOr"] = &BlockLogicalWrapper; @@ -11792,6 +11794,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["BlockNot"] = &BlockLogicalWrapper; ExtFunctions["BlockFunc"] = &BlockFuncWrapper; ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper; + ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper; ExtFunctions["BlockCombineHashed"] = &BlockCombineHashedWrapper; ExtFunctions["BlockMergeFinalizeHashed"] = &BlockMergeFinalizeHashedWrapper; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp index 958f421f2b..d999ea1987 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp @@ -268,10 +268,14 @@ public: const TStringBuf str = value.AsStringRef(); size_t currentLen = DataBuilder->length(); - Y_VERIFY(currentLen <= MaxBlockSizeInBytes); - if (str.size() > (MaxBlockSizeInBytes - currentLen)) { - MKQL_ENSURE(str.size() < std::numeric_limits<typename TStringType::offset_type>::max(), "Too big string for Arrow"); - FlushChunk(false); + // empty string can always be appended + if (!str.empty() && currentLen + str.size() > MaxBlockSizeInBytes) { + if (currentLen) { + FlushChunk(false); + } + if (str.size() > MaxBlockSizeInBytes) { + ARROW_OK(DataBuilder->Reserve(str.size())); + } } NullBitmapBuilder->UnsafeAppend(true); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index daa5613e68..b4d3b045ad 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -9,6 +9,8 @@ #include <ydb/library/yql/minikql/mkql_node_cast.h> #include <arrow/scalar.h> +#include <arrow/array.h> +#include <arrow/datum.h> namespace NKikimr { namespace NMiniKQL { @@ -239,7 +241,7 @@ public: NUdf::TUnboxedValue*const* output) const { auto& s = GetState(state, ctx); - if (s.Index_ == s.Count_) { + while (s.Index_ == s.Count_) { auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); if (result != EFetchResult::One) { return result; @@ -408,6 +410,121 @@ private: TType* Type_; }; +class TBlockExpandChunkedWrapper : public TStatefulWideFlowComputationNode<TBlockExpandChunkedWrapper> { +public: + TBlockExpandChunkedWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 width) + : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) + , Flow_(flow) + , Width_(width) + { + } + + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, + NUdf::TUnboxedValue*const* output) const + { + auto& s = GetState(state, ctx); + while (s.Count_ == 0) { + auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); + if (result != EFetchResult::One) { + return result; + } + + s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + for (size_t i = 0; i < Width_; ++i) { + s.Arrays_[i].clear(); + if (!output[i]) { + continue; + } + auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum(); + if (datum.is_scalar()) { + continue; + } + Y_VERIFY(datum.is_arraylike()); + if (datum.is_array()) { + s.Arrays_[i].push_back(datum.array()); + } else { + for (auto& chunk : datum.chunks()) { + s.Arrays_[i].push_back(chunk->data()); + } + } + } + } + + ui64 sliceSize = s.Count_; + for (size_t i = 0; i < Width_; ++i) { + if (s.Arrays_[i].empty()) { + continue; + } + + Y_VERIFY(s.Arrays_[i].front()->length <= s.Count_); + sliceSize = std::min<ui64>(sliceSize, s.Arrays_[i].front()->length); + } + + for (size_t i = 0; i < Width_; ++i) { + if (!output[i]) { + continue; + } + + if (s.Arrays_[i].empty()) { + *(output[i]) = s.Values_[i]; + continue; + } + + auto& array = s.Arrays_[i].front(); + Y_VERIFY(array->length >= sliceSize); + if (array->length == sliceSize) { + *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(array)); + s.Arrays_[i].pop_front(); + } else { + auto sliced = array->Slice(0, sliceSize); + array = array->Slice(sliceSize, array->length - sliceSize); + *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(sliced)); + } + } + + MKQL_ENSURE(output[Width_], "Block size is unused"); + *(output[Width_]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize))); + s.Count_ -= sliceSize; + return EFetchResult::One; + } + +private: + struct TState : public TComputationValue<TState> { + TVector<NUdf::TUnboxedValue> Values_; + TVector<NUdf::TUnboxedValue*> ValuePointers_; + TVector<TDeque<std::shared_ptr<arrow::ArrayData>>> Arrays_; + ui64 Count_ = 0; + + TState(TMemoryUsageInfo* memInfo, size_t width, TComputationContext& ctx) + : TComputationValue(memInfo) + , Values_(width + 1) + , ValuePointers_(width + 1) + , Arrays_(width) + { + for (size_t i = 0; i < width + 1; ++i) { + ValuePointers_[i] = &Values_[i]; + } + } + }; + +private: + void RegisterDependencies() const final { + FlowDependsOn(Flow_); + } + + TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { + if (!state.HasValue()) { + state = ctx.HolderFactory.Create<TState>(Width_, ctx); + } + return *static_cast<TState*>(state.AsBoxed().Get()); + } + +private: + IComputationWideFlowNode* Flow_; + const size_t Width_; +}; + + } // namespace IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { @@ -464,5 +581,18 @@ IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactor return new TAsScalarWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), callable.GetInput(0).GetStaticType()); } +IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); + + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column"); + + auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + + return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount() - 1); +} + } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h index 2b5adef159..f0568a7570 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h @@ -10,6 +10,7 @@ IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFa IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputationNodeFactoryContext& ctx); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index a6d4882ac3..fc79af0349 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -281,6 +281,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"BlockXor", &WrapBlockXor}, {"BlockNot", &WrapBlockNot}, {"BlockCompress", &WrapBlockCompress}, + {"BlockExpandChunked", &WrapBlockExpandChunked}, {"BlockCombineAll", &WrapBlockCombineAll}, {"BlockCombineHashed", &WrapBlockCombineHashed}, {"BlockMergeFinalizeHashed", &WrapBlockMergeFinalizeHashed}, diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp index 5409bffa5c..f2ddd99866 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp @@ -86,6 +86,104 @@ Y_UNIT_TEST(TestWideToBlocks) { UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30); } +namespace { +void TestChunked(bool withBlockExpand) { + TSetup<false> setup; + auto& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto boolType = pb.NewDataType(NUdf::TDataType<bool>::Id); + const auto stringType = pb.NewDataType(NUdf::EDataSlot::String); + const auto utf8Type = pb.NewDataType(NUdf::EDataSlot::Utf8); + + const auto tupleType = pb.NewTupleType({ui64Type, boolType, stringType, utf8Type}); + + TVector<TRuntimeNode> items; + const size_t bigStrSize = 1024 * 1024 + 100; + const size_t smallStrSize = 256 * 1024; + for (size_t i = 0; i < 20; ++i) { + + if (i % 2 == 0) { + std::string big(bigStrSize, '0' + i); + std::string small(smallStrSize, 'A' + i); + + items.push_back(pb.NewTuple(tupleType, { pb.NewDataLiteral<ui64>(i), pb.NewDataLiteral<bool>(true), + pb.NewDataLiteral<NUdf::EDataSlot::String>(big), + pb.NewDataLiteral<NUdf::EDataSlot::Utf8>(small), + })); + } else { + items.push_back(pb.NewTuple(tupleType, { pb.NewDataLiteral<ui64>(i), pb.NewDataLiteral<bool>(false), + pb.NewDataLiteral<NUdf::EDataSlot::String>(""), + pb.NewDataLiteral<NUdf::EDataSlot::Utf8>(""), + })); + + } + } + + const auto list = pb.NewList(tupleType, std::move(items)); + + auto node = pb.ToFlow(list); + node = pb.ExpandMap(node, [&](TRuntimeNode item) -> TRuntimeNode::TList { + return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)}; + }); + node = pb.WideToBlocks(node); + if (withBlockExpand) { + node = pb.BlockExpandChunked(node); + // WideTakeBlocks won't work on chunked blocks + node = pb.WideTakeBlocks(node, pb.NewDataLiteral<ui64>(19)); + node = pb.WideFromBlocks(node); + } else { + // WideFromBlocks should support chunked blocks + node = pb.WideFromBlocks(node); + node = pb.Take(node, pb.NewDataLiteral<ui64>(19)); + } + node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode { + return pb.NewTuple(tupleType, {items[0], items[1], items[2], items[3]}); + }); + + const auto pgmReturn = pb.ForwardList(node); + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + + for (size_t i = 0; i < 19; ++i) { + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + ui64 num = item.GetElement(0).Get<ui64>(); + bool bl = item.GetElement(1).Get<bool>(); + auto strVal = item.GetElement(2); + auto utf8Val = item.GetElement(3); + std::string_view str = strVal.AsStringRef(); + std::string_view utf8 = utf8Val.AsStringRef(); + + UNIT_ASSERT_VALUES_EQUAL(num, i); + UNIT_ASSERT_VALUES_EQUAL(bl, i % 2 == 0); + if (i % 2 == 0) { + std::string big(bigStrSize, '0' + i); + std::string small(smallStrSize, 'A' + i); + UNIT_ASSERT_VALUES_EQUAL(str, big); + UNIT_ASSERT_VALUES_EQUAL(utf8, small); + } else { + UNIT_ASSERT_VALUES_EQUAL(str.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(utf8.size(), 0); + } + } + + NUdf::TUnboxedValue item; + UNIT_ASSERT(!iterator.Next(item)); + UNIT_ASSERT(!iterator.Next(item)); + +} + +} // namespace + +Y_UNIT_TEST(TestBlockExpandChunked) { + TestChunked(true); +} + +Y_UNIT_TEST(TestWideFromBlocksForChunked) { + TestChunked(false); +} + Y_UNIT_TEST(TestScalar) { const ui64 testValue = 42; diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 0f4ec2b575..6403fc25ec 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -1513,6 +1513,13 @@ TRuntimeNode TProgramBuilder::BlockCompress(TRuntimeNode flow, ui32 bitmapIndex) return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TProgramBuilder::BlockExpandChunked(TRuntimeNode flow) { + ValidateBlockFlowType(flow.GetStaticType()); + TCallableBuilder callableBuilder(Env, __func__, flow.GetStaticType()); + callableBuilder.Add(flow); + return TRuntimeNode(callableBuilder.Build(), false); +} + TRuntimeNode TProgramBuilder::BlockCoalesce(TRuntimeNode first, TRuntimeNode second) { auto firstType = AS_TYPE(TBlockType, first.GetStaticType()); auto secondType = AS_TYPE(TBlockType, second.GetStaticType()); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 81f4b13e6a..407db818da 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -247,6 +247,7 @@ public: TRuntimeNode WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count); TRuntimeNode AsScalar(TRuntimeNode value); TRuntimeNode BlockCompress(TRuntimeNode flow, ui32 bitmapIndex); + TRuntimeNode BlockExpandChunked(TRuntimeNode flow); TRuntimeNode BlockCoalesce(TRuntimeNode first, TRuntimeNode second); //-- logical functions |