diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-01-10 17:23:43 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-01-10 17:39:29 +0300 |
commit | f3443af4eb64d5b450497b61625932d66a27dfc8 (patch) | |
tree | 3559af21c163f1a2ef04a21fcbb94b2ca0bc0b20 | |
parent | 9729f7ea47301feb1fa702b0aa3ab98198c5e3a2 (diff) | |
download | ydb-f3443af4eb64d5b450497b61625932d66a27dfc8.tar.gz |
Block trimmer and its usage in BlockMapJoinCore
commit_hash:568373541db82f01bd26ce36651f8dbb92a007e1
17 files changed, 917 insertions, 80 deletions
diff --git a/yql/essentials/minikql/arrow/arrow_util.h b/yql/essentials/minikql/arrow/arrow_util.h index efb071ff71..083ca37141 100644 --- a/yql/essentials/minikql/arrow/arrow_util.h +++ b/yql/essentials/minikql/arrow/arrow_util.h @@ -22,6 +22,8 @@ std::shared_ptr<arrow::ArrayData> Unwrap(const arrow::ArrayData& data, TType* it using NYql::NUdf::AllocateBitmapWithReserve; using NYql::NUdf::MakeDenseBitmap; +using NYql::NUdf::MakeDenseBitmapCopy; +using NYql::NUdf::MakeDenseFalseBitmap; inline arrow::internal::Bitmap GetBitmap(const arrow::ArrayData& arr, int index) { return arrow::internal::Bitmap{ arr.buffers[index], arr.offset, arr.length }; diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index 827e295db6..847e4409b6 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -3,6 +3,7 @@ #include <yql/essentials/minikql/computation/mkql_block_builder.h> #include <yql/essentials/minikql/computation/mkql_block_impl.h> #include <yql/essentials/minikql/computation/mkql_block_reader.h> +#include <yql/essentials/minikql/computation/mkql_block_trimmer.h> #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h> #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h> #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> @@ -329,12 +330,12 @@ public: public: TIterator() = default; - TIterator(TBlockIndex* blockIndex) + TIterator(const TBlockIndex* blockIndex) : Type_(EIteratorType::EMPTY) , BlockIndex_(blockIndex) {} - TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::INPLACE) , BlockIndex_(blockIndex) , Entry_(entry) @@ -342,7 +343,7 @@ public: , ItemsToLookup_(std::move(itemsToLookup)) {} - TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::LIST) , BlockIndex_(blockIndex) , Node_(node) @@ -432,7 +433,7 @@ public: private: EIteratorType Type_; - TBlockIndex* BlockIndex_ = nullptr; + const TBlockIndex* BlockIndex_ = nullptr; union { TIndexNode* Node_; @@ -451,7 +452,8 @@ public: const TVector<TType*>& itemTypes, const TVector<ui32>& keyColumns, NUdf::TUnboxedValue stream, - bool any + bool any, + arrow::MemoryPool* pool ) : TBase(memInfo) , InputsDescr_(ToValueDescr(itemTypes)) @@ -466,6 +468,7 @@ public: Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); Hashers_.push_back(helper.MakeHasher(blockItemType)); Comparators_.push_back(helper.MakeComparator(blockItemType)); + Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool)); } } @@ -484,7 +487,12 @@ public: for (size_t i = 0; i < Inputs_.size() - 1; i++) { auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr()); - block.push_back(std::move(datum)); + if (datum.is_scalar()) { + block.push_back(datum); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + block.push_back(Trimmers_[i]->Trim(datum.array())); + } } Data_.push_back(std::move(block)); } @@ -565,7 +573,7 @@ public: return; } - auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter)); + auto value = static_cast<const TIndexMapValue*>(Index_.GetPayload(iter)); if (value->IsInplace()) { iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i])); } else { @@ -574,23 +582,20 @@ public: }); } - TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) { + TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) const { Y_ENSURE(entry.BlockOffset < Data_.size()); Y_ENSURE(columnIdx < Inputs_.size() - 1); - - auto& datum = Data_[entry.BlockOffset][columnIdx]; - MKQL_ENSURE(datum.is_array(), "Expecting array"); - return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset); + return GetItemFromBlock(Data_[entry.BlockOffset], columnIdx, entry.ItemOffset); } - void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) { + void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { Y_ENSURE(row.size() == ioMap.size()); for (size_t i = 0; i < row.size(); i++) { row[i] = GetItem(entry, ioMap[i]); } } - bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) { + bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { Y_ENSURE(keyItems.size() == KeyColumns_.size()); for (size_t i = 0; i < KeyColumns_.size(); i++) { auto indexItem = GetItem(entry, KeyColumns_[i]); @@ -607,10 +612,7 @@ private: ui64 keyHash = 0; keyItems.clear(); for (ui32 keyColumn : KeyColumns_) { - auto& datum = block[keyColumn]; - MKQL_ENSURE(datum.is_array(), "Expecting array"); - - auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset); + auto item = GetItemFromBlock(block, keyColumn, offset); if (!item) { keyItems.clear(); return 0; @@ -623,11 +625,21 @@ private: return keyHash; } + TBlockItem GetItemFromBlock(const std::vector<arrow::Datum>& block, ui32 columnIdx, size_t offset) const { + const auto& datum = block[columnIdx]; + if (datum.is_scalar()) { + return Readers_[columnIdx]->GetScalarItem(*datum.scalar()); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + return Readers_[columnIdx]->GetItem(*datum.array(), offset); + } + } + TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) { return &IndexNodes_.emplace_back(entry, currentHead); } - bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) { + bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { if (chain->IsInplace()) { return IsKeyEquals(chain->GetEntry(), keyItems); } else { @@ -650,6 +662,7 @@ private: TVector<std::unique_ptr<IBlockReader>> Readers_; TVector<NUdf::IBlockItemHasher::TPtr> Hashers_; TVector<NUdf::IBlockItemComparator::TPtr> Comparators_; + TVector<IBlockTrimmer::TPtr> Trimmers_; std::vector<std::vector<arrow::Datum>> Data_; @@ -705,7 +718,8 @@ public: RightItemTypes_, RightKeyColumns_, std::move(RightStream_->GetValue(ctx)), - RightAny + RightAny, + &ctx.ArrowMemoryPool ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp index 214b7ae8ff..ca710d0328 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp @@ -96,19 +96,19 @@ TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind, NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny, - EJoinKind joinKind, size_t blockSize + EJoinKind joinKind, size_t blockSize, bool scalar ) { TProgramBuilder& pb = *setup.PgmBuilder; Y_ENSURE(leftType->IsList(), "Left node has to be list"); const auto leftItemType = AS_TYPE(TListType, leftType)->GetItemType(); Y_ENSURE(leftItemType->IsTuple(), "List item has to be tuple"); - TType* leftBlockType = MakeBlockTupleType(pb, leftItemType); + TType* leftBlockType = MakeBlockTupleType(pb, leftItemType, scalar); Y_ENSURE(rightType->IsList(), "Right node has to be list"); const auto rightItemType = AS_TYPE(TListType, rightType)->GetItemType(); Y_ENSURE(rightItemType->IsTuple(), "Right item has to be tuple"); - TType* rightBlockType = MakeBlockTupleType(pb, rightItemType); + TType* rightBlockType = MakeBlockTupleType(pb, rightItemType, scalar); TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType)); TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType)); @@ -122,8 +122,18 @@ NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, const auto graph = setup.BuildGraph(joinNode, {leftList.GetNode(), rightList.GetNode()}); auto& ctx = graph->GetContext(); - graph->GetEntryPoint(0, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue))); - graph->GetEntryPoint(1, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue))); + + NUdf::TUnboxedValuePod leftBlockListValue, rightBlockListValue; + if (scalar) { + leftBlockListValue = MakeUint64ScalarBlock(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue)); + rightBlockListValue = MakeUint64ScalarBlock(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue)); + } else { + leftBlockListValue = ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue)); + rightBlockListValue = ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue)); + } + + graph->GetEntryPoint(0, true)->SetValue(ctx, leftBlockListValue); + graph->GetEntryPoint(1, true)->SetValue(ctx, rightBlockListValue); return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue()); } @@ -131,14 +141,15 @@ void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind, TType* expectedType, const NUdf::TUnboxedValue& expected, TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, - const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, bool rightAny = false + const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, + bool rightAny = false, bool scalar = false ) { const size_t testSize = leftListValue.GetListLength(); for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) { const auto got = DoTestBlockJoin(setup, leftType, std::move(leftListValue), leftKeyColumns, leftKeyDrops, rightType, std::move(rightListValue), rightKeyColumns, rightKeyDrops, rightAny, - joinKind, blockSize + joinKind, blockSize, scalar ); CompareResults(expectedType, expected, got); } @@ -685,6 +696,52 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestBasic) { ); } + Y_UNIT_TEST(TestScalar) { + TSetup<false> setup(GetNodeFactory()); + const size_t testSize = 1 << 7; + + // 1. Make input for the "left" stream. + TVector<ui64> leftKeyInit(testSize, 1); + TVector<ui64> leftSubkeyInit(testSize, 2); + TVector<ui64> leftValueInit(testSize, 3); + + // 2. Make input for the "right" stream. + TVector<ui64> rightKeyInit(testSize, 1); + TVector<ui64> rightValueInit(testSize, 2); + + // 3. Make "expected" data. + TMultiMap<ui64, ui64> rightMap; + for (size_t i = 0; i < testSize; i++) { + rightMap.insert({rightKeyInit[i], rightValueInit[i]}); + } + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<ui64> expectedValue; + TVector<ui64> expectedRightValue; + for (size_t i = 0; i < testSize; i++) { + const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]); + for (auto it = begin; it != end; it++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(it->second); + } + } + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0}, false, true + ); + } + } // Y_UNIT_TEST_SUITE Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestOptional) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp index 2a65da92ed..fdd65bbba2 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp @@ -137,7 +137,7 @@ IComputationNode* WrapWideStreamDethrottler(TCallable& callable, const TComputat } -TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) { +TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar) { const auto itemTypes = AS_TYPE(TTupleType, tupleType)->GetElements(); const auto ui64Type = pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id); const auto blockLenType = pgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar); @@ -145,7 +145,7 @@ TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) { TVector<TType*> blockItemTypes; std::transform(itemTypes.cbegin(), itemTypes.cend(), std::back_inserter(blockItemTypes), [&](const auto& itemType) { - return pgmBuilder.NewBlockType(itemType, TBlockType::EShape::Many); + return pgmBuilder.NewBlockType(itemType, scalar ? TBlockType::EShape::Scalar : TBlockType::EShape::Many); }); // XXX: Mind the last block length column. blockItemTypes.push_back(blockLenType); @@ -204,6 +204,37 @@ NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, return holderFactory.CreateDirectListHolder(std::move(listValues)); } +NUdf::TUnboxedValuePod MakeUint64ScalarBlock(TComputationContext& ctx, size_t blockSize, + const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values +) { + // Creates a block of scalar values using the first element of the given list + + for (auto type : types) { + // Because IScalarBuilder has no implementations + Y_ENSURE(AS_TYPE(TDataType, type)->GetDataSlot() == NYql::NUdf::EDataSlot::Uint64); + } + + const auto& holderFactory = ctx.HolderFactory; + const size_t width = types.size(); + const size_t rowsCount = values.GetListLength(); + + NUdf::TUnboxedValue row; + Y_ENSURE(values.GetListIterator().Next(row)); + TDefaultListRepresentation listValues; + for (size_t rowOffset = 0; rowOffset < rowsCount; rowOffset += blockSize) { + NUdf::TUnboxedValue* items = nullptr; + const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items); + for (size_t i = 0; i < width; i++) { + const NUdf::TUnboxedValuePod& item = row.GetElement(i); + items[i] = holderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(item.Get<ui64>()))); + } + items[width] = MakeBlockCount(holderFactory, std::min(blockSize, rowsCount - rowOffset)); + listValues = listValues.Append(std::move(tuple)); + } + + return holderFactory.CreateDirectListHolder(std::move(listValues)); +} + NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values ) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h index 2c2f32b312..ecff426c92 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h @@ -9,10 +9,12 @@ inline bool IsOptionalOrNull(const TType* type) { return type->IsOptional() || type->IsNull() || type->IsPg(); } -TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType); +TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar); NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); +NUdf::TUnboxedValuePod MakeUint64ScalarBlock(TComputationContext& ctx, size_t blockSize, + const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp new file mode 100644 index 0000000000..c6f0f9109a --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp @@ -0,0 +1,251 @@ +#include "mkql_block_trimmer.h" + +#include <yql/essentials/minikql/arrow/arrow_util.h> +#include <yql/essentials/public/decimal/yql_decimal.h> +#include <yql/essentials/public/udf/arrow/block_reader.h> +#include <yql/essentials/public/udf/arrow/defs.h> +#include <yql/essentials/public/udf/arrow/util.h> +#include <yql/essentials/public/udf/udf_type_inspection.h> +#include <yql/essentials/public/udf/udf_value.h> +#include <yql/essentials/public/udf/udf_value_builder.h> +#include <yql/essentials/utils/yql_panic.h> + +#include <arrow/array/data.h> +#include <arrow/datum.h> + +namespace NKikimr::NMiniKQL { + +class TBlockTrimmerBase : public IBlockTrimmer { +protected: + TBlockTrimmerBase(arrow::MemoryPool* pool) + : Pool_(pool) + {} + + TBlockTrimmerBase() = delete; + + std::shared_ptr<arrow::Buffer> TrimNullBitmap(const std::shared_ptr<arrow::ArrayData>& array) { + auto& nullBitmapBuffer = array->buffers[0]; + + std::shared_ptr<arrow::Buffer> result; + auto nullCount = array->GetNullCount(); + if (nullCount == array->length) { + result = MakeDenseFalseBitmap(array->length, Pool_); + } else if (nullCount > 0) { + result = MakeDenseBitmapCopy(nullBitmapBuffer->data(), array->length, array->offset, Pool_); + } + + return result; + } + +protected: + arrow::MemoryPool* Pool_; +}; + +template<typename TLayout, bool Nullable> +class TFixedSizeBlockTrimmer : public TBlockTrimmerBase { +public: + TFixedSizeBlockTrimmer(arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 2); + Y_ENSURE(array->child_data.empty()); + + std::shared_ptr<arrow::Buffer> trimmedNullBitmap; + if constexpr (Nullable) { + trimmedNullBitmap = TrimNullBitmap(array); + } + + auto origData = array->GetValues<TLayout>(1); + auto dataSize = sizeof(TLayout) * array->length; + + auto trimmedDataBuffer = NUdf::AllocateResizableBuffer(dataSize, Pool_); + memcpy(trimmedDataBuffer->mutable_data(), origData, dataSize); + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedDataBuffer)}, array->GetNullCount()); + } +}; + +template<bool Nullable> +class TResourceBlockTrimmer : public TBlockTrimmerBase { +public: + TResourceBlockTrimmer(arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 2); + Y_ENSURE(array->child_data.empty()); + + std::shared_ptr<arrow::Buffer> trimmedNullBitmap; + if constexpr (Nullable) { + trimmedNullBitmap = TrimNullBitmap(array); + } + + auto origData = array->GetValues<NUdf::TUnboxedValue>(1); + auto dataSize = sizeof(NUdf::TUnboxedValue) * array->length; + + auto trimmedBuffer = NUdf::AllocateResizableBuffer<NUdf::TResizableManagedBuffer<NUdf::TUnboxedValue>>(dataSize, Pool_); + ARROW_OK(trimmedBuffer->Resize(dataSize)); + auto trimmedBufferData = reinterpret_cast<NUdf::TUnboxedValue*>(trimmedBuffer->mutable_data()); + + for (int64_t i = 0; i < array->length; i++) { + ::new(&trimmedBufferData[i]) NUdf::TUnboxedValue(origData[i]); + } + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedBuffer)}, array->GetNullCount()); + } +}; + +template<typename TStringType, bool Nullable> +class TStringBlockTrimmer : public TBlockTrimmerBase { + using TOffset = typename TStringType::offset_type; + +public: + TStringBlockTrimmer(arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 3); + Y_ENSURE(array->child_data.empty()); + + std::shared_ptr<arrow::Buffer> trimmedNullBitmap; + if constexpr (Nullable) { + trimmedNullBitmap = TrimNullBitmap(array); + } + + auto origOffsetData = array->GetValues<TOffset>(1); + auto origStringData = reinterpret_cast<const char*>(array->buffers[2]->data() + origOffsetData[0]); + auto stringDataSize = origOffsetData[array->length] - origOffsetData[0]; + + auto trimmedOffsetBuffer = NUdf::AllocateResizableBuffer(sizeof(TOffset) * (array->length + 1), Pool_); + auto trimmedStringBuffer = NUdf::AllocateResizableBuffer(stringDataSize, Pool_); + + auto trimmedOffsetBufferData = reinterpret_cast<TOffset*>(trimmedOffsetBuffer->mutable_data()); + auto trimmedStringBufferData = reinterpret_cast<char*>(trimmedStringBuffer->mutable_data()); + + for (int64_t i = 0; i < array->length + 1; i++) { + trimmedOffsetBufferData[i] = origOffsetData[i] - origOffsetData[0]; + } + memcpy(trimmedStringBufferData, origStringData, stringDataSize); + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap), std::move(trimmedOffsetBuffer), std::move(trimmedStringBuffer)}, array->GetNullCount()); + } +}; + +template<bool Nullable> +class TTupleBlockTrimmer : public TBlockTrimmerBase { +public: + TTupleBlockTrimmer(std::vector<IBlockTrimmer::TPtr> children, arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + , Children_(std::move(children)) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 1); + + std::shared_ptr<arrow::Buffer> trimmedNullBitmap; + if constexpr (Nullable) { + trimmedNullBitmap = TrimNullBitmap(array); + } + + std::vector<std::shared_ptr<arrow::ArrayData>> trimmedChildren; + Y_ENSURE(array->child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); i++) { + trimmedChildren.push_back(Children_[i]->Trim(array->child_data[i])); + } + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, std::move(trimmedChildren), array->GetNullCount()); + } + +protected: + TTupleBlockTrimmer(arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + {} + +protected: + std::vector<IBlockTrimmer::TPtr> Children_; +}; + +template<typename TDate, bool Nullable> +class TTzDateBlockTrimmer : public TTupleBlockTrimmer<Nullable> { + using TBase = TTupleBlockTrimmer<Nullable>; + using TDateLayout = typename NUdf::TDataType<TDate>::TLayout; + +public: + TTzDateBlockTrimmer(arrow::MemoryPool* pool) + : TBase(pool) + { + this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<TDateLayout, false>>(pool)); + this->Children_.push_back(std::make_unique<TFixedSizeBlockTrimmer<ui16, false>>(pool)); + } +}; + +class TExternalOptionalBlockTrimmer : public TBlockTrimmerBase { +public: + TExternalOptionalBlockTrimmer(IBlockTrimmer::TPtr inner, arrow::MemoryPool* pool) + : TBlockTrimmerBase(pool) + , Inner_(std::move(inner)) + {} + + std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override { + Y_ENSURE(array->buffers.size() == 1); + Y_ENSURE(array->child_data.size() == 1); + + auto trimmedNullBitmap = TrimNullBitmap(array); + auto trimmedInner = Inner_->Trim(array->child_data[0]); + + return arrow::ArrayData::Make(array->type, array->length, {std::move(trimmedNullBitmap)}, {std::move(trimmedInner)}, array->GetNullCount()); + } + +private: + IBlockTrimmer::TPtr Inner_; +}; + +struct TTrimmerTraits { + using TResult = IBlockTrimmer; + template <bool Nullable> + using TTuple = TTupleBlockTrimmer<Nullable>; + template <typename T, bool Nullable> + using TFixedSize = TFixedSizeBlockTrimmer<T, Nullable>; + template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot> + using TStrings = TStringBlockTrimmer<TStringType, Nullable>; + using TExtOptional = TExternalOptionalBlockTrimmer; + template<bool Nullable> + using TResource = TResourceBlockTrimmer<Nullable>; + template<typename TTzDate, bool Nullable> + using TTzDateReader = TTzDateBlockTrimmer<TTzDate, Nullable>; + + static TResult::TPtr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool* pool) { + if (desc.PassByValue) { + return std::make_unique<TFixedSize<ui64, true>>(pool); + } else { + return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>(pool); + } + } + + static TResult::TPtr MakeResource(bool isOptional, arrow::MemoryPool* pool) { + if (isOptional) { + return std::make_unique<TResource<true>>(pool); + } else { + return std::make_unique<TResource<false>>(pool); + } + } + + template<typename TTzDate> + static TResult::TPtr MakeTzDate(bool isOptional, arrow::MemoryPool* pool) { + if (isOptional) { + return std::make_unique<TTzDateReader<TTzDate, true>>(pool); + } else { + return std::make_unique<TTzDateReader<TTzDate, false>>(pool); + } + } +}; + +IBlockTrimmer::TPtr MakeBlockTrimmer(const NUdf::ITypeInfoHelper& typeInfoHelper, const NUdf::TType* type, arrow::MemoryPool* pool) { + return MakeBlockReaderImpl<TTrimmerTraits>(typeInfoHelper, type, nullptr, pool); +} + +} diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.h b/yql/essentials/minikql/computation/mkql_block_trimmer.h new file mode 100644 index 0000000000..0ec46ea83c --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_block_trimmer.h @@ -0,0 +1,21 @@ +#pragma once + +#include <util/generic/noncopyable.h> +#include <yql/essentials/public/udf/udf_types.h> + +#include <arrow/type.h> + +namespace NKikimr::NMiniKQL { + +class IBlockTrimmer : private TNonCopyable { +public: + using TPtr = std::unique_ptr<IBlockTrimmer>; + + virtual ~IBlockTrimmer() = default; + + virtual std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) = 0; +}; + +IBlockTrimmer::TPtr MakeBlockTrimmer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, arrow::MemoryPool* pool); + +} diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp new file mode 100644 index 0000000000..4ebb74f3ec --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_block_trimmer_ut.cpp @@ -0,0 +1,373 @@ +#include "mkql_block_trimmer.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <yql/essentials/public/udf/arrow/block_builder.h> +#include <yql/essentials/public/udf/arrow/memory_pool.h> +#include <yql/essentials/minikql/mkql_type_builder.h> +#include <yql/essentials/minikql/mkql_function_registry.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> + +using namespace NYql::NUdf; +using namespace NKikimr; + +struct TBlockTrimmerTestData { + TBlockTrimmerTestData() + : FunctionRegistry(NMiniKQL::CreateFunctionRegistry(NMiniKQL::CreateBuiltinRegistry())) + , Alloc(__LOCATION__) + , Env(Alloc) + , PgmBuilder(Env, *FunctionRegistry) + , MemInfo("Memory") + , ArrowPool(GetYqlMemoryPool()) + { + } + + TIntrusivePtr<NMiniKQL::IFunctionRegistry> FunctionRegistry; + NMiniKQL::TScopedAlloc Alloc; + NMiniKQL::TTypeEnvironment Env; + NMiniKQL::TProgramBuilder PgmBuilder; + NMiniKQL::TMemoryUsageInfo MemInfo; + arrow::MemoryPool* const ArrowPool; +}; + +Y_UNIT_TEST_SUITE(TBlockTrimmerTest) { + Y_UNIT_TEST(TestFixedSize) { + TBlockTrimmerTestData data; + + const auto int64Type = data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, false); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(int64Type); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), int64Type, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), int64Type); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), int64Type, data.ArrowPool); + + for (size_t i = 0; i < testSize; i++) { + builder->Add(TBlockItem(i)); + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim"); + } + } + } + + Y_UNIT_TEST(TestString) { + TBlockTrimmerTestData data; + + const auto stringType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::String, false); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(stringType); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + // To fit all strings into single block + constexpr auto testSize = 512; + constexpr auto sliceSize = 128; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), stringType, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), stringType); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), stringType, data.ArrowPool); + + std::string testString; + testString.resize(testSize); + for (size_t i = 0; i < testSize; i++) { + testString[i] = static_cast<char>(i); + if (i % 2) { + builder->Add(TBlockItem(TStringRef(testString.data(), i + 1))); + } else { + // Empty string + builder->Add(TBlockItem(TStringRef())); + } + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.AsStringRef(), rhs.AsStringRef(), "Expected the same data after trim"); + } + } + } + + Y_UNIT_TEST(TestOptional) { + TBlockTrimmerTestData data; + + const auto optionalInt64Type = data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(optionalInt64Type); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), optionalInt64Type, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), optionalInt64Type); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), optionalInt64Type, data.ArrowPool); + + for (size_t i = 0; i < testSize; i++) { + if (i % 2) { + builder->Add(TBlockItem()); + } else { + builder->Add(TBlockItem(i)); + } + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs), bool(rhs), "Expected the same optionality after trim"); + + if (lhs) { + UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim"); + } + } + } + } + + Y_UNIT_TEST(TestExternalOptional) { + TBlockTrimmerTestData data; + + const auto doubleOptInt64Type = data.PgmBuilder.NewOptionalType(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true)); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(doubleOptInt64Type); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(i64); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), doubleOptInt64Type, data.ArrowPool); + + for (size_t i = 0; i < testSize; i++) { + if (i % 2) { + builder->Add(TBlockItem(i).MakeOptional()); + } else if (i % 4) { + builder->Add(TBlockItem()); + } else { + builder->Add(TBlockItem().MakeOptional()); + } + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + + for (size_t i = 0; i < 2; i++) { + UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs), bool(rhs), "Expected the same optionality after trim"); + if (!lhs) { + break; + } + + lhs = lhs.GetOptionalValue(); + rhs = rhs.GetOptionalValue(); + } + + if (lhs) { + UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<i64>(), rhs.Get<i64>(), "Expected the same data after trim"); + } + } + } + } + + Y_UNIT_TEST(TestTuple) { + TBlockTrimmerTestData data; + + std::vector<NMiniKQL::TType*> types; + types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64)); + types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::String)); + types.push_back(data.PgmBuilder.NewDataType(NUdf::EDataSlot::Int64, true)); + const auto tupleType = data.PgmBuilder.NewTupleType(types); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(tupleType); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + // To fit all strings into single block + constexpr auto testSize = 512; + constexpr auto sliceSize = 128; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tupleType, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), tupleType); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), tupleType, data.ArrowPool); + + std::string testString; + testString.resize(testSize); + std::vector<TBlockItem*> testTuples(testSize); + for (size_t i = 0; i < testSize; i++) { + testString[i] = static_cast<char>(i); + + TBlockItem* tupleItems = new TBlockItem[3]; + testTuples.push_back(tupleItems); + tupleItems[0] = TBlockItem(i); + tupleItems[1] = TBlockItem(TStringRef(testString.data(), i + 1)); + tupleItems[2] = i % 2 ? TBlockItem(i) : TBlockItem(); + + builder->Add(TBlockItem(tupleItems)); + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + + UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(0).Get<i64>(), rhs.GetElement(0).Get<i64>(), "Expected the same data after trim"); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(1).AsStringRef(), rhs.GetElement(1).AsStringRef(), "Expected the same data after trim"); + UNIT_ASSERT_VALUES_EQUAL_C(bool(lhs.GetElement(2)), bool(rhs.GetElement(2)), "Expected the same optionality after trim"); + if (bool(lhs.GetElement(2))) { + UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetElement(2).Get<i64>(), rhs.GetElement(2).Get<i64>(), "Expected the same data after trim"); + } + } + } + + for (auto tupleItems : testTuples) { + delete[] tupleItems; + } + } + + Y_UNIT_TEST(TestTzDate) { + TBlockTrimmerTestData data; + using TDtLayout = TDataType<TTzDatetime>::TLayout; + + const auto tzDatetimeType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::TzDatetime, false); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(tzDatetimeType); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / (sizeof(TDtLayout) + sizeof(ui16)); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDatetimeType, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), tzDatetimeType); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), tzDatetimeType, data.ArrowPool); + + for (size_t i = 0; i < testSize; i++) { + TBlockItem dt = TBlockItem(i); + dt.SetTimezoneId(i * 2); + builder->Add(dt); + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.Get<TDtLayout>(), rhs.Get<TDtLayout>(), "Expected the same data after trim"); + UNIT_ASSERT_VALUES_EQUAL_C(lhs.GetTimezoneId(), rhs.GetTimezoneId(), "Expected the same data after trim"); + } + } + } + + extern const char ResourceName[] = "Resource.Name"; + Y_UNIT_TEST(TestResource) { + TBlockTrimmerTestData data; + + const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName); + + size_t itemSize = NMiniKQL::CalcMaxBlockItemSize(resourceType); + size_t blockLen = NMiniKQL::CalcBlockLen(itemSize); + Y_ENSURE(blockLen > 8); + + constexpr auto testSize = NMiniKQL::MaxBlockSizeInBytes / sizeof(TUnboxedValue); + constexpr auto sliceSize = 1024; + static_assert(testSize % sliceSize == 0); + + auto builder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), resourceType, *data.ArrowPool, blockLen, nullptr); + auto reader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), resourceType); + auto trimmer = MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), resourceType, data.ArrowPool); + + struct TWithDtor { + int Payload; + std::shared_ptr<int> DestructorCallsCnt; + TWithDtor(int payload, std::shared_ptr<int> destructorCallsCnt): + Payload(payload), DestructorCallsCnt(std::move(destructorCallsCnt)) { + } + ~TWithDtor() { + *DestructorCallsCnt = *DestructorCallsCnt + 1; + } + }; + using TTestResource = TBoxedResource<TWithDtor, ResourceName>; + + auto destructorCallsCnt = std::make_shared<int>(0); + { + for (size_t i = 0; i < testSize; i++) { + builder->Add(TUnboxedValuePod(new TTestResource(i, destructorCallsCnt))); + } + auto datum = builder->Build(true); + Y_ENSURE(datum.is_array()); + auto array = datum.array(); + + for (size_t sliceIdx = 0; sliceIdx < testSize / sliceSize; sliceIdx++) { + auto slice = Chop(array, sliceSize); + auto trimmedSlice = trimmer->Trim(slice); + + for (size_t elemIdx = 0; elemIdx < sliceSize; elemIdx++) { + TBlockItem lhs = reader->GetItem(*slice, elemIdx); + TBlockItem rhs = reader->GetItem(*trimmedSlice, elemIdx); + + auto lhsResource = reinterpret_cast<TTestResource*>(lhs.GetBoxed().Get()); + auto rhsResource = reinterpret_cast<TTestResource*>(rhs.GetBoxed().Get()); + UNIT_ASSERT_VALUES_EQUAL_C(lhsResource->Get()->Payload, rhsResource->Get()->Payload, "Expected the same data after trim"); + } + } + } + + UNIT_ASSERT_VALUES_EQUAL_C(*destructorCallsCnt, testSize, "Expected 1 call to resource destructor"); + } +} diff --git a/yql/essentials/minikql/computation/ut/ya.make.inc b/yql/essentials/minikql/computation/ut/ya.make.inc index 4de4b13fc5..1f7ae7d6dc 100644 --- a/yql/essentials/minikql/computation/ut/ya.make.inc +++ b/yql/essentials/minikql/computation/ut/ya.make.inc @@ -12,6 +12,7 @@ ENDIF() SRCDIR(yql/essentials/minikql/computation) SRCS( + mkql_block_trimmer_ut.cpp mkql_computation_node_holders_ut.cpp mkql_computation_node_pack_ut.cpp mkql_computation_node_list_ut.cpp diff --git a/yql/essentials/minikql/computation/ya.make b/yql/essentials/minikql/computation/ya.make index c5a4490817..4374567135 100644 --- a/yql/essentials/minikql/computation/ya.make +++ b/yql/essentials/minikql/computation/ya.make @@ -5,6 +5,7 @@ SRCS( mkql_block_impl.cpp mkql_block_reader.cpp mkql_block_transport.cpp + mkql_block_trimmer.cpp mkql_computation_node.cpp mkql_computation_node_holders.cpp mkql_computation_node_impl.cpp @@ -22,6 +23,7 @@ PEERDIR( yql/essentials/public/types yql/essentials/parser/pg_wrapper/interface yql/essentials/public/udf + yql/essentials/public/udf/arrow yql/essentials/minikql/arrow ) diff --git a/yql/essentials/minikql/computation/ya.make.inc b/yql/essentials/minikql/computation/ya.make.inc index 7a663f1a46..14f51bfcee 100644 --- a/yql/essentials/minikql/computation/ya.make.inc +++ b/yql/essentials/minikql/computation/ya.make.inc @@ -23,6 +23,7 @@ PEERDIR( yql/essentials/minikql/computation yql/essentials/parser/pg_wrapper/interface yql/essentials/public/udf + yql/essentials/public/udf/arrow yql/essentials/utils library/cpp/threading/future ) diff --git a/yql/essentials/public/udf/arrow/bit_util.h b/yql/essentials/public/udf/arrow/bit_util.h index c2c891b269..4dbe785aa7 100644 --- a/yql/essentials/public/udf/arrow/bit_util.h +++ b/yql/essentials/public/udf/arrow/bit_util.h @@ -94,5 +94,30 @@ inline T* CompressArray(const T* src, const ui8* sparseBitmap, T* dst, size_t co return dst; } +inline void CopyDenseBitmap(ui8* dst, const ui8* src, size_t srcOffset, size_t len) { + if ((srcOffset & 7) != 0) { + size_t offsetBytes = srcOffset >> 3; + src += offsetBytes; + + ui8 offsetTail = srcOffset & 7; + ui8 offsetHead = 8 - offsetTail; + + ui8 remainder = *src++ >> offsetTail; + size_t dstOffset = offsetHead; + for (; dstOffset < len; dstOffset += 8) { + *dst++ = remainder | (*src << offsetHead); + remainder = *src >> offsetTail; + src++; + } + // dst is guaranteed to have extra length even if it's not needed + *dst++ = remainder; + } else { + src += srcOffset >> 3; + // Round up to 8 + len = (len + 7u) & ~size_t(7u); + memcpy(dst, src, len >> 3); + } +} + } } diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h index d7863329ee..77a74b155a 100644 --- a/yql/essentials/public/udf/arrow/block_reader.h +++ b/yql/essentials/public/udf/arrow/block_reader.h @@ -32,7 +32,7 @@ struct TBlockItemSerializeProps { bool IsFixed = true; // true if each block item takes fixed size }; -template<typename T, bool Nullable, typename TDerived> +template<typename T, bool Nullable, typename TDerived> class TFixedSizeBlockReaderBase : public IBlockReader { public: TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { @@ -309,7 +309,7 @@ public: return TBlockItem(Items.data()); } - + size_t GetDataWeightImpl(const TBlockItem& item) const { const TBlockItem* items = nullptr; ui64 size = 0; @@ -352,7 +352,7 @@ public: Children[i]->SaveItem(*data.child_data[i], index, out); } } - + void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { for (ui32 i = 0; i < Children.size(); ++i) { Children[i]->SaveScalarItem(*structScalar.value[i], out); @@ -396,7 +396,7 @@ public: Y_UNUSED(item); return GetChildrenDefaultDataWeight(); } - + size_t GetChildrenDefaultDataWeight() const { ui64 size = 0; if constexpr (Nullable) { @@ -412,7 +412,7 @@ public: DateReader_.SaveItem(*data.child_data[0], index, out); TimezoneReader_.SaveItem(*data.child_data[1], index, out); } - + void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { DateReader_.SaveScalarItem(*structScalar.value[0], out); TimezoneReader_.SaveScalarItem(*structScalar.value[1], out); @@ -525,31 +525,30 @@ struct TReaderTraits { } }; -template <typename TTraits> -std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children) { +template <typename TTraits, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeTupleBlockReaderImpl(bool isOptional, TVector<std::unique_ptr<typename TTraits::TResult>>&& children, TArgs... args) { if (isOptional) { - return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children)); + return std::make_unique<typename TTraits::template TTuple<true>>(std::move(children), args...); } else { - return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children)); + return std::make_unique<typename TTraits::template TTuple<false>>(std::move(children), args...); } } -template <typename TTraits, typename T> -std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional) { +template <typename TTraits, typename T, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isOptional, TArgs... args) { if (isOptional) { - return std::make_unique<typename TTraits::template TFixedSize<T, true>>(); + return std::make_unique<typename TTraits::template TFixedSize<T, true>>(args...); } else { - return std::make_unique<typename TTraits::template TFixedSize<T, false>>(); + return std::make_unique<typename TTraits::template TFixedSize<T, false>>(args...); } } - -template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal> -std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional) { +template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot TOriginal, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional, TArgs... args) { if (isOptional) { - return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(); + return std::make_unique<typename TTraits::template TStrings<T, true, TOriginal>>(args...); } else { - return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(); + return std::make_unique<typename TTraits::template TStrings<T, false, TOriginal>>(args...); } } @@ -558,8 +557,8 @@ concept CanInstantiateBlockReaderForDecimal = requires { typename TTraits::template TFixedSize<NYql::NDecimal::TInt128, true>; }; -template <typename TTraits> -std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder) { +template <typename TTraits, typename... TArgs> +std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHelper& typeInfoHelper, const TType* type, const IPgBuilder* pgBuilder, TArgs... args) { const TType* unpacked = type; TOptionalTypeInspector typeOpt(typeInfoHelper, type); bool isOptional = false; @@ -591,9 +590,9 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe ++nestLevel; } - auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder); + auto reader = MakeBlockReaderImpl<TTraits>(typeInfoHelper, previousType, pgBuilder, args...); for (ui32 i = 1; i < nestLevel; ++i) { - reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader)); + reader = std::make_unique<typename TTraits::TExtOptional>(std::move(reader), args...); } return reader; @@ -606,20 +605,20 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe if (typeStruct) { TVector<std::unique_ptr<typename TTraits::TResult>> members; for (ui32 i = 0; i < typeStruct.GetMembersCount(); i++) { - members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder)); + members.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeStruct.GetMemberType(i), pgBuilder, args...)); } // XXX: Use Tuple block reader for Struct. - return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members)); + return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(members), args...); } TTupleTypeInspector typeTuple(typeInfoHelper, type); if (typeTuple) { TVector<std::unique_ptr<typename TTraits::TResult>> children; for (ui32 i = 0; i < typeTuple.GetElementsCount(); ++i) { - children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder)); + children.emplace_back(MakeBlockReaderImpl<TTraits>(typeInfoHelper, typeTuple.GetElementType(i), pgBuilder, args...)); } - return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children)); + return MakeTupleBlockReaderImpl<TTraits>(isOptional, std::move(children), args...); } TDataTypeInspector typeData(typeInfoHelper, type); @@ -627,59 +626,59 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe auto typeId = typeData.GetTypeId(); switch (GetDataSlot(typeId)) { case NUdf::EDataSlot::Int8: - return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, i8>(isOptional, args...); case NUdf::EDataSlot::Bool: case NUdf::EDataSlot::Uint8: - return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, ui8>(isOptional, args...); case NUdf::EDataSlot::Int16: - return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, i16>(isOptional, args...); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, ui16>(isOptional, args...); case NUdf::EDataSlot::Int32: case NUdf::EDataSlot::Date32: - return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, i32>(isOptional, args...); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, ui32>(isOptional, args...); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: case NUdf::EDataSlot::Interval64: case NUdf::EDataSlot::Datetime64: case NUdf::EDataSlot::Timestamp64: - return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, i64>(isOptional, args...); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, ui64>(isOptional, args...); case NUdf::EDataSlot::Float: - return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, float>(isOptional, args...); case NUdf::EDataSlot::Double: - return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional, args...); case NUdf::EDataSlot::String: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional, args...); case NUdf::EDataSlot::Yson: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional, args...); case NUdf::EDataSlot::JsonDocument: - return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional, args...); case NUdf::EDataSlot::Utf8: - return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional, args...); case NUdf::EDataSlot::Json: - return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional); + return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional, args...); case NUdf::EDataSlot::TzDate: - return TTraits::template MakeTzDate<TTzDate>(isOptional); + return TTraits::template MakeTzDate<TTzDate>(isOptional, args...); case NUdf::EDataSlot::TzDatetime: - return TTraits::template MakeTzDate<TTzDatetime>(isOptional); + return TTraits::template MakeTzDate<TTzDatetime>(isOptional, args...); case NUdf::EDataSlot::TzTimestamp: - return TTraits::template MakeTzDate<TTzTimestamp>(isOptional); + return TTraits::template MakeTzDate<TTzTimestamp>(isOptional, args...); case NUdf::EDataSlot::TzDate32: - return TTraits::template MakeTzDate<TTzDate32>(isOptional); + return TTraits::template MakeTzDate<TTzDate32>(isOptional, args...); case NUdf::EDataSlot::TzDatetime64: - return TTraits::template MakeTzDate<TTzDatetime64>(isOptional); + return TTraits::template MakeTzDate<TTzDatetime64>(isOptional, args...); case NUdf::EDataSlot::TzTimestamp64: - return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional); + return TTraits::template MakeTzDate<TTzTimestamp64>(isOptional, args...); case NUdf::EDataSlot::Decimal: { if constexpr (CanInstantiateBlockReaderForDecimal<TTraits>) { - return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional); + return MakeFixedSizeBlockReaderImpl<TTraits, NYql::NDecimal::TInt128>(isOptional, args...); } else { Y_ENSURE(false, "Unsupported data slot"); } @@ -692,13 +691,13 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe TResourceTypeInspector resource(typeInfoHelper, type); if (resource) { - return TTraits::MakeResource(isOptional); + return TTraits::MakeResource(isOptional, args...); } TPgTypeInspector typePg(typeInfoHelper, type); if (typePg) { auto desc = typeInfoHelper.FindPgTypeDescription(typePg.GetTypeId()); - return TTraits::MakePg(*desc, pgBuilder); + return TTraits::MakePg(*desc, pgBuilder, args...); } Y_ENSURE(false, "Unsupported type"); diff --git a/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp b/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp new file mode 100644 index 0000000000..601d3be7c8 --- /dev/null +++ b/yql/essentials/public/udf/arrow/ut/bit_util_ut.cpp @@ -0,0 +1,41 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <yql/essentials/public/udf/arrow/bit_util.h> +#include <util/random/random.h> + +#include <arrow/util/bit_util.h> + +using namespace NYql::NUdf; + +namespace { + void PerformTest(const ui8* testData, size_t offset, size_t length) { + std::vector<ui8> copied(arrow::BitUtil::BytesForBits(length) + 1, 0); + CopyDenseBitmap(copied.data(), testData, offset, length); + + std::vector<ui8> origSparse(length), copiedSparse(length); + DecompressToSparseBitmap(origSparse.data(), testData, offset, length); + DecompressToSparseBitmap(copiedSparse.data(), copied.data(), 0, length); + for (size_t i = 0; i < length; i++) { + UNIT_ASSERT_EQUAL_C(origSparse[i], copiedSparse[i], "Expected the same data"); + } + } +} + +Y_UNIT_TEST_SUITE(CopyDenseBitmapTest) { + constexpr size_t testSize = 32; + + Y_UNIT_TEST(Test) { + SetRandomSeed(0); + + std::vector<ui8> testData(testSize); + for (size_t i = 0; i < testSize; i++) { + testData[i] = RandomNumber<ui8>(); + } + + for (size_t offset = 0; offset < testSize * 8; offset++) { + for (size_t length = 0; length <= testSize * 8 - offset; length++) { + PerformTest(testData.data(), offset, length); + } + } + } +} diff --git a/yql/essentials/public/udf/arrow/ut/ya.make b/yql/essentials/public/udf/arrow/ut/ya.make index da52d50ded..ecd94b9c03 100644 --- a/yql/essentials/public/udf/arrow/ut/ya.make +++ b/yql/essentials/public/udf/arrow/ut/ya.make @@ -2,9 +2,11 @@ UNITTEST() SRCS( array_builder_ut.cpp + bit_util_ut.cpp ) PEERDIR( + contrib/libs/apache/arrow yql/essentials/public/udf/arrow yql/essentials/public/udf/service/exception_policy yql/essentials/sql/pg_dummy diff --git a/yql/essentials/public/udf/arrow/util.cpp b/yql/essentials/public/udf/arrow/util.cpp index 55a728207b..b36b4f9ce5 100644 --- a/yql/essentials/public/udf/arrow/util.cpp +++ b/yql/essentials/public/udf/arrow/util.cpp @@ -68,6 +68,18 @@ std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_ return bitmap; } +std::shared_ptr<arrow::Buffer> MakeDenseBitmapCopy(const ui8* src, size_t len, size_t offset, arrow::MemoryPool* pool) { + auto bitmap = AllocateBitmapWithReserve(len, pool); + CopyDenseBitmap(bitmap->mutable_data(), src, offset, len); + return bitmap; +} + +std::shared_ptr<arrow::Buffer> MakeDenseFalseBitmap(int64_t len, arrow::MemoryPool* pool) { + auto bitmap = AllocateBitmapWithReserve(len, pool); + std::memset(bitmap->mutable_data(), 0, bitmap->size()); + return bitmap; +} + std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len) { Y_ENSURE(data->length >= 0); Y_ENSURE(offset + len <= (size_t)data->length); diff --git a/yql/essentials/public/udf/arrow/util.h b/yql/essentials/public/udf/arrow/util.h index b4068f10c9..f7bdb715f9 100644 --- a/yql/essentials/public/udf/arrow/util.h +++ b/yql/essentials/public/udf/arrow/util.h @@ -25,6 +25,9 @@ enum class EPgStringType { std::shared_ptr<arrow::Buffer> AllocateBitmapWithReserve(size_t bitCount, arrow::MemoryPool* pool); std::shared_ptr<arrow::Buffer> MakeDenseBitmap(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool); std::shared_ptr<arrow::Buffer> MakeDenseBitmapNegate(const ui8* srcSparse, size_t len, arrow::MemoryPool* pool); +std::shared_ptr<arrow::Buffer> MakeDenseBitmapCopy(const ui8* src, size_t len, size_t offset, arrow::MemoryPool* pool); + +std::shared_ptr<arrow::Buffer> MakeDenseFalseBitmap(int64_t len, arrow::MemoryPool* pool); /// \brief Recursive version of ArrayData::Slice() method std::shared_ptr<arrow::ArrayData> DeepSlice(const std::shared_ptr<arrow::ArrayData>& data, size_t offset, size_t len); |