diff options
| author | atarasov5 <[email protected]> | 2025-07-23 11:24:03 +0300 |
|---|---|---|
| committer | atarasov5 <[email protected]> | 2025-07-23 12:25:55 +0300 |
| commit | f531c2bc5154672d6ba28f691d3a4bdf7ff90c34 (patch) | |
| tree | ef0b71a8dd91f7719b54c95d5a01b3e4d729106a /yql/essentials/minikql/computation | |
| parent | 9d67ed79ecd1a554c6606e8ae27ca146bd828b05 (diff) | |
YQL-19995: Fix block serialization for child data with different offsets
В данном пулреквесте я добавил поддержку сериализации tuple/struct массивов с разными оффсетами.
Сделал это через новый флаг `EValuePackerVersion::{V0, V1}`.
Также нужно будет сделать коммит в contrib/ydb
commit_hash:79709ad660a4295958e5488d3dd24d660f32ca9a
Diffstat (limited to 'yql/essentials/minikql/computation')
6 files changed, 959 insertions, 293 deletions
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp index dcf9542f5df..a7ed5306fbf 100644 --- a/yql/essentials/minikql/computation/mkql_block_transport.cpp +++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp @@ -149,33 +149,45 @@ std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TChunkedBuffer& source, TMaybe<ui class TBlockSerializerBase : public IBlockSerializer { public: explicit TBlockSerializerBase(const TBlockSerializerParams& params) - : Pool_(params.Pool) - , MinFillPercentage_(params.MinFillPercentage) + : Pool_(params.Pool()) + , MinFillPercentage_(params.MinFillPercentage()) + , ShouldSerializeOffset_(params.ShouldSerializeOffset()) { YQL_ENSURE(!MinFillPercentage_ || *MinFillPercentage_ <= 100); } + size_t OffsetMetadataCount() const { + return ShouldSerializeOffset_ ? 1 : 0; + } + protected: arrow::MemoryPool* Pool_; const TMaybe<ui8> MinFillPercentage_; + bool ShouldSerializeOffset_; }; class TBlockDeserializerBase : public IBlockDeserializer { public: - TBlockDeserializerBase() = default; + TBlockDeserializerBase(const TBlockSerializerParams& params) + : ShouldLoadOffset_(params.ShouldSerializeOffset()) + {} virtual void SetArrowType(const std::shared_ptr<arrow::DataType>& type) { ArrowType_ = type; } void LoadMetadata(const TMetadataSource& metaSource) final { + if (ShouldLoadOffset_) { + OffsetReminder_ = metaSource(); + YQL_ENSURE(OffsetReminder_ < 8, "Unexpected offset value. Actual offset is: " << *OffsetReminder_); + } if (IsNullable()) { LoadNullsSizes(metaSource, NullsCount_, NullsSize_); } DoLoadMetadata(metaSource); } - virtual std::shared_ptr<arrow::ArrayData> LoadArray(TChunkedBuffer& src, ui64 blockLen, ui64 offset) final { + virtual std::shared_ptr<arrow::ArrayData> LoadArray(TChunkedBuffer& src, ui64 blockLen, TMaybe<size_t> offset) final { YQL_ENSURE(blockLen > 0, "Should be handled earlier"); std::shared_ptr<arrow::Buffer> nulls; i64 nullsCount = 0; @@ -201,11 +213,11 @@ public: DoResetMetadata(); } - std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) const { + std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, TMaybe<size_t> offset) const { std::shared_ptr<arrow::Buffer> nulls; i64 nullsCount = 0; if (IsNullable()) { - nulls = MakeZeroBitmap(blockLen + offset); + nulls = MakeZeroBitmap(blockLen + GetOffset(offset)); nullsCount = blockLen; } return DoMakeDefaultValue(nulls, nullsCount, blockLen, offset); @@ -215,12 +227,23 @@ protected: virtual void DoLoadMetadata(const TMetadataSource& metaSource) = 0; virtual void DoResetMetadata() = 0; virtual bool IsNullable() const = 0; - virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const = 0; - virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0; + virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const = 0; + virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) = 0; + + ui64 GetOffset(TMaybe<size_t> providedOffset) const { + if (!ShouldLoadOffset_) { + YQL_ENSURE(providedOffset.Defined(), "Offset must be provided explicitly in arguments."); + return *providedOffset; + } + YQL_ENSURE(OffsetReminder_.Defined(), "Offset must be specified in metadata."); + return *OffsetReminder_; + } std::shared_ptr<arrow::DataType> ArrowType_; TMaybe<ui64> NullsCount_; TMaybe<ui64> NullsSize_; + TMaybe<ui64> OffsetReminder_; + bool ShouldLoadOffset_; }; template<size_t ObjectSize, bool Nullable> @@ -231,10 +254,14 @@ public: using TBase::TBase; size_t ArrayMetadataCount() const final { - return Nullable ? 3 : 1; + return OffsetMetadataCount() + (Nullable ? 3 : 1); } void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { + const ui64 offsetReminder = data.offset % 8; + if (ShouldSerializeOffset_) { + metaSink(offsetReminder); + } if constexpr (Nullable) { StoreNullsSizes(data, metaSink); if (data.GetNullCount() == data.length) { @@ -242,8 +269,7 @@ public: return; } } - const ui64 desiredOffset = data.offset % 8; - size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize; + size_t dataBytes = ((size_t)data.length + offsetReminder) * ObjectSize; metaSink(dataBytes); } @@ -262,10 +288,15 @@ public: } }; -template<size_t ObjectSize, bool Nullable> -class TFixedSizeBlockDeserializer final : public TBlockDeserializerBase { +template <size_t ObjectSize, bool Nullable> +class TFixedSizeBlockDeserializer final: public TBlockDeserializerBase { + using TBase = TBlockDeserializerBase; public: - TFixedSizeBlockDeserializer() = default; + TFixedSizeBlockDeserializer(const TBlockSerializerParams& params) + : TBase(params) + {} + + private: void DoLoadMetadata(const TMetadataSource& metaSource) final { LoadBufferSize(metaSource, DataSize_); @@ -275,14 +306,14 @@ private: return Nullable; } - std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { - auto data = MakeZeroBuffer((blockLen + offset) * ObjectSize); - return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, offset); + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final { + auto data = MakeZeroBuffer((blockLen + GetOffset(offset)) * ObjectSize); + return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, GetOffset(offset)); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final { auto data = LoadBuffer(src, DataSize_); - return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, offset); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, GetOffset(offset)); } void DoResetMetadata() final { @@ -302,10 +333,14 @@ public: private: size_t ArrayMetadataCount() const final { - return Nullable ? 4 : 2; + return OffsetMetadataCount() + (Nullable ? 4 : 2); } void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { + const ui64 offsetReminder = data.offset % 8; + if (ShouldSerializeOffset_) { + metaSink(offsetReminder); + } if constexpr (Nullable) { StoreNullsSizes(data, metaSink); if (data.GetNullCount() == data.length) { @@ -315,13 +350,12 @@ private: } } - const ui64 desiredOffset = data.offset % 8; - size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset); + size_t offsetsSize = ((size_t)data.length + 1 + offsetReminder) * sizeof(TOffset); metaSink(offsetsSize); if (ShouldTrimArray(data)) { - const TOffset* offsetData = data.GetValues<TOffset>(1) - desiredOffset; - metaSink(offsetData[data.length + desiredOffset] - offsetData[0]); + const TOffset* offsetData = data.GetValues<TOffset>(1) - offsetReminder; + metaSink(offsetData[data.length + offsetReminder] - offsetData[0]); } else { metaSink(data.buffers[2]->size()); } @@ -384,25 +418,26 @@ private: template<typename TStringType, bool Nullable> class TStringBlockDeserializer final : public TBlockDeserializerBase { + using TBase = TBlockDeserializerBase; using TOffset = typename TStringType::offset_type; public: - TStringBlockDeserializer() = default; + using TBase::TBase; private: void DoLoadMetadata(const TMetadataSource& metaSource) final { LoadBufferSize(metaSource, OffsetsSize_); LoadBufferSize(metaSource, DataSize_); } - std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { - auto offsets = MakeZeroBuffer((blockLen + 1 + offset) * sizeof(TOffset)); + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final { + auto offsets = MakeZeroBuffer((blockLen + 1 + GetOffset(offset)) * sizeof(TOffset)); auto data = MakeEmptyBuffer(); - return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, offset); + return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, GetOffset(offset)); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final { auto offsets = LoadBuffer(src, OffsetsSize_); auto data = LoadBuffer(src, DataSize_); - return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, offset); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, GetOffset(offset)); } bool IsNullable() const final { @@ -429,10 +464,13 @@ public: private: size_t ArrayMetadataCount() const final { - return 2 + Inner_->ArrayMetadataCount(); + return OffsetMetadataCount() + 2 + Inner_->ArrayMetadataCount(); } void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { + if (ShouldSerializeOffset_) { + metaSink(data.offset % 8); + } StoreNullsSizes(data, metaSink); if (data.GetNullCount() == data.length) { auto innerCount = Inner_->ArrayMetadataCount(); @@ -455,22 +493,25 @@ private: }; class TExtOptionalBlockDeserializer final : public TBlockDeserializerBase { + using TBase = TBlockDeserializerBase; public: - explicit TExtOptionalBlockDeserializer(std::unique_ptr<TBlockDeserializerBase>&& inner) - : Inner_(std::move(inner)) + explicit TExtOptionalBlockDeserializer(std::unique_ptr<TBlockDeserializerBase>&& inner, const TBlockSerializerParams& params) + : TBase(params) + , Inner_(std::move(inner)) { } + private: void DoLoadMetadata(const TMetadataSource& metaSource) final { Inner_->LoadMetadata(metaSource); } - std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { - return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, offset); + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final { + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, GetOffset(offset)); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { - return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, offset); + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final { + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, GetOffset(offset)); } bool IsNullable() const final { @@ -498,11 +539,13 @@ public: private: size_t ArrayMetadataCount() const final { - return 0; + return OffsetMetadataCount(); } void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { - Y_UNUSED(data, metaSink); + if (ShouldSerializeOffset_) { + metaSink(data.offset % 8); + } } void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final { @@ -511,19 +554,24 @@ private: }; class TSingularTypeBlockDeserializer final: public TBlockDeserializerBase { + using TBase = TBlockDeserializerBase; + +public: + using TBase::TBase; + private: void DoLoadMetadata(const TMetadataSource& metaSource) final { Y_UNUSED(metaSource); } - std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final { Y_UNUSED(offset); Y_ENSURE(nullsCount == 0); Y_ENSURE(!nulls || nulls->size() == 0); return arrow::NullArray(blockLen).data(); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final { Y_UNUSED(offset, src); Y_ENSURE(nullsCount == 0); Y_ENSURE(!nulls || nulls->size() == 0); @@ -551,10 +599,14 @@ private: if constexpr (Nullable) { result += 2; } + result += OffsetMetadataCount(); return result; } void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { + if (ShouldSerializeOffset_) { + metaSink(data.offset % 8); + } if constexpr (Nullable) { StoreNullsSizes(data, metaSink); } @@ -597,8 +649,7 @@ public: } void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, - const IBlockSerializer::TMetadataSink& metaSink) const { - + const IBlockSerializer::TMetadataSink& metaSink) const { for (size_t i = 0; i < Children_.size(); ++i) { Children_[i]->StoreMetadata(*child_data[i], metaSink); } @@ -630,7 +681,7 @@ public: } void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, - const IBlockSerializer::TMetadataSink& metaSink) const { + const IBlockSerializer::TMetadataSink& metaSink) const { DateSerialiser_.StoreMetadata(*child_data[0], metaSink); TzSerialiser_.StoreMetadata(*child_data[1], metaSink); } @@ -649,11 +700,14 @@ private: template<bool Nullable> class TTupleBlockDeserializer final : public TBlockDeserializerBase { +using TBase = TBlockDeserializerBase; public: - explicit TTupleBlockDeserializer(TVector<std::unique_ptr<TBlockDeserializerBase>>&& children) - : Children_(std::move(children)) + explicit TTupleBlockDeserializer(TVector<std::unique_ptr<TBlockDeserializerBase>>&& children, const TBlockSerializerParams& params) + : TBase(params) + , Children_(std::move(children)) { } + private: void DoLoadMetadata(const TMetadataSource& metaSource) final { for (auto& child : Children_) { @@ -661,20 +715,20 @@ private: } } - std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final { std::vector<std::shared_ptr<arrow::ArrayData>> childData; for (auto& child : Children_) { childData.emplace_back(child->MakeDefaultValue(blockLen, offset)); } - return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, GetOffset(offset)); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final { std::vector<std::shared_ptr<arrow::ArrayData>> childData; for (auto& child : Children_) { childData.emplace_back(child->LoadArray(src, blockLen, offset)); } - return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, GetOffset(offset)); } void DoResetMetadata() final { @@ -700,8 +754,15 @@ private: template<typename TDate, bool Nullable> class TTzDateBlockDeserializer final : public TBlockDeserializerBase { + using TBase = TBlockDeserializerBase; + public: - TTzDateBlockDeserializer() = default; + explicit TTzDateBlockDeserializer(const TBlockSerializerParams& params) + : TBase(params) + , DateDeserialiser_(params) + , TzDeserialiser_(params) + { + } private: void DoLoadMetadata(const TMetadataSource& metaSource) final { @@ -709,18 +770,18 @@ private: TzDeserialiser_.LoadMetadata(metaSource); } - std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final { std::vector<std::shared_ptr<arrow::ArrayData>> childData; childData.emplace_back(DateDeserialiser_.MakeDefaultValue(blockLen, offset)); childData.emplace_back(TzDeserialiser_.MakeDefaultValue(blockLen, offset)); - return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, GetOffset(offset)); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final { std::vector<std::shared_ptr<arrow::ArrayData>> childData; childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset)); childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset)); - return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, GetOffset(offset)); } void DoResetMetadata() final { @@ -801,46 +862,49 @@ struct TDeserializerTraits { constexpr static bool PassType = false; - static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, const TBlockSerializerParams& params) { Y_UNUSED(pgBuilder); if (desc.PassByValue) { - return std::make_unique<TFixedSize<ui64, true>>(); + return std::make_unique<TFixedSize<ui64, true>>(params); } - return std::make_unique<TStrings<arrow::BinaryType, true>>(); + return std::make_unique<TStrings<arrow::BinaryType, true>>(params); } - static std::unique_ptr<TResult> MakeResource(bool isOptional) { - Y_UNUSED(isOptional); + static std::unique_ptr<TResult> MakeResource(bool isOptional, const TBlockSerializerParams& params) { + Y_UNUSED(isOptional, params); ythrow yexception() << "Deserializer not implemented for block resources"; } - static std::unique_ptr<TResult> MakeSingular() { - return std::make_unique<TSingularType>(); + static std::unique_ptr<TResult> MakeSingular(const TBlockSerializerParams& params) { + return std::make_unique<TSingularType>(params); } template<typename TTzDateType> - static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { + static std::unique_ptr<TResult> MakeTzDate(bool isOptional, const TBlockSerializerParams& params) { if (isOptional) { - return std::make_unique<TTzDate<TTzDateType, true>>(); + return std::make_unique<TTzDate<TTzDateType, true>>(params); } else { - return std::make_unique<TTzDate<TTzDateType, false>>(); + return std::make_unique<TTzDate<TTzDateType, false>>(params); } } }; } // namespace - -std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, const TBlockSerializerParams& params) { +std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, + const NYql::NUdf::TType* type, + const TBlockSerializerParams& params) { return NYql::NUdf::DispatchByArrowTraits<TSerializerTraits>(typeInfoHelper, type, nullptr, params); } -std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) { - std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::DispatchByArrowTraits<TDeserializerTraits>(typeInfoHelper, type, nullptr); +std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, + const NYql::NUdf::TType* type, + const TBlockSerializerParams& params) { + std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::DispatchByArrowTraits<TDeserializerTraits>(typeInfoHelper, type, nullptr, params); result->SetArrowType(NYql::NUdf::GetArrowType(typeInfoHelper, type)); return std::move(result); } - } // namespace NKikimr::NMiniKQL + diff --git a/yql/essentials/minikql/computation/mkql_block_transport.h b/yql/essentials/minikql/computation/mkql_block_transport.h index b116dd6aa6b..0b07017b0b9 100644 --- a/yql/essentials/minikql/computation/mkql_block_transport.h +++ b/yql/essentials/minikql/computation/mkql_block_transport.h @@ -11,9 +11,31 @@ namespace NKikimr::NMiniKQL { -struct TBlockSerializerParams { - arrow::MemoryPool* Pool; - TMaybe<ui8> MinFillPercentage; +class TBlockSerializerParams { +public: + TBlockSerializerParams(arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage, bool shouldSerializeOffset) + : Pool_(pool) + , MinFillPercentage_(minFillPercentage) + , ShouldSerializeOffset_(shouldSerializeOffset) + { + } + + arrow::MemoryPool* Pool() const { + return Pool_; + } + + TMaybe<ui8> MinFillPercentage() const { + return MinFillPercentage_; + } + + bool ShouldSerializeOffset() const { + return ShouldSerializeOffset_; + } + +private: + arrow::MemoryPool* Pool_; + TMaybe<ui8> MinFillPercentage_; + bool ShouldSerializeOffset_; }; class IBlockSerializer { @@ -33,11 +55,11 @@ public: using TMetadataSource = std::function<ui64()>; virtual void LoadMetadata(const TMetadataSource& metaSource) = 0; - virtual std::shared_ptr<arrow::ArrayData> LoadArray(NYql::TChunkedBuffer& src, ui64 blockLen, ui64 offset) = 0; -}; + virtual std::shared_ptr<arrow::ArrayData> LoadArray(NYql::TChunkedBuffer& src, ui64 blockLen, TMaybe<size_t> offset) = 0; +}; std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, const TBlockSerializerParams& params); -std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type); +std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, const TBlockSerializerParams& params); } diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp index 95e2148e728..d2527662575 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp @@ -75,6 +75,43 @@ T UnpackData(TChunkedInputBuffer& buf) { return res; } +class TBlockTransportFlags { +public: + enum class EFlag: ui64 { + ScalarsArePresentFlag = 1 << 0, + StoreOffsetsForEachChildData = 1 << 1, + }; + + TBlockTransportFlags(ui64 data) + : Data_(data) + { + } + + TBlockTransportFlags() + : Data_(0) + { + } + + bool HasFlag(EFlag flag) const { + return (Data_ & ToIntegral(flag)) != 0; + } + + TBlockTransportFlags* AddFlag(EFlag flag) { + Data_ = Data_ | ToIntegral(flag); + return this; + } + + ui64 Data() const { + return Data_; + } + +private: + static constexpr ui64 ToIntegral(EFlag flag) { + return static_cast<ui64>(flag); + } + ui64 Data_ = 0; +}; + NUdf::TUnboxedValuePod UnpackString(TChunkedInputBuffer& buf, ui32 size) { auto res = MakeStringNotFilled(size, 0); NYql::NUdf::TMutableStringRef ref = res.AsStringRef(); @@ -984,6 +1021,17 @@ bool IsMultiBlock(const TType* type, ui32& blockLengthIndex, TVector<const TBloc return true; } +bool IsOffsetExplicitStored(EValuePackerVersion valuePackerVersion) { + return valuePackerVersion >= EValuePackerVersion::V1; +}; + +size_t GetTopLevelOffsetsCount(EValuePackerVersion valuePackerVersion, ui32 width) { + if (!IsOffsetExplicitStored(valuePackerVersion)) { + return width - 1; + } + return 0; +}; + } // namespace template<bool Fast> @@ -1066,29 +1114,55 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value) // Transport packer -template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) +template <bool Fast> +TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) : Type_(type) , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool()) + , ValuePackerVersion_(valuePackerVersion) { MKQL_ENSURE(!stable, "Stable packing is not supported"); InitBlocks(minFillPercentage); } -template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) +template <bool Fast> +TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, + TMaybe<size_t> bufferPageAllocSize, + arrow::MemoryPool* pool, + TMaybe<ui8> minFillPercentage) + : TValuePackerTransport(type, EValuePackerVersion::V0, bufferPageAllocSize, pool, minFillPercentage) +{ +} + +template <bool Fast> +TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage) : Type_(type) , BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool()) + , ValuePackerVersion_(valuePackerVersion) { InitBlocks(minFillPercentage); } +template <bool Fast> +TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, + const TType* type, + TMaybe<size_t> bufferPageAllocSize, + arrow::MemoryPool* ppol, + TMaybe<ui8> minFillPercentage) + : TValuePackerTransport(stable, + type, + EValuePackerVersion::V0, + bufferPageAllocSize, + ppol, + minFillPercentage) +{ +} + template<bool Fast> void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) { TVector<const TBlockType*> items; @@ -1098,10 +1172,10 @@ void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) { return; } - const TBlockSerializerParams serializerParams = { - .Pool = &ArrowPool_, - .MinFillPercentage = minFillPercentage - }; + const TBlockSerializerParams serializerParams( + &ArrowPool_, + minFillPercentage, + IsOffsetExplicitStored(ValuePackerVersion_)); IsBlock_ = true; ConvertedScalars_.resize(items.size()); @@ -1112,7 +1186,7 @@ void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) { if (i != BlockLenIndex_) { const TBlockType* itemType = items[i]; BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams); - BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType()); + BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams); if (itemType->GetShape() == TBlockType::EShape::Scalar) { BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType()); } @@ -1217,6 +1291,13 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons auto metadataBuffer = std::make_shared<TBuffer>(); + + TBlockTransportFlags flags; + flags.AddFlag(TBlockTransportFlags::EFlag::ScalarsArePresentFlag); + if (IsOffsetExplicitStored(ValuePackerVersion_)) { + flags.AddFlag(TBlockTransportFlags::EFlag::StoreOffsetsForEachChildData); + } + ui32 totalMetadataCount = 0; for (size_t i = 0; i < width; ++i) { if (i != BlockLenIndex_) { @@ -1227,11 +1308,11 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons // calculate approximate metadata size const size_t metadataReservedSize = - MAX_PACKED64_SIZE + // block len - MAX_PACKED64_SIZE + // feature flags - (width - 1) + // 1-byte offsets - MAX_PACKED32_SIZE + // metadata words count - MAX_PACKED64_SIZE * totalMetadataCount; // metadata words + MAX_PACKED64_SIZE + // block len + MAX_PACKED64_SIZE + // feature flags + GetTopLevelOffsetsCount(ValuePackerVersion_, width) + // 1-byte offsets for V0, empty for higher versions. + MAX_PACKED32_SIZE + // metadata words count + MAX_PACKED64_SIZE * totalMetadataCount; // metadata words metadataBuffer->Reserve(len ? metadataReservedSize : MAX_PACKED64_SIZE); // save block length @@ -1243,10 +1324,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons return *this; } - // save feature flags - // 1 = "scalars are present" - const ui64 metadataFlags = 1 << 0; - PackData<false>(metadataFlags, *metadataBuffer); + PackData<false>(flags.Data(), *metadataBuffer); TVector<std::shared_ptr<arrow::ArrayData>> arrays(width); // save reminder of original offset for each column - it is needed to properly handle offset in bitmaps @@ -1260,7 +1338,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons i64 offset = datum.array()->offset; MKQL_ENSURE(offset >= 0, "Negative offset"); // all offsets should be equal - MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data"); + if (!IsOffsetExplicitStored(ValuePackerVersion_)) { + MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data"); + } reminder = offset % 8; arrays[i] = datum.array(); } else { @@ -1274,7 +1354,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons } arrays[i] = ConvertedScalars_[i]; } - PackData<false>(reminder, *metadataBuffer); + if (!IsOffsetExplicitStored(ValuePackerVersion_)) { + PackData<false>(reminder, *metadataBuffer); + } } // save count of metadata words @@ -1291,7 +1373,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons } } - MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error"); + MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error. Expected: " << totalMetadataCount << ", actual: " << savedMetadata); BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer); // save buffers @@ -1316,17 +1398,21 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const } // unpack flags - const ui64 metadataFlags = UnpackData<false, ui64>(chunked); - MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags"); - + const auto metadataFlags = TBlockTransportFlags(UnpackData<false, ui64>(chunked)); + MKQL_ENSURE(metadataFlags.HasFlag(TBlockTransportFlags::EFlag::ScalarsArePresentFlag), "Scalars are present flag must be set."); + if (IsOffsetExplicitStored(ValuePackerVersion_)) { + MKQL_ENSURE(metadataFlags.HasFlag(TBlockTransportFlags::EFlag::StoreOffsetsForEachChildData), "Offsets must be explicitly stored."); + } // unpack array offsets const ui32 width = BlockDeserializers_.size(); MKQL_ENSURE(width > 0, "Invalid width"); TVector<ui64> offsets(width); - for (ui32 i = 0; i < width; ++i) { - if (BlockDeserializers_[i]) { - offsets[i] = UnpackData<false, ui8>(chunked); - MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value"); + if (!IsOffsetExplicitStored(ValuePackerVersion_)) { + for (ui32 i = 0; i < width; ++i) { + if (BlockDeserializers_[i]) { + offsets[i] = UnpackData<false, ui8>(chunked); + MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value. Actual offset is: " << offsets[i]); + } } } @@ -1350,7 +1436,8 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const if (i != BlockLenIndex_) { MKQL_ENSURE(BlockDeserializers_[i], "Missing deserializer"); const bool isScalar = BlockReaders_[i] != nullptr; - auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]); + TMaybe<size_t> offset = IsOffsetExplicitStored(ValuePackerVersion_) ? Nothing() : TMaybe<size_t>(offsets[i]); + auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offset); if (isScalar) { TBlockItem item = BlockReaders_[i]->GetItem(*array, 0); const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) : diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.h b/yql/essentials/minikql/computation/mkql_computation_node_pack.h index 12aac705bb9..6c200631bf2 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack.h +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.h @@ -70,17 +70,33 @@ private: mutable NDetails::TPackerState State_; }; +// This version specify exactly how data will be packed and unpacked. +enum class EValuePackerVersion { + V0 = 0, // Initial version. + V1 = 1, // Fixed Block type |child_data| serialization/deserialization. + // Remove the invariant of equality of offsets for all recursive children. +}; + template<bool Fast> class TValuePackerTransport { public: using TSelf = TValuePackerTransport<Fast>; + explicit TValuePackerTransport(const TType* type, EValuePackerVersion valuePackerVersion, + TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); + + // Deprecated: For YDB sync only. explicit TValuePackerTransport(const TType* type, - TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); + TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); + // for compatibility with TValuePackerGeneric - stable packing is not supported - TValuePackerTransport(bool stable, const TType* type, + TValuePackerTransport(bool stable, const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); + // Deprecated: For YDB sync only. + TValuePackerTransport(bool stable, const TType* type, + TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing()); + // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout TSelf& AddItem(const NUdf::TUnboxedValuePod& value); TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count); @@ -125,7 +141,7 @@ private: bool IsBlock_ = false; bool IsLegacyBlock_ = false; ui32 BlockLenIndex_ = 0; - + EValuePackerVersion ValuePackerVersion_; TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_; TVector<std::unique_ptr<IBlockReader>> BlockReaders_; TVector<std::shared_ptr<arrow::ArrayData>> ConvertedScalars_; diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp index b13acb40a62..60b23aebcfe 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -57,6 +57,16 @@ using NDetails::TChunkedInputBuffer; template<bool Fast, bool Transport> class TMiniKQLComputationNodePackTest: public TTestBase { using TValuePackerType = typename TPackerTraits<Fast, Transport>::TPackerType; + + TValuePackerType MakeValuePacker(bool stable, const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing()) { + if constexpr (Transport) { + return TValuePackerType(stable, type, valuePackerVersion, bufferPageAllocSize, pool, minFillPercentage); + } else { + Y_UNUSED(valuePackerVersion); + return TValuePackerType(stable, type); + } + } + protected: TMiniKQLComputationNodePackTest() : FunctionRegistry_(CreateFunctionRegistry(CreateBuiltinRegistry())) @@ -343,7 +353,7 @@ protected: void TestPackPerformance(TType* type, const NUdf::TUnboxedValuePod& uValue) { - TValuePackerType packer(false, type); + auto packer = MakeValuePacker(false, type, EValuePackerVersion::V0); const THPTimer timer; for (size_t i = 0U; i < PERFORMANCE_COUNT; ++i) packer.Pack(uValue); @@ -372,7 +382,7 @@ protected: NUdf::TUnboxedValue TestPackUnpack(TType* type, const NUdf::TUnboxedValuePod& uValue, const TString& additionalMsg, const std::optional<ui32>& expectedLength = {}) { - TValuePackerType packer(false, type); + auto packer = MakeValuePacker(false, type, EValuePackerVersion::V0); return TestPackUnpack(packer, uValue, additionalMsg, expectedLength); } @@ -386,7 +396,7 @@ protected: template <typename T> void TestNumericType(NUdf::TDataTypeId schemeType) { TString typeDesc = TStringBuilder() << ", Type:" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name; - TValuePackerType packer(false, PgmBuilder_.NewDataType(schemeType)); + auto packer = MakeValuePacker(false, PgmBuilder_.NewDataType(schemeType), EValuePackerVersion::V0); TestNumericValue<T>(Max<T>(), packer, typeDesc); TestNumericValue<T>(Min<T>(), packer, typeDesc); @@ -411,7 +421,7 @@ protected: template <typename T> void TestOptionalNumericType(NUdf::TDataTypeId schemeType) { TString typeDesc = TStringBuilder() << ", Type:Optional(" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name; - TValuePackerType packer(false, PgmBuilder_.NewOptionalType(PgmBuilder_.NewDataType(schemeType))); + auto packer = MakeValuePacker(false, PgmBuilder_.NewOptionalType(PgmBuilder_.NewDataType(schemeType)), EValuePackerVersion::V0); TestOptionalNumericValue<T>(std::optional<T>(Max<T>()), packer, typeDesc); TestOptionalNumericValue<T>(std::optional<T>(Min<T>()), packer, typeDesc); TestOptionalNumericValue<T>(std::optional<T>(), packer, typeDesc, 1); @@ -428,7 +438,7 @@ protected: void TestStringType(NUdf::TDataTypeId schemeType) { TString typeDesc = TStringBuilder() << ", Type:" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name; - TValuePackerType packer(false, PgmBuilder_.NewDataType(schemeType)); + auto packer = MakeValuePacker(false, PgmBuilder_.NewDataType(schemeType), EValuePackerVersion::V0); TestStringValue("0123456789012345678901234567890123456789", packer, typeDesc, 40 + 4); TestStringValue("[]", packer, typeDesc, Fast ? (2 + 4) : (2 + 1)); TestStringValue("1234567", packer, typeDesc, Fast ? (7 + 4) : (7 + 1)); @@ -442,7 +452,7 @@ protected: void TestUuidType() { auto schemeType = NUdf::TDataType<NUdf::TUuid>::Id; TString typeDesc = TStringBuilder() << ", Type:" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name; - TValuePackerType packer(false, PgmBuilder_.NewDataType(schemeType)); + auto packer = MakeValuePacker(false, PgmBuilder_.NewDataType(schemeType), EValuePackerVersion::V0); TestStringValue("0123456789abcdef", packer, typeDesc, Fast ? 16 : (16 + 4)); } @@ -459,7 +469,7 @@ protected: void TestOptionalStringType(NUdf::TDataTypeId schemeType) { TString typeDesc = TStringBuilder() << ", Type:Optional(" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name; - TValuePackerType packer(false, PgmBuilder_.NewOptionalType(PgmBuilder_.NewDataType(schemeType))); + auto packer = MakeValuePacker(/*stable=*/false, PgmBuilder_.NewOptionalType(PgmBuilder_.NewDataType(schemeType)), EValuePackerVersion::V0); TestOptionalStringValue("0123456789012345678901234567890123456789", packer, typeDesc, Fast ? (40 + 4 + 1) : (40 + 4)); TestOptionalStringValue(std::nullopt, packer, typeDesc, 1); TestOptionalStringValue("[]", packer, typeDesc, Fast ? (2 + 4 + 1) : (2 + 1)); @@ -591,7 +601,7 @@ protected: TType* tupleType; const auto value = MakeTupleValue(tupleType); - TValuePackerType packer(false, tupleType); + auto packer = MakeValuePacker(false, tupleType, EValuePackerVersion::V0); auto buffer = packer.Pack(value); @@ -625,8 +635,8 @@ protected: if constexpr (Transport) { auto itemType = PgmBuilder_.NewDataType(NUdf::TDataType<char *>::Id); auto listType = PgmBuilder_.NewListType(itemType); - TValuePackerType packer(false, itemType); - TValuePackerType listPacker(false, listType); + auto packer = MakeValuePacker(false, itemType, EValuePackerVersion::V0); + auto listPacker = MakeValuePacker(false, listType, EValuePackerVersion::V0); TStringBuf str = "01234567890ABCDEFG"; @@ -663,12 +673,15 @@ protected: bool LegacyStruct = false; bool TrimBlock = false; TMaybe<ui8> MinFillPercentage; + EValuePackerVersion PackerVersion; TString ToString() const { auto result = TStringBuilder() << "Offset: " << Offset << ", Len: " << Len << ", LegacyStruct: " << LegacyStruct << ", TrimBlock: " << TrimBlock; if (MinFillPercentage) { result << ", MinFillPercentage: " << ui64(*MinFillPercentage); } + result << ", PackerVersion: " << static_cast<int>(PackerVersion); + return result; } }; @@ -709,202 +722,228 @@ protected: bool legacyStruct = args.LegacyStruct; ui64 offset = args.Offset; ui64 len = args.Len; - if constexpr (Transport) { - auto strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id); - auto ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id); - auto ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); - auto optStrType = PgmBuilder_.NewOptionalType(strType); - auto optUi32Type = PgmBuilder_.NewOptionalType(ui32Type); - - auto tupleOptUi32StrType = PgmBuilder_.NewTupleType({ optUi32Type, strType }); - auto optTupleOptUi32StrType = PgmBuilder_.NewOptionalType(tupleOptUi32StrType); - - auto blockUi32Type = PgmBuilder_.NewBlockType(ui32Type, TBlockType::EShape::Many); - auto blockOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Many); - auto scalarOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Scalar); - auto blockOptTupleOptUi32StrType = PgmBuilder_.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many); - auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); - - auto tzDateType = PgmBuilder_.NewDataType(NUdf::EDataSlot::TzDate); - auto blockTzDateType = PgmBuilder_.NewBlockType(tzDateType, TBlockType::EShape::Many); - auto nullType = PgmBuilder_.NewNullType(); - auto blockNullType = PgmBuilder_.NewBlockType(nullType, TBlockType::EShape::Many); - - auto rowType = - legacyStruct - ? PgmBuilder_.NewStructType({ - {"A", blockUi32Type}, - {"B", blockOptStrType}, - {"_yql_block_length", scalarUi64Type}, - {"a", scalarOptStrType}, - {"b", blockOptTupleOptUi32StrType}, - {"c", blockTzDateType}, - {"nill", blockNullType}, - }) - : PgmBuilder_.NewMultiType( - {blockUi32Type, blockOptStrType, scalarOptStrType, - blockOptTupleOptUi32StrType, blockTzDateType, blockNullType, scalarUi64Type}); - - ui64 blockLen = 1000; - UNIT_ASSERT_LE(offset + len, blockLen); - - auto builder1 = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr); - auto builder2 = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr); - auto builder3 = MakeArrayBuilder(TTypeInfoHelper(), optTupleOptUi32StrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleOptUi32StrType)), nullptr); - auto builder4 = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr); - auto builder5 = MakeArrayBuilder(TTypeInfoHelper(), nullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(nullType)), nullptr); - - for (ui32 i = 0; i < blockLen; ++i) { - TBlockItem b1(i); - builder1->Add(b1); - - TString a = "a string " + ToString(i); - TBlockItem b2 = (i % 2) ? TBlockItem(a) : TBlockItem(); - builder2->Add(b2); - - TBlockItem b3items[] = { (i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a) }; - TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem(); - builder3->Add(b3); - - TBlockItem tzDate {i}; - tzDate.SetTimezoneId(i % 100); - builder4->Add(tzDate); - builder5->Add(TBlockItem::Zero()); + auto strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id); + auto ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id); + auto ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto optStrType = PgmBuilder_.NewOptionalType(strType); + auto optUi32Type = PgmBuilder_.NewOptionalType(ui32Type); + auto optOptUi32Type = PgmBuilder_.NewOptionalType(optUi32Type); + + auto tupleOptUi32StrType = PgmBuilder_.NewTupleType({optUi32Type, strType}); + auto optTupleOptUi32StrType = PgmBuilder_.NewOptionalType(tupleOptUi32StrType); + + auto blockUi32Type = PgmBuilder_.NewBlockType(ui32Type, TBlockType::EShape::Many); + auto blockOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Many); + auto scalarOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Scalar); + auto blockOptTupleOptUi32StrType = PgmBuilder_.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many); + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockOptOptUi32Type = PgmBuilder_.NewBlockType(optOptUi32Type, TBlockType::EShape::Many); + auto tzDateType = PgmBuilder_.NewDataType(NUdf::EDataSlot::TzDate); + auto blockTzDateType = PgmBuilder_.NewBlockType(tzDateType, TBlockType::EShape::Many); + auto nullType = PgmBuilder_.NewNullType(); + auto blockNullType = PgmBuilder_.NewBlockType(nullType, TBlockType::EShape::Many); + + auto rowType = + legacyStruct + ? PgmBuilder_.NewStructType({{"A", blockUi32Type}, + {"B", blockOptStrType}, + {"_yql_block_length", scalarUi64Type}, + {"a", scalarOptStrType}, + {"b", blockOptTupleOptUi32StrType}, + {"c", blockTzDateType}, + {"nill", blockNullType}, + {"optOptInt32", blockOptOptUi32Type}}) + : PgmBuilder_.NewMultiType( + {blockUi32Type, blockOptStrType, scalarOptStrType, + blockOptTupleOptUi32StrType, blockTzDateType, blockNullType, blockOptOptUi32Type, scalarUi64Type}); + + ui64 blockLen = 1000; + UNIT_ASSERT_LE(offset + len, blockLen); + + auto builder1 = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr); + auto builder2 = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr); + auto builder3 = MakeArrayBuilder(TTypeInfoHelper(), optTupleOptUi32StrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleOptUi32StrType)), nullptr); + auto builder4 = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr); + auto builder5 = MakeArrayBuilder(TTypeInfoHelper(), nullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(nullType)), nullptr); + auto builder6 = MakeArrayBuilder(TTypeInfoHelper(), optOptUi32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optOptUi32Type)), nullptr); + + for (ui32 i = 0; i < blockLen; ++i) { + TBlockItem b1(i); + builder1->Add(b1); + + TString a = "a string " + ToString(i); + TBlockItem b2 = (i % 2) ? TBlockItem(a) : TBlockItem(); + builder2->Add(b2); + + TBlockItem b3items[] = {(i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a)}; + TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem(); + builder3->Add(b3); + + TBlockItem tzDate{i}; + tzDate.SetTimezoneId(i % 100); + builder4->Add(tzDate); + builder5->Add(TBlockItem::Zero()); + switch (i % 3) { + case 0: + builder6->Add(TBlockItem(i)); + break; + case 1: + builder6->Add(TBlockItem().MakeOptional()); + break; + case 2: + builder6->Add(TBlockItem()); + break; } + } - std::string_view testScalarString = "foobar"; - auto strbuf = std::make_shared<arrow::Buffer>((const ui8*)testScalarString.data(), testScalarString.size()); - - TVector<arrow::Datum> datums; - if (legacyStruct) { - datums.emplace_back(builder1->Build(true)); - datums.emplace_back(builder2->Build(true)); - datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); - datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); - datums.emplace_back(builder3->Build(true)); - datums.emplace_back(builder4->Build(true)); - datums.emplace_back(builder5->Build(true)); - } else { - datums.emplace_back(builder1->Build(true)); - datums.emplace_back(builder2->Build(true)); - datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); - datums.emplace_back(builder3->Build(true)); - datums.emplace_back(builder4->Build(true)); - datums.emplace_back(builder5->Build(true)); - datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); - } + std::string_view testScalarString = "foobar"; + auto strbuf = std::make_shared<arrow::Buffer>((const ui8*)testScalarString.data(), testScalarString.size()); + + TVector<arrow::Datum> datums; + if (legacyStruct) { + datums.emplace_back(builder1->Build(true)); + datums.emplace_back(builder2->Build(true)); + datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); + datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); + datums.emplace_back(builder3->Build(true)); + datums.emplace_back(builder4->Build(true)); + datums.emplace_back(builder5->Build(true)); + datums.emplace_back(builder6->Build(true)); + } else { + datums.emplace_back(builder1->Build(true)); + datums.emplace_back(builder2->Build(true)); + datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); + datums.emplace_back(builder3->Build(true)); + datums.emplace_back(builder4->Build(true)); + datums.emplace_back(builder5->Build(true)); + datums.emplace_back(builder6->Build(true)); + datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); + } - const ui32 blockLenIndex = legacyStruct ? 2 : 6; - if (offset != 0 || len != blockLen) { - for (auto& datum : datums) { - if (datum.is_array()) { - datum = NYql::NUdf::DeepSlice(datum.array(), offset, len); - } + const ui32 blockLenIndex = legacyStruct ? 2 : datums.size() - 1; + if (offset != 0 || len != blockLen) { + for (auto& datum : datums) { + if (datum.is_array()) { + datum = NYql::NUdf::DeepSlice(datum.array(), offset, len); } - datums[blockLenIndex] = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(len)); } + datums[blockLenIndex] = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(len)); + } - const auto trimmerFactory = [&](ui32 index) { - const TType* columnType = legacyStruct ? static_cast<const TStructType*>(rowType)->GetMemberType(index) - : static_cast<const TMultiType*>(rowType)->GetElementType(index); - return MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), static_cast<const TBlockType*>(columnType)->GetItemType(), ArrowPool_); - }; - if (args.TrimBlock) { - for (ui32 index = 0; index < datums.size(); ++index) { - auto& datum = datums[index]; - if (!datum.is_array()) { - continue; - } - datum = trimmerFactory(index)->Trim(datum.array()); + const auto trimmerFactory = [&](ui32 index) { + const TType* columnType = legacyStruct ? static_cast<const TStructType*>(rowType)->GetMemberType(index) + : static_cast<const TMultiType*>(rowType)->GetElementType(index); + return MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), static_cast<const TBlockType*>(columnType)->GetItemType(), ArrowPool_); + }; + if (args.TrimBlock) { + for (ui32 index = 0; index < datums.size(); ++index) { + auto& datum = datums[index]; + if (!datum.is_array()) { + continue; } + datum = trimmerFactory(index)->Trim(datum.array()); } - TUnboxedValueVector columns; - for (auto& datum : datums) { - columns.emplace_back(HolderFactory_.CreateArrowBlock(std::move(datum))); - } + } + TUnboxedValueVector columns; + for (auto& datum : datums) { + columns.emplace_back(HolderFactory_.CreateArrowBlock(std::move(datum))); + } - TValuePackerType packer(false, rowType, {}, ArrowPool_, args.MinFillPercentage); - if (legacyStruct) { - TUnboxedValueVector columnsCopy = columns; - NUdf::TUnboxedValue row = HolderFactory_.VectorAsArray(columnsCopy); - packer.AddItem(row); - } else { - packer.AddWideItem(columns.data(), columns.size()); - } - TChunkedBuffer packed = packer.Finish(); + auto senderPacker = MakeValuePacker(false, rowType, args.PackerVersion, {}, ArrowPool_, args.MinFillPercentage); + if (legacyStruct) { + TUnboxedValueVector columnsCopy = columns; + NUdf::TUnboxedValue row = HolderFactory_.VectorAsArray(columnsCopy); + senderPacker.AddItem(row); + } else { + senderPacker.AddWideItem(columns.data(), columns.size()); + } + TChunkedBuffer packed = senderPacker.Finish(); - TUnboxedValueBatch unpacked(rowType); - packer.UnpackBatch(std::move(packed), HolderFactory_, unpacked); + TUnboxedValueBatch unpacked(rowType); + auto receiverPacker = MakeValuePacker(false, rowType, args.PackerVersion, {}, ArrowPool_, args.MinFillPercentage); + receiverPacker.UnpackBatch(std::move(packed), HolderFactory_, unpacked); - UNIT_ASSERT_VALUES_EQUAL(unpacked.RowCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(unpacked.RowCount(), 1); - TUnboxedValueVector unpackedColumns; - if (legacyStruct) { - auto elements = unpacked.Head()->GetElements(); - unpackedColumns.insert(unpackedColumns.end(), elements, elements + columns.size()); - } else { - unpacked.ForEachRowWide([&](const NYql::NUdf::TUnboxedValue* values, ui32 count) { - unpackedColumns.insert(unpackedColumns.end(), values, values + count); - }); - } + TUnboxedValueVector unpackedColumns; + if (legacyStruct) { + auto elements = unpacked.Head()->GetElements(); + unpackedColumns.insert(unpackedColumns.end(), elements, elements + columns.size()); + } else { + unpacked.ForEachRowWide([&](const NYql::NUdf::TUnboxedValue* values, ui32 count) { + unpackedColumns.insert(unpackedColumns.end(), values, values + count); + }); + } - UNIT_ASSERT_VALUES_EQUAL(unpackedColumns.size(), columns.size()); - if (legacyStruct) { - UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::UInt64Scalar>().value, len); - UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[3]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString); - } else { - UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value, len); - UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString); - } + UNIT_ASSERT_VALUES_EQUAL(unpackedColumns.size(), columns.size()); + if (legacyStruct) { + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::UInt64Scalar>().value, len); + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[3]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString); + } else { + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value, len); + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString); + } - if (args.MinFillPercentage) { - for (size_t i = 0; i < unpackedColumns.size(); ++i) { - auto datum = TArrowBlock::From(unpackedColumns[i]).GetDatum(); - if (datum.is_scalar()) { - continue; - } - const auto unpackedSize = NUdf::GetSizeOfArrayDataInBytes(*datum.array()); - const auto trimmedSize = NUdf::GetSizeOfArrayDataInBytes(*trimmerFactory(i)->Trim(datum.array())); - UNIT_ASSERT_GE_C(trimmedSize, unpackedSize * *args.MinFillPercentage / 100, "column: " << i); + if (args.MinFillPercentage) { + for (size_t i = 0; i < unpackedColumns.size(); ++i) { + auto datum = TArrowBlock::From(unpackedColumns[i]).GetDatum(); + if (datum.is_scalar()) { + continue; } + const auto unpackedSize = NUdf::GetSizeOfArrayDataInBytes(*datum.array()); + const auto trimmedSize = NUdf::GetSizeOfArrayDataInBytes(*trimmerFactory(i)->Trim(datum.array())); + UNIT_ASSERT_GE_C(trimmedSize, unpackedSize * *args.MinFillPercentage / 100, "column: " << i); } + } - auto reader1 = MakeBlockReader(TTypeInfoHelper(), ui32Type); - auto reader2 = MakeBlockReader(TTypeInfoHelper(), optStrType); - auto reader3 = MakeBlockReader(TTypeInfoHelper(), optTupleOptUi32StrType); - auto reader4 = MakeBlockReader(TTypeInfoHelper(), tzDateType); - auto reader5 = MakeBlockReader(TTypeInfoHelper(), nullType); + auto reader1 = MakeBlockReader(TTypeInfoHelper(), ui32Type); + auto reader2 = MakeBlockReader(TTypeInfoHelper(), optStrType); + auto reader3 = MakeBlockReader(TTypeInfoHelper(), optTupleOptUi32StrType); + auto reader4 = MakeBlockReader(TTypeInfoHelper(), tzDateType); + auto reader5 = MakeBlockReader(TTypeInfoHelper(), nullType); + auto reader6 = MakeBlockReader(TTypeInfoHelper(), optOptUi32Type); - for (ui32 i = offset; i < len; ++i) { - TBlockItem b1 = reader1->GetItem(*TArrowBlock::From(unpackedColumns[0]).GetDatum().array(), i - offset); - UNIT_ASSERT_VALUES_EQUAL(b1.As<ui32>(), i); + for (ui32 i = offset; i < len; ++i) { + TBlockItem b1 = reader1->GetItem(*TArrowBlock::From(unpackedColumns[0]).GetDatum().array(), i - offset); + UNIT_ASSERT_VALUES_EQUAL(b1.As<ui32>(), i); - TString a = "a string " + ToString(i); - TBlockItem b2 = reader2->GetItem(*TArrowBlock::From(unpackedColumns[1]).GetDatum().array(), i - offset); - if (i % 2) { - UNIT_ASSERT_VALUES_EQUAL(std::string_view(b2.AsStringRef()), a); - } else { - UNIT_ASSERT(!b2); - } + TString a = "a string " + ToString(i); + TBlockItem b2 = reader2->GetItem(*TArrowBlock::From(unpackedColumns[1]).GetDatum().array(), i - offset); + if (i % 2) { + UNIT_ASSERT_VALUES_EQUAL(std::string_view(b2.AsStringRef()), a); + } else { + UNIT_ASSERT(!b2); + } - TBlockItem b3 = reader3->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 4 : 3]).GetDatum().array(), i - offset); - if (i % 7) { - auto elements = b3.GetElements(); - if (i % 2) { - UNIT_ASSERT_VALUES_EQUAL(elements[0].As<ui32>(), i); - } else { - UNIT_ASSERT(!elements[0]); - } - UNIT_ASSERT_VALUES_EQUAL(std::string_view(elements[1].AsStringRef()), a); + TBlockItem b3 = reader3->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 4 : 3]).GetDatum().array(), i - offset); + if (i % 7) { + auto elements = b3.GetElements(); + if (i % 2) { + UNIT_ASSERT_VALUES_EQUAL(elements[0].As<ui32>(), i); } else { - UNIT_ASSERT(!b3); + UNIT_ASSERT(!elements[0]); } + UNIT_ASSERT_VALUES_EQUAL(std::string_view(elements[1].AsStringRef()), a); + } else { + UNIT_ASSERT(!b3); + } - TBlockItem b4 = reader4->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 5 : 4]).GetDatum().array(), i - offset); - UNIT_ASSERT(b4.Get<ui16>() == i); - UNIT_ASSERT(b4.GetTimezoneId() == (i % 100)); - TBlockItem b5 = reader5->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 6 : 5]).GetDatum().array(), i - offset); - UNIT_ASSERT(b5); + TBlockItem b4 = reader4->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 5 : 4]).GetDatum().array(), i - offset); + UNIT_ASSERT(b4.Get<ui16>() == i); + UNIT_ASSERT(b4.GetTimezoneId() == (i % 100)); + TBlockItem b5 = reader5->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 6 : 5]).GetDatum().array(), i - offset); + UNIT_ASSERT(b5); + TBlockItem b6 = reader6->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 7 : 6]).GetDatum().array(), i - offset); + switch (i % 3) { + case 0: + UNIT_ASSERT_EQUAL(b6.Get<ui64>(), i); + break; + case 1: + UNIT_ASSERT(!b6.GetOptionalValue()); + break; + case 2: + UNIT_ASSERT(!b6); + break; } } } @@ -929,13 +968,425 @@ protected: TBlockTestArgs args; TestBlockPackingCases(args, { MakeIntrusive<TArgsDispatcher<TBlockTestArgs>>(args, std::vector<TBlockTestArgs>{ + {.Offset = 0, .Len = 3}, {.Offset = 0, .Len = 1000}, {.Offset = 19, .Len = 623} }), MakeIntrusive<TArgsDispatcher<bool>>(args.LegacyStruct, std::vector<bool>{false, true}), MakeIntrusive<TArgsDispatcher<bool>>(args.TrimBlock, std::vector<bool>{false, true}), - MakeIntrusive<TArgsDispatcher<TMaybe<ui8>>>(args.MinFillPercentage, std::vector<TMaybe<ui8>>{Nothing(), 90}) + MakeIntrusive<TArgsDispatcher<TMaybe<ui8>>>(args.MinFillPercentage, std::vector<TMaybe<ui8>>{Nothing(), 90}), + MakeIntrusive<TArgsDispatcher<EValuePackerVersion>>(args.PackerVersion, std::vector<EValuePackerVersion>{EValuePackerVersion::V0, EValuePackerVersion::V1}) + }); + } + + void TestBlockPackingSerializeUi32Impl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + + auto blockUi32Type = PgmBuilder_.NewBlockType(ui32Type, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockUi32Type, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr); + builder->Add(NYql::NUdf::TBlockItem(1)); + builder->Add(NYql::NUdf::TBlockItem(2)); + builder->Add(NYql::NUdf::TBlockItem(3)); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeUi32() { + TestBlockPackingSerializeUi32Impl(EValuePackerVersion::V0, 12952134869926058643U); + TestBlockPackingSerializeUi32Impl(EValuePackerVersion::V0, 12952134869926058643U); + } + + void TestBlockPackingSerializeOptionalUi32Impl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* optUi32Type = PgmBuilder_.NewOptionalType(ui32Type); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockOptUi32Type = PgmBuilder_.NewBlockType(optUi32Type, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockOptUi32Type, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), optUi32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optUi32Type)), nullptr); + builder->Add(NYql::NUdf::TBlockItem(ui32(1))); + builder->Add(NYql::NUdf::TBlockItem()); + builder->Add(NYql::NUdf::TBlockItem(ui32(3))); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeOptionalUi32() { + TestBlockPackingSerializeOptionalUi32Impl(EValuePackerVersion::V0, 2597716339394977815U); + TestBlockPackingSerializeOptionalUi32Impl(EValuePackerVersion::V1, 13473152532547735594U); + } + + void TestBlockPackingSerializeStringImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockStrType = PgmBuilder_.NewBlockType(strType, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockStrType, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), strType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(strType)), nullptr); + builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str1"))); + builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str2"))); + builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str3"))); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeString() { + TestBlockPackingSerializeStringImpl(EValuePackerVersion::V0, 2530494095874447064U); + TestBlockPackingSerializeStringImpl(EValuePackerVersion::V1, 10045743204568850498U); + } + + void TestBlockPackingSerializeOptionalStringImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* optStrType = PgmBuilder_.NewOptionalType(strType); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockOptStrType, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr); + builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str1"))); + builder->Add(NYql::NUdf::TBlockItem()); + builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str3"))); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeOptionalString() { + TestBlockPackingSerializeOptionalStringImpl(EValuePackerVersion::V0, 15589836522266239526U); + TestBlockPackingSerializeOptionalStringImpl(EValuePackerVersion::V1, 7371349434798600169U); + } + + void TestBlockPackingSerializeOptionalOptionalUi32Impl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* optUi32Type = PgmBuilder_.NewOptionalType(ui32Type); + auto* optOptUi32Type = PgmBuilder_.NewOptionalType(optUi32Type); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockOptOptUi32Type = PgmBuilder_.NewBlockType(optOptUi32Type, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockOptOptUi32Type, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), optOptUi32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optOptUi32Type)), nullptr); + builder->Add(NYql::NUdf::TBlockItem(ui32(1))); + builder->Add(NYql::NUdf::TBlockItem()); + builder->Add(NYql::NUdf::TBlockItem(ui32(3))); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeOptionalOptionalUi32() { + TestBlockPackingSerializeOptionalOptionalUi32Impl(EValuePackerVersion::V0, 8466504593757236838U); + TestBlockPackingSerializeOptionalOptionalUi32Impl(EValuePackerVersion::V1, 7467770574734535989U); + } + + void TestBlockPackingSerializeSingularTypeImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* nullType = PgmBuilder_.NewNullType(); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockNullType = PgmBuilder_.NewBlockType(nullType, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockNullType, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), nullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(nullType)), nullptr); + builder->Add(NYql::NUdf::TBlockItem::Zero()); + builder->Add(NYql::NUdf::TBlockItem::Zero()); + builder->Add(NYql::NUdf::TBlockItem::Zero()); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeSingularType() { + TestBlockPackingSerializeSingularTypeImpl(EValuePackerVersion::V0, 2342694087331559075U); + TestBlockPackingSerializeSingularTypeImpl(EValuePackerVersion::V1, 814315123753111618U); + } + + void TestBlockPackingSerializeOptionalSingularTypeImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* nullType = PgmBuilder_.NewNullType(); + auto* optNullType = PgmBuilder_.NewOptionalType(nullType); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockOptNullType = PgmBuilder_.NewBlockType(optNullType, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockOptNullType, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), optNullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optNullType)), nullptr); + builder->Add(NYql::NUdf::TBlockItem::Zero()); + builder->Add(NYql::NUdf::TBlockItem()); + builder->Add(NYql::NUdf::TBlockItem::Zero()); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeOptionalSingularType() { + TestBlockPackingSerializeOptionalSingularTypeImpl(EValuePackerVersion::V0, 7314801330449120945U); + TestBlockPackingSerializeOptionalSingularTypeImpl(EValuePackerVersion::V1, 13506722731499795140U); + } + + void TestBlockPackingSerializeTTzDateImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* tzDateType = PgmBuilder_.NewDataType(NUdf::EDataSlot::TzDate); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockTzDateType = PgmBuilder_.NewBlockType(tzDateType, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockTzDateType, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr); + NYql::NUdf::TBlockItem tzDate1{ui16(1)}; + tzDate1.SetTimezoneId(100); + builder->Add(tzDate1); + NYql::NUdf::TBlockItem tzDate2{ui16(2)}; + tzDate2.SetTimezoneId(200); + builder->Add(tzDate2); + NYql::NUdf::TBlockItem tzDate3{ui16(3)}; + tzDate3.SetTimezoneId(300); + builder->Add(tzDate3); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeTTzDate() { + TestBlockPackingSerializeTTzDateImpl(EValuePackerVersion::V0, 17903070954188109601U); + TestBlockPackingSerializeTTzDateImpl(EValuePackerVersion::V1, 2802122333798714691); + } + + void TestBlockPackingSerializeOptionalTTzDateImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* tzDateType = PgmBuilder_.NewDataType(NUdf::EDataSlot::TzDate); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* optTzDateType = PgmBuilder_.NewOptionalType(tzDateType); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockOptTzDateType = PgmBuilder_.NewBlockType(optTzDateType, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockOptTzDateType, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), optTzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTzDateType)), nullptr); + NYql::NUdf::TBlockItem tzDate1{ui16(1)}; + tzDate1.SetTimezoneId(100); + builder->Add(tzDate1); + builder->Add(NYql::NUdf::TBlockItem()); + NYql::NUdf::TBlockItem tzDate3{ui16(3)}; + tzDate3.SetTimezoneId(300); + builder->Add(tzDate3); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeOptionalTTzDate() { + TestBlockPackingSerializeOptionalTTzDateImpl(EValuePackerVersion::V0, 18400739887390114022U); + TestBlockPackingSerializeOptionalTTzDateImpl(EValuePackerVersion::V1, 11315052524575174297U); + } + + void TestBlockPackingSerializeTupleInt32StringImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* i32Type = PgmBuilder_.NewDataType(NUdf::TDataType<i32>::Id); + auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* tupleType = PgmBuilder_.NewTupleType({i32Type, strType}); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockTupleType = PgmBuilder_.NewBlockType(tupleType, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockTupleType, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), tupleType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tupleType)), nullptr); + NYql::NUdf::TBlockItem tuple1Items[] = {NYql::NUdf::TBlockItem(i32(1)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("a"))}; + builder->Add(NYql::NUdf::TBlockItem(tuple1Items)); + NYql::NUdf::TBlockItem tuple2Items[] = {NYql::NUdf::TBlockItem(i32(2)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("b"))}; + builder->Add(NYql::NUdf::TBlockItem(tuple2Items)); + NYql::NUdf::TBlockItem tuple3Items[] = {NYql::NUdf::TBlockItem(i32(3)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("c"))}; + builder->Add(NYql::NUdf::TBlockItem(tuple3Items)); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeTupleInt32String() { + TestBlockPackingSerializeTupleInt32StringImpl(EValuePackerVersion::V0, 6542528838520568576U); + TestBlockPackingSerializeTupleInt32StringImpl(EValuePackerVersion::V1, 13820338994079843620U); + } + + void TestBlockPackingSerializeOptionalTupleInt32StringImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { + auto* i32Type = PgmBuilder_.NewDataType(NUdf::TDataType<i32>::Id); + auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* tupleType = PgmBuilder_.NewTupleType({i32Type, strType}); + auto* optTupleType = PgmBuilder_.NewOptionalType(tupleType); + + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + auto blockOptTupleType = PgmBuilder_.NewBlockType(optTupleType, TBlockType::EShape::Many); + auto* rowType = PgmBuilder_.NewMultiType({blockOptTupleType, scalarUi64Type}); + + auto builder = MakeArrayBuilder(TTypeInfoHelper(), optTupleType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleType)), nullptr); + NYql::NUdf::TBlockItem tuple1Items[] = {NYql::NUdf::TBlockItem(i32(1)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("a"))}; + builder->Add(NYql::NUdf::TBlockItem(tuple1Items)); + builder->Add(NYql::NUdf::TBlockItem()); + NYql::NUdf::TBlockItem tuple3Items[] = {NYql::NUdf::TBlockItem(i32(3)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("c"))}; + builder->Add(NYql::NUdf::TBlockItem(tuple3Items)); + TUnboxedValueVector columns; + columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true))); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + } + + void TestBlockPackingSerializeOptionalTupleInt32String() { + TestBlockPackingSerializeOptionalTupleInt32StringImpl(EValuePackerVersion::V0, 15729296940258124779U); + TestBlockPackingSerializeOptionalTupleInt32StringImpl(EValuePackerVersion::V1, 8613360264621805090U); + } + + void TestBlockPackingSerializeTupleShiftedOffsetImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash, bool expectedToFail) { + auto* i32Type = PgmBuilder_.NewDataType(NUdf::TDataType<i32>::Id); + auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id); + auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id); + auto* tupleType = PgmBuilder_.NewTupleType({i32Type, strType}); + auto* blockTupleType = PgmBuilder_.NewBlockType(tupleType, TBlockType::EShape::Many); + auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + + auto* rowType = PgmBuilder_.NewMultiType({blockTupleType, scalarUi64Type}); + + auto bigString = (TString("Very long string") * 1000000).substr(0, 2000000); + auto builder = MakeArrayBuilder(TTypeInfoHelper(), tupleType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tupleType)), nullptr); + { + TBlockItem tuple[] = {TBlockItem(1), TBlockItem(NYql::NUdf::TStringRef("Short string"))}; + builder->Add(TBlockItem(tuple)); + } + { + TBlockItem tuple[] = {TBlockItem(2), TBlockItem(NYql::NUdf::TStringRef("Short string"))}; + builder->Add(TBlockItem(tuple)); + } + { + TBlockItem tuple[] = {TBlockItem(3), TBlockItem(NYql::NUdf::TStringRef(bigString))}; + builder->Add(TBlockItem(tuple)); + } + auto buildResult = builder->Build(/*finish=*/true); + UNIT_ASSERT_EQUAL(buildResult.kind(), arrow::Datum::CHUNKED_ARRAY); + UNIT_ASSERT_EQUAL(buildResult.chunks().size(), 2); + + auto children = buildResult.chunks()[1]->data()->child_data; + // Check that children's offsets are different. + // It is the main invariant for this test. + UNIT_ASSERT_UNEQUAL(children[0]->offset, children[1]->offset); + + TUnboxedValueVector columns; + // Here we take only second chunk with exactly one element. + columns.emplace_back(HolderFactory_.CreateArrowBlock(buildResult.chunks()[1])); + columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(1)))); + + if (expectedToFail) { + UNIT_ASSERT_EXCEPTION(MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(), yexception); + return; + } + TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(); + TStringStream stream; + serialized.CopyTo(stream); + auto actualHash = THash<TStringBuf>{}(stream.Str()); + + UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash); + + // Test deserialization result + TUnboxedValueBatch unpacked(rowType); + auto receiverPacker = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()); + receiverPacker.UnpackBatch(std::move(serialized), HolderFactory_, unpacked); + + UNIT_ASSERT_VALUES_EQUAL(unpacked.RowCount(), 1); + + TUnboxedValueVector unpackedColumns; + unpacked.ForEachRowWide([&](const NYql::NUdf::TUnboxedValue* values, ui32 count) { + unpackedColumns.insert(unpackedColumns.end(), values, values + count); }); + + UNIT_ASSERT_VALUES_EQUAL(unpackedColumns.size(), 2); + + auto tupleReader = MakeBlockReader(TTypeInfoHelper(), tupleType); + auto tupleArray = TArrowBlock::From(unpackedColumns[0]).GetDatum().array(); + UNIT_ASSERT_VALUES_EQUAL(tupleArray->length, 1); + TBlockItem deserializedTuple = tupleReader->GetItem(*tupleArray, 0); + auto elements = deserializedTuple.GetElements(); + + UNIT_ASSERT_VALUES_EQUAL(elements[0].As<i32>(), 3); + UNIT_ASSERT_VALUES_EQUAL(TString(elements[1].AsStringRef()), bigString); + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value, 1); + } + + void TestBlockPackingSerializeTupleShiftedOffset() { + TestBlockPackingSerializeTupleShiftedOffsetImpl(EValuePackerVersion::V0, 0U, /*expectedToFail=*/true); + TestBlockPackingSerializeTupleShiftedOffsetImpl(EValuePackerVersion::V1, 4876856722251275868U, /*expectedToFail=*/false); } private: @@ -1015,6 +1466,19 @@ class TMiniKQLComputationNodeTransportPackTest: public TMiniKQLComputationNodePa UNIT_TEST(TestRopeSplit); UNIT_TEST(TestIncrementalPacking); UNIT_TEST(TestBlockPacking); + + UNIT_TEST(TestBlockPackingSerializeUi32); + UNIT_TEST(TestBlockPackingSerializeOptionalUi32); + UNIT_TEST(TestBlockPackingSerializeString); + UNIT_TEST(TestBlockPackingSerializeOptionalString); + UNIT_TEST(TestBlockPackingSerializeOptionalOptionalUi32); + UNIT_TEST(TestBlockPackingSerializeSingularType); + UNIT_TEST(TestBlockPackingSerializeOptionalSingularType); + UNIT_TEST(TestBlockPackingSerializeTTzDate); + UNIT_TEST(TestBlockPackingSerializeOptionalTTzDate); + UNIT_TEST(TestBlockPackingSerializeTupleInt32String); + UNIT_TEST(TestBlockPackingSerializeOptionalTupleInt32String); + UNIT_TEST(TestBlockPackingSerializeTupleShiftedOffset); UNIT_TEST_SUITE_END(); }; @@ -1040,6 +1504,19 @@ class TMiniKQLComputationNodeTransportFastPackTest: public TMiniKQLComputationNo UNIT_TEST(TestRopeSplit); UNIT_TEST(TestIncrementalPacking); UNIT_TEST(TestBlockPacking); + + UNIT_TEST(TestBlockPackingSerializeUi32); + UNIT_TEST(TestBlockPackingSerializeOptionalUi32); + UNIT_TEST(TestBlockPackingSerializeString); + UNIT_TEST(TestBlockPackingSerializeOptionalString); + UNIT_TEST(TestBlockPackingSerializeOptionalOptionalUi32); + UNIT_TEST(TestBlockPackingSerializeSingularType); + UNIT_TEST(TestBlockPackingSerializeOptionalSingularType); + UNIT_TEST(TestBlockPackingSerializeTTzDate); + UNIT_TEST(TestBlockPackingSerializeOptionalTTzDate); + UNIT_TEST(TestBlockPackingSerializeTupleInt32String); + UNIT_TEST(TestBlockPackingSerializeOptionalTupleInt32String); + UNIT_TEST(TestBlockPackingSerializeTupleShiftedOffset); UNIT_TEST_SUITE_END(); }; diff --git a/yql/essentials/minikql/computation/mkql_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_spiller_adapter.h index 8ddcfe46be4..a5c3c56c122 100644 --- a/yql/essentials/minikql/computation/mkql_spiller_adapter.h +++ b/yql/essentials/minikql/computation/mkql_spiller_adapter.h @@ -17,7 +17,7 @@ public: : Spiller(spiller) , ItemType(type) , SizeLimit(sizeLimit) - , Packer(type) + , Packer(type, EValuePackerVersion::V1) { } |
