diff options
| author | atarasov5 <[email protected]> | 2025-07-23 11:24:03 +0300 |
|---|---|---|
| committer | atarasov5 <[email protected]> | 2025-07-23 12:25:55 +0300 |
| commit | f531c2bc5154672d6ba28f691d3a4bdf7ff90c34 (patch) | |
| tree | ef0b71a8dd91f7719b54c95d5a01b3e4d729106a /yql/essentials/minikql/computation/mkql_computation_node_pack.cpp | |
| parent | 9d67ed79ecd1a554c6606e8ae27ca146bd828b05 (diff) | |
YQL-19995: Fix block serialization for child data with different offsets
В данном пулреквесте я добавил поддержку сериализации tuple/struct массивов с разными оффсетами.
Сделал это через новый флаг `EValuePackerVersion::{V0, V1}`.
Также нужно будет сделать коммит в contrib/ydb
commit_hash:79709ad660a4295958e5488d3dd24d660f32ca9a
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_pack.cpp')
| -rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_pack.cpp | 145 |
1 files changed, 116 insertions, 29 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp index 95e2148e728..d2527662575 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp @@ -75,6 +75,43 @@ T UnpackData(TChunkedInputBuffer& buf) { return res; } +class TBlockTransportFlags { +public: + enum class EFlag: ui64 { + ScalarsArePresentFlag = 1 << 0, + StoreOffsetsForEachChildData = 1 << 1, + }; + + TBlockTransportFlags(ui64 data) + : Data_(data) + { + } + + TBlockTransportFlags() + : Data_(0) + { + } + + bool HasFlag(EFlag flag) const { + return (Data_ & ToIntegral(flag)) != 0; + } + + TBlockTransportFlags* AddFlag(EFlag flag) { + Data_ = Data_ | ToIntegral(flag); + return this; + } + + ui64 Data() const { + return Data_; + } + +private: + static constexpr ui64 ToIntegral(EFlag flag) { + return static_cast<ui64>(flag); + } + ui64 Data_ = 0; +}; + NUdf::TUnboxedValuePod UnpackString(TChunkedInputBuffer& buf, ui32 size) { auto res = MakeStringNotFilled(size, 0); NYql::NUdf::TMutableStringRef ref = res.AsStringRef(); @@ -984,6 +1021,17 @@ bool IsMultiBlock(const TType* type, ui32& blockLengthIndex, TVector<const TBloc return true; } +bool IsOffsetExplicitStored(EValuePackerVersion valuePackerVersion) { + return valuePackerVersion >= EValuePackerVersion::V1; +}; + +size_t GetTopLevelOffsetsCount(EValuePackerVersion valuePackerVersion, ui32 width) { + if (!IsOffsetExplicitStored(valuePackerVersion)) { + return width - 1; + } + return 0; +}; + } // namespace template<bool Fast> @@ -1066,29 +1114,55 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value) // Transport packer -template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) +template <bool Fast> +TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) : Type_(type) , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool()) + , ValuePackerVersion_(valuePackerVersion) { MKQL_ENSURE(!stable, "Stable packing is not supported"); InitBlocks(minFillPercentage); } -template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) +template <bool Fast> +TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, + TMaybe<size_t> bufferPageAllocSize, + arrow::MemoryPool* pool, + TMaybe<ui8> minFillPercentage) + : TValuePackerTransport(type, EValuePackerVersion::V0, bufferPageAllocSize, pool, minFillPercentage) +{ +} + +template <bool Fast> +TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) : Type_(type) , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool()) + , ValuePackerVersion_(valuePackerVersion) { InitBlocks(minFillPercentage); } +template <bool Fast> +TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, + const TType* type, + TMaybe<size_t> bufferPageAllocSize, + arrow::MemoryPool* ppol, + TMaybe<ui8> minFillPercentage) + : TValuePackerTransport(stable, + type, + EValuePackerVersion::V0, + bufferPageAllocSize, + ppol, + minFillPercentage) +{ +} + template<bool Fast> void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) { TVector<const TBlockType*> items; @@ -1098,10 +1172,10 @@ void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) { return; } - const TBlockSerializerParams serializerParams = { - .Pool = &ArrowPool_, - .MinFillPercentage = minFillPercentage - }; + const TBlockSerializerParams serializerParams( + &ArrowPool_, + minFillPercentage, + IsOffsetExplicitStored(ValuePackerVersion_)); IsBlock_ = true; ConvertedScalars_.resize(items.size()); @@ -1112,7 +1186,7 @@ void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) { if (i != BlockLenIndex_) { const TBlockType* itemType = items[i]; BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams); - BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType()); + BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams); if (itemType->GetShape() == TBlockType::EShape::Scalar) { BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType()); } @@ -1217,6 +1291,13 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons auto metadataBuffer = std::make_shared<TBuffer>(); + + TBlockTransportFlags flags; + flags.AddFlag(TBlockTransportFlags::EFlag::ScalarsArePresentFlag); + if (IsOffsetExplicitStored(ValuePackerVersion_)) { + flags.AddFlag(TBlockTransportFlags::EFlag::StoreOffsetsForEachChildData); + } + ui32 totalMetadataCount = 0; for (size_t i = 0; i < width; ++i) { if (i != BlockLenIndex_) { @@ -1227,11 +1308,11 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons // calculate approximate metadata size const size_t metadataReservedSize = - MAX_PACKED64_SIZE + // block len - MAX_PACKED64_SIZE + // feature flags - (width - 1) + // 1-byte offsets - MAX_PACKED32_SIZE + // metadata words count - MAX_PACKED64_SIZE * totalMetadataCount; // metadata words + MAX_PACKED64_SIZE + // block len + MAX_PACKED64_SIZE + // feature flags + GetTopLevelOffsetsCount(ValuePackerVersion_, width) + // 1-byte offsets for V0, empty for higher versions. + MAX_PACKED32_SIZE + // metadata words count + MAX_PACKED64_SIZE * totalMetadataCount; // metadata words metadataBuffer->Reserve(len ? metadataReservedSize : MAX_PACKED64_SIZE); // save block length @@ -1243,10 +1324,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons return *this; } - // save feature flags - // 1 = "scalars are present" - const ui64 metadataFlags = 1 << 0; - PackData<false>(metadataFlags, *metadataBuffer); + PackData<false>(flags.Data(), *metadataBuffer); TVector<std::shared_ptr<arrow::ArrayData>> arrays(width); // save reminder of original offset for each column - it is needed to properly handle offset in bitmaps @@ -1260,7 +1338,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons i64 offset = datum.array()->offset; MKQL_ENSURE(offset >= 0, "Negative offset"); // all offsets should be equal - MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data"); + if (!IsOffsetExplicitStored(ValuePackerVersion_)) { + MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data"); + } reminder = offset % 8; arrays[i] = datum.array(); } else { @@ -1274,7 +1354,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons } arrays[i] = ConvertedScalars_[i]; } - PackData<false>(reminder, *metadataBuffer); + if (!IsOffsetExplicitStored(ValuePackerVersion_)) { + PackData<false>(reminder, *metadataBuffer); + } } // save count of metadata words @@ -1291,7 +1373,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons } } - MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error"); + MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error. Expected: " << totalMetadataCount << ", actual: " << savedMetadata); BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer); // save buffers @@ -1316,17 +1398,21 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const } // unpack flags - const ui64 metadataFlags = UnpackData<false, ui64>(chunked); - MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags"); - + const auto metadataFlags = TBlockTransportFlags(UnpackData<false, ui64>(chunked)); + MKQL_ENSURE(metadataFlags.HasFlag(TBlockTransportFlags::EFlag::ScalarsArePresentFlag), "Scalars are present flag must be set."); + if (IsOffsetExplicitStored(ValuePackerVersion_)) { + MKQL_ENSURE(metadataFlags.HasFlag(TBlockTransportFlags::EFlag::StoreOffsetsForEachChildData), "Offsets must be explicitly stored."); + } // unpack array offsets const ui32 width = BlockDeserializers_.size(); MKQL_ENSURE(width > 0, "Invalid width"); TVector<ui64> offsets(width); - for (ui32 i = 0; i < width; ++i) { - if (BlockDeserializers_[i]) { - offsets[i] = UnpackData<false, ui8>(chunked); - MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value"); + if (!IsOffsetExplicitStored(ValuePackerVersion_)) { + for (ui32 i = 0; i < width; ++i) { + if (BlockDeserializers_[i]) { + offsets[i] = UnpackData<false, ui8>(chunked); + MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value. Actual offset is: " << offsets[i]); + } } } @@ -1350,7 +1436,8 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const if (i != BlockLenIndex_) { MKQL_ENSURE(BlockDeserializers_[i], "Missing deserializer"); const bool isScalar = BlockReaders_[i] != nullptr; - auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]); + TMaybe<size_t> offset = IsOffsetExplicitStored(ValuePackerVersion_) ? Nothing() : TMaybe<size_t>(offsets[i]); + auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offset); if (isScalar) { TBlockItem item = BlockReaders_[i]->GetItem(*array, 0); const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) : |
