summary | refs | log | tree | commit | diff | stats
path: root/yql/essentials/minikql/computation
diff options
context:
space:
mode:
author atarasov5 <[email protected]> 2025-07-23 11:24:03 +0300
committer atarasov5 <[email protected]> 2025-07-23 12:25:55 +0300
commit f531c2bc5154672d6ba28f691d3a4bdf7ff90c34 (patch)
tree ef0b71a8dd91f7719b54c95d5a01b3e4d729106a /yql/essentials/minikql/computation
parent 9d67ed79ecd1a554c6606e8ae27ca146bd828b05 (diff)
YQL-19995: Fix block serialization for child data with different offsets
В данном пулреквесте я добавил поддержку сериализации tuple/struct массивов с разными оффсетами. Сделал это через новый флаг `EValuePackerVersion::{V0, V1}`. Также нужно будет сделать коммит в contrib/ydb.
commit_hash:79709ad660a4295958e5488d3dd24d660f32ca9a
Diffstat (limited to 'yql/essentials/minikql/computation')
-rw-r--r-- yql/essentials/minikql/computation/mkql_block_transport.cpp 204
-rw-r--r-- yql/essentials/minikql/computation/mkql_block_transport.h 34
-rw-r--r-- yql/essentials/minikql/computation/mkql_computation_node_pack.cpp 145
-rw-r--r-- yql/essentials/minikql/computation/mkql_computation_node_pack.h 22
-rw-r--r-- yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp 845
-rw-r--r-- yql/essentials/minikql/computation/mkql_spiller_adapter.h 2
6 files changed, 959 insertions, 293 deletions
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp
index dcf9542f5df..a7ed5306fbf 100644
--- a/yql/essentials/minikql/computation/mkql_block_transport.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp
@@ -149,33 +149,45 @@ std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TChunkedBuffer& source, TMaybe<ui
class TBlockSerializerBase : public IBlockSerializer {
public:
explicit TBlockSerializerBase(const TBlockSerializerParams& params)
- : Pool_(params.Pool)
- , MinFillPercentage_(params.MinFillPercentage)
+ : Pool_(params.Pool())
+ , MinFillPercentage_(params.MinFillPercentage())
+ , ShouldSerializeOffset_(params.ShouldSerializeOffset())
{
YQL_ENSURE(!MinFillPercentage_ || *MinFillPercentage_ <= 100);
}
+ size_t OffsetMetadataCount() const {
+ return ShouldSerializeOffset_ ? 1 : 0;
+ }
+
protected:
arrow::MemoryPool* Pool_;
const TMaybe<ui8> MinFillPercentage_;
+ bool ShouldSerializeOffset_;
};
class TBlockDeserializerBase : public IBlockDeserializer {
public:
- TBlockDeserializerBase() = default;
+ TBlockDeserializerBase(const TBlockSerializerParams& params)
+ : ShouldLoadOffset_(params.ShouldSerializeOffset())
+ {}
virtual void SetArrowType(const std::shared_ptr<arrow::DataType>& type) {
ArrowType_ = type;
}
void LoadMetadata(const TMetadataSource& metaSource) final {
+ if (ShouldLoadOffset_) {
+ OffsetReminder_ = metaSource();
+ YQL_ENSURE(OffsetReminder_ < 8, "Unexpected offset value. Actual offset is: " << *OffsetReminder_);
+ }
if (IsNullable()) {
LoadNullsSizes(metaSource, NullsCount_, NullsSize_);
}
DoLoadMetadata(metaSource);
}
- virtual std::shared_ptr<arrow::ArrayData> LoadArray(TChunkedBuffer& src, ui64 blockLen, ui64 offset) final {
+ virtual std::shared_ptr<arrow::ArrayData> LoadArray(TChunkedBuffer& src, ui64 blockLen, TMaybe<size_t> offset) final {
YQL_ENSURE(blockLen > 0, "Should be handled earlier");
std::shared_ptr<arrow::Buffer> nulls;
i64 nullsCount = 0;
@@ -201,11 +213,11 @@ public:
DoResetMetadata();
}
- std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) const {
+ std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, TMaybe<size_t> offset) const {
std::shared_ptr<arrow::Buffer> nulls;
i64 nullsCount = 0;
if (IsNullable()) {
- nulls = MakeZeroBitmap(blockLen + offset);
+ nulls = MakeZeroBitmap(blockLen + GetOffset(offset));
nullsCount = blockLen;
}
return DoMakeDefaultValue(nulls, nullsCount, blockLen, offset);
@@ -215,12 +227,23 @@ protected:
virtual void DoLoadMetadata(const TMetadataSource& metaSource) = 0;
virtual void DoResetMetadata() = 0;
virtual bool IsNullable() const = 0;
- virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const = 0;
- virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0;
+ virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const = 0;
+ virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) = 0;
+
+ ui64 GetOffset(TMaybe<size_t> providedOffset) const {
+ if (!ShouldLoadOffset_) {
+ YQL_ENSURE(providedOffset.Defined(), "Offset must be provided explicitly in arguments.");
+ return *providedOffset;
+ }
+ YQL_ENSURE(OffsetReminder_.Defined(), "Offset must be specified in metadata.");
+ return *OffsetReminder_;
+ }
std::shared_ptr<arrow::DataType> ArrowType_;
TMaybe<ui64> NullsCount_;
TMaybe<ui64> NullsSize_;
+ TMaybe<ui64> OffsetReminder_;
+ bool ShouldLoadOffset_;
};
template<size_t ObjectSize, bool Nullable>
@@ -231,10 +254,14 @@ public:
using TBase::TBase;
size_t ArrayMetadataCount() const final {
- return Nullable ? 3 : 1;
+ return OffsetMetadataCount() + (Nullable ? 3 : 1);
}
void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ const ui64 offsetReminder = data.offset % 8;
+ if (ShouldSerializeOffset_) {
+ metaSink(offsetReminder);
+ }
if constexpr (Nullable) {
StoreNullsSizes(data, metaSink);
if (data.GetNullCount() == data.length) {
@@ -242,8 +269,7 @@ public:
return;
}
}
- const ui64 desiredOffset = data.offset % 8;
- size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
+ size_t dataBytes = ((size_t)data.length + offsetReminder) * ObjectSize;
metaSink(dataBytes);
}
@@ -262,10 +288,15 @@ public:
}
};
-template<size_t ObjectSize, bool Nullable>
-class TFixedSizeBlockDeserializer final : public TBlockDeserializerBase {
+template <size_t ObjectSize, bool Nullable>
+class TFixedSizeBlockDeserializer final: public TBlockDeserializerBase {
+ using TBase = TBlockDeserializerBase;
public:
- TFixedSizeBlockDeserializer() = default;
+ TFixedSizeBlockDeserializer(const TBlockSerializerParams& params)
+ : TBase(params)
+ {}
+
+
private:
void DoLoadMetadata(const TMetadataSource& metaSource) final {
LoadBufferSize(metaSource, DataSize_);
@@ -275,14 +306,14 @@ private:
return Nullable;
}
- std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
- auto data = MakeZeroBuffer((blockLen + offset) * ObjectSize);
- return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, offset);
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final {
+ auto data = MakeZeroBuffer((blockLen + GetOffset(offset)) * ObjectSize);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, GetOffset(offset));
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final {
auto data = LoadBuffer(src, DataSize_);
- return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, offset);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, GetOffset(offset));
}
void DoResetMetadata() final {
@@ -302,10 +333,14 @@ public:
private:
size_t ArrayMetadataCount() const final {
- return Nullable ? 4 : 2;
+ return OffsetMetadataCount() + (Nullable ? 4 : 2);
}
void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ const ui64 offsetReminder = data.offset % 8;
+ if (ShouldSerializeOffset_) {
+ metaSink(offsetReminder);
+ }
if constexpr (Nullable) {
StoreNullsSizes(data, metaSink);
if (data.GetNullCount() == data.length) {
@@ -315,13 +350,12 @@ private:
}
}
- const ui64 desiredOffset = data.offset % 8;
- size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
+ size_t offsetsSize = ((size_t)data.length + 1 + offsetReminder) * sizeof(TOffset);
metaSink(offsetsSize);
if (ShouldTrimArray(data)) {
- const TOffset* offsetData = data.GetValues<TOffset>(1) - desiredOffset;
- metaSink(offsetData[data.length + desiredOffset] - offsetData[0]);
+ const TOffset* offsetData = data.GetValues<TOffset>(1) - offsetReminder;
+ metaSink(offsetData[data.length + offsetReminder] - offsetData[0]);
} else {
metaSink(data.buffers[2]->size());
}
@@ -384,25 +418,26 @@ private:
template<typename TStringType, bool Nullable>
class TStringBlockDeserializer final : public TBlockDeserializerBase {
+ using TBase = TBlockDeserializerBase;
using TOffset = typename TStringType::offset_type;
public:
- TStringBlockDeserializer() = default;
+ using TBase::TBase;
private:
void DoLoadMetadata(const TMetadataSource& metaSource) final {
LoadBufferSize(metaSource, OffsetsSize_);
LoadBufferSize(metaSource, DataSize_);
}
- std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
- auto offsets = MakeZeroBuffer((blockLen + 1 + offset) * sizeof(TOffset));
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final {
+ auto offsets = MakeZeroBuffer((blockLen + 1 + GetOffset(offset)) * sizeof(TOffset));
auto data = MakeEmptyBuffer();
- return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, offset);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, GetOffset(offset));
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final {
auto offsets = LoadBuffer(src, OffsetsSize_);
auto data = LoadBuffer(src, DataSize_);
- return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, offset);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, GetOffset(offset));
}
bool IsNullable() const final {
@@ -429,10 +464,13 @@ public:
private:
size_t ArrayMetadataCount() const final {
- return 2 + Inner_->ArrayMetadataCount();
+ return OffsetMetadataCount() + 2 + Inner_->ArrayMetadataCount();
}
void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ if (ShouldSerializeOffset_) {
+ metaSink(data.offset % 8);
+ }
StoreNullsSizes(data, metaSink);
if (data.GetNullCount() == data.length) {
auto innerCount = Inner_->ArrayMetadataCount();
@@ -455,22 +493,25 @@ private:
};
class TExtOptionalBlockDeserializer final : public TBlockDeserializerBase {
+ using TBase = TBlockDeserializerBase;
public:
- explicit TExtOptionalBlockDeserializer(std::unique_ptr<TBlockDeserializerBase>&& inner)
- : Inner_(std::move(inner))
+ explicit TExtOptionalBlockDeserializer(std::unique_ptr<TBlockDeserializerBase>&& inner, const TBlockSerializerParams& params)
+ : TBase(params)
+ , Inner_(std::move(inner))
{
}
+
private:
void DoLoadMetadata(const TMetadataSource& metaSource) final {
Inner_->LoadMetadata(metaSource);
}
- std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
- return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, offset);
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final {
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, GetOffset(offset));
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
- return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, offset);
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final {
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, GetOffset(offset));
}
bool IsNullable() const final {
@@ -498,11 +539,13 @@ public:
private:
size_t ArrayMetadataCount() const final {
- return 0;
+ return OffsetMetadataCount();
}
void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
- Y_UNUSED(data, metaSink);
+ if (ShouldSerializeOffset_) {
+ metaSink(data.offset % 8);
+ }
}
void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
@@ -511,19 +554,24 @@ private:
};
class TSingularTypeBlockDeserializer final: public TBlockDeserializerBase {
+ using TBase = TBlockDeserializerBase;
+
+public:
+ using TBase::TBase;
+
private:
void DoLoadMetadata(const TMetadataSource& metaSource) final {
Y_UNUSED(metaSource);
}
- std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final {
Y_UNUSED(offset);
Y_ENSURE(nullsCount == 0);
Y_ENSURE(!nulls || nulls->size() == 0);
return arrow::NullArray(blockLen).data();
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final {
Y_UNUSED(offset, src);
Y_ENSURE(nullsCount == 0);
Y_ENSURE(!nulls || nulls->size() == 0);
@@ -551,10 +599,14 @@ private:
if constexpr (Nullable) {
result += 2;
}
+ result += OffsetMetadataCount();
return result;
}
void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ if (ShouldSerializeOffset_) {
+ metaSink(data.offset % 8);
+ }
if constexpr (Nullable) {
StoreNullsSizes(data, metaSink);
}
@@ -597,8 +649,7 @@ public:
}
void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
- const IBlockSerializer::TMetadataSink& metaSink) const {
-
+ const IBlockSerializer::TMetadataSink& metaSink) const {
for (size_t i = 0; i < Children_.size(); ++i) {
Children_[i]->StoreMetadata(*child_data[i], metaSink);
}
@@ -630,7 +681,7 @@ public:
}
void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
- const IBlockSerializer::TMetadataSink& metaSink) const {
+ const IBlockSerializer::TMetadataSink& metaSink) const {
DateSerialiser_.StoreMetadata(*child_data[0], metaSink);
TzSerialiser_.StoreMetadata(*child_data[1], metaSink);
}
@@ -649,11 +700,14 @@ private:
template<bool Nullable>
class TTupleBlockDeserializer final : public TBlockDeserializerBase {
+using TBase = TBlockDeserializerBase;
public:
- explicit TTupleBlockDeserializer(TVector<std::unique_ptr<TBlockDeserializerBase>>&& children)
- : Children_(std::move(children))
+ explicit TTupleBlockDeserializer(TVector<std::unique_ptr<TBlockDeserializerBase>>&& children, const TBlockSerializerParams& params)
+ : TBase(params)
+ , Children_(std::move(children))
{
}
+
private:
void DoLoadMetadata(const TMetadataSource& metaSource) final {
for (auto& child : Children_) {
@@ -661,20 +715,20 @@ private:
}
}
- std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final {
std::vector<std::shared_ptr<arrow::ArrayData>> childData;
for (auto& child : Children_) {
childData.emplace_back(child->MakeDefaultValue(blockLen, offset));
}
- return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, GetOffset(offset));
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final {
std::vector<std::shared_ptr<arrow::ArrayData>> childData;
for (auto& child : Children_) {
childData.emplace_back(child->LoadArray(src, blockLen, offset));
}
- return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, GetOffset(offset));
}
void DoResetMetadata() final {
@@ -700,8 +754,15 @@ private:
template<typename TDate, bool Nullable>
class TTzDateBlockDeserializer final : public TBlockDeserializerBase {
+ using TBase = TBlockDeserializerBase;
+
public:
- TTzDateBlockDeserializer() = default;
+ explicit TTzDateBlockDeserializer(const TBlockSerializerParams& params)
+ : TBase(params)
+ , DateDeserialiser_(params)
+ , TzDeserialiser_(params)
+ {
+ }
private:
void DoLoadMetadata(const TMetadataSource& metaSource) final {
@@ -709,18 +770,18 @@ private:
TzDeserialiser_.LoadMetadata(metaSource);
}
- std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) const final {
std::vector<std::shared_ptr<arrow::ArrayData>> childData;
childData.emplace_back(DateDeserialiser_.MakeDefaultValue(blockLen, offset));
childData.emplace_back(TzDeserialiser_.MakeDefaultValue(blockLen, offset));
- return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, GetOffset(offset));
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, TMaybe<size_t> offset) final {
std::vector<std::shared_ptr<arrow::ArrayData>> childData;
childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset));
childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset));
- return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, GetOffset(offset));
}
void DoResetMetadata() final {
@@ -801,46 +862,49 @@ struct TDeserializerTraits {
constexpr static bool PassType = false;
- static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
+ static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder, const TBlockSerializerParams& params) {
Y_UNUSED(pgBuilder);
if (desc.PassByValue) {
- return std::make_unique<TFixedSize<ui64, true>>();
+ return std::make_unique<TFixedSize<ui64, true>>(params);
}
- return std::make_unique<TStrings<arrow::BinaryType, true>>();
+ return std::make_unique<TStrings<arrow::BinaryType, true>>(params);
}
- static std::unique_ptr<TResult> MakeResource(bool isOptional) {
- Y_UNUSED(isOptional);
+ static std::unique_ptr<TResult> MakeResource(bool isOptional, const TBlockSerializerParams& params) {
+ Y_UNUSED(isOptional, params);
ythrow yexception() << "Deserializer not implemented for block resources";
}
- static std::unique_ptr<TResult> MakeSingular() {
- return std::make_unique<TSingularType>();
+ static std::unique_ptr<TResult> MakeSingular(const TBlockSerializerParams& params) {
+ return std::make_unique<TSingularType>(params);
}
template<typename TTzDateType>
- static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional, const TBlockSerializerParams& params) {
if (isOptional) {
- return std::make_unique<TTzDate<TTzDateType, true>>();
+ return std::make_unique<TTzDate<TTzDateType, true>>(params);
}
else {
- return std::make_unique<TTzDate<TTzDateType, false>>();
+ return std::make_unique<TTzDate<TTzDateType, false>>(params);
}
}
};
} // namespace
-
-std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, const TBlockSerializerParams& params) {
+std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper,
+ const NYql::NUdf::TType* type,
+ const TBlockSerializerParams& params) {
return NYql::NUdf::DispatchByArrowTraits<TSerializerTraits>(typeInfoHelper, type, nullptr, params);
}
-std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
- std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::DispatchByArrowTraits<TDeserializerTraits>(typeInfoHelper, type, nullptr);
+std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper,
+ const NYql::NUdf::TType* type,
+ const TBlockSerializerParams& params) {
+ std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::DispatchByArrowTraits<TDeserializerTraits>(typeInfoHelper, type, nullptr, params);
result->SetArrowType(NYql::NUdf::GetArrowType(typeInfoHelper, type));
return std::move(result);
}
-
} // namespace NKikimr::NMiniKQL
+
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.h b/yql/essentials/minikql/computation/mkql_block_transport.h
index b116dd6aa6b..0b07017b0b9 100644
--- a/yql/essentials/minikql/computation/mkql_block_transport.h
+++ b/yql/essentials/minikql/computation/mkql_block_transport.h
@@ -11,9 +11,31 @@
namespace NKikimr::NMiniKQL {
-struct TBlockSerializerParams {
- arrow::MemoryPool* Pool;
- TMaybe<ui8> MinFillPercentage;
+class TBlockSerializerParams {
+public:
+ TBlockSerializerParams(arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage, bool shouldSerializeOffset)
+ : Pool_(pool)
+ , MinFillPercentage_(minFillPercentage)
+ , ShouldSerializeOffset_(shouldSerializeOffset)
+ {
+ }
+
+ arrow::MemoryPool* Pool() const {
+ return Pool_;
+ }
+
+ TMaybe<ui8> MinFillPercentage() const {
+ return MinFillPercentage_;
+ }
+
+ bool ShouldSerializeOffset() const {
+ return ShouldSerializeOffset_;
+ }
+
+private:
+ arrow::MemoryPool* Pool_;
+ TMaybe<ui8> MinFillPercentage_;
+ bool ShouldSerializeOffset_;
};
class IBlockSerializer {
@@ -33,11 +55,11 @@ public:
using TMetadataSource = std::function<ui64()>;
virtual void LoadMetadata(const TMetadataSource& metaSource) = 0;
- virtual std::shared_ptr<arrow::ArrayData> LoadArray(NYql::TChunkedBuffer& src, ui64 blockLen, ui64 offset) = 0;
-};
+ virtual std::shared_ptr<arrow::ArrayData> LoadArray(NYql::TChunkedBuffer& src, ui64 blockLen, TMaybe<size_t> offset) = 0;
+};
std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, const TBlockSerializerParams& params);
-std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type);
+std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type, const TBlockSerializerParams& params);
}
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
index 95e2148e728..d2527662575 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
@@ -75,6 +75,43 @@ T UnpackData(TChunkedInputBuffer& buf) {
return res;
}
+class TBlockTransportFlags {
+public:
+ enum class EFlag: ui64 {
+ ScalarsArePresentFlag = 1 << 0,
+ StoreOffsetsForEachChildData = 1 << 1,
+ };
+
+ TBlockTransportFlags(ui64 data)
+ : Data_(data)
+ {
+ }
+
+ TBlockTransportFlags()
+ : Data_(0)
+ {
+ }
+
+ bool HasFlag(EFlag flag) const {
+ return (Data_ & ToIntegral(flag)) != 0;
+ }
+
+ TBlockTransportFlags* AddFlag(EFlag flag) {
+ Data_ = Data_ | ToIntegral(flag);
+ return this;
+ }
+
+ ui64 Data() const {
+ return Data_;
+ }
+
+private:
+ static constexpr ui64 ToIntegral(EFlag flag) {
+ return static_cast<ui64>(flag);
+ }
+ ui64 Data_ = 0;
+};
+
NUdf::TUnboxedValuePod UnpackString(TChunkedInputBuffer& buf, ui32 size) {
auto res = MakeStringNotFilled(size, 0);
NYql::NUdf::TMutableStringRef ref = res.AsStringRef();
@@ -984,6 +1021,17 @@ bool IsMultiBlock(const TType* type, ui32& blockLengthIndex, TVector<const TBloc
return true;
}
+bool IsOffsetExplicitStored(EValuePackerVersion valuePackerVersion) {
+ return valuePackerVersion >= EValuePackerVersion::V1;
+};
+
+size_t GetTopLevelOffsetsCount(EValuePackerVersion valuePackerVersion, ui32 width) {
+ if (!IsOffsetExplicitStored(valuePackerVersion)) {
+ return width - 1;
+ }
+ return 0;
+};
+
} // namespace
template<bool Fast>
@@ -1066,29 +1114,55 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value)
// Transport packer
-template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
+template <bool Fast>
+TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
: Type_(type)
, BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
, ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
+ , ValuePackerVersion_(valuePackerVersion)
{
MKQL_ENSURE(!stable, "Stable packing is not supported");
InitBlocks(minFillPercentage);
}
-template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
+template <bool Fast>
+TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type,
+ TMaybe<size_t> bufferPageAllocSize,
+ arrow::MemoryPool* pool,
+ TMaybe<ui8> minFillPercentage)
+ : TValuePackerTransport(type, EValuePackerVersion::V0, bufferPageAllocSize, pool, minFillPercentage)
+{
+}
+
+template <bool Fast>
+TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
: Type_(type)
, BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
, ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
+ , ValuePackerVersion_(valuePackerVersion)
{
InitBlocks(minFillPercentage);
}
+template <bool Fast>
+TValuePackerTransport<Fast>::TValuePackerTransport(bool stable,
+ const TType* type,
+ TMaybe<size_t> bufferPageAllocSize,
+ arrow::MemoryPool* ppol,
+ TMaybe<ui8> minFillPercentage)
+ : TValuePackerTransport(stable,
+ type,
+ EValuePackerVersion::V0,
+ bufferPageAllocSize,
+ ppol,
+ minFillPercentage)
+{
+}
+
template<bool Fast>
void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) {
TVector<const TBlockType*> items;
@@ -1098,10 +1172,10 @@ void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) {
return;
}
- const TBlockSerializerParams serializerParams = {
- .Pool = &ArrowPool_,
- .MinFillPercentage = minFillPercentage
- };
+ const TBlockSerializerParams serializerParams(
+ &ArrowPool_,
+ minFillPercentage,
+ IsOffsetExplicitStored(ValuePackerVersion_));
IsBlock_ = true;
ConvertedScalars_.resize(items.size());
@@ -1112,7 +1186,7 @@ void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) {
if (i != BlockLenIndex_) {
const TBlockType* itemType = items[i];
BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams);
- BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType());
+ BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams);
if (itemType->GetShape() == TBlockType::EShape::Scalar) {
BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType());
}
@@ -1217,6 +1291,13 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
auto metadataBuffer = std::make_shared<TBuffer>();
+
+ TBlockTransportFlags flags;
+ flags.AddFlag(TBlockTransportFlags::EFlag::ScalarsArePresentFlag);
+ if (IsOffsetExplicitStored(ValuePackerVersion_)) {
+ flags.AddFlag(TBlockTransportFlags::EFlag::StoreOffsetsForEachChildData);
+ }
+
ui32 totalMetadataCount = 0;
for (size_t i = 0; i < width; ++i) {
if (i != BlockLenIndex_) {
@@ -1227,11 +1308,11 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
// calculate approximate metadata size
const size_t metadataReservedSize =
- MAX_PACKED64_SIZE + // block len
- MAX_PACKED64_SIZE + // feature flags
- (width - 1) + // 1-byte offsets
- MAX_PACKED32_SIZE + // metadata words count
- MAX_PACKED64_SIZE * totalMetadataCount; // metadata words
+ MAX_PACKED64_SIZE + // block len
+ MAX_PACKED64_SIZE + // feature flags
+ GetTopLevelOffsetsCount(ValuePackerVersion_, width) + // 1-byte offsets for V0, empty for higher versions.
+ MAX_PACKED32_SIZE + // metadata words count
+ MAX_PACKED64_SIZE * totalMetadataCount; // metadata words
metadataBuffer->Reserve(len ? metadataReservedSize : MAX_PACKED64_SIZE);
// save block length
@@ -1243,10 +1324,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
return *this;
}
- // save feature flags
- // 1 = "scalars are present"
- const ui64 metadataFlags = 1 << 0;
- PackData<false>(metadataFlags, *metadataBuffer);
+ PackData<false>(flags.Data(), *metadataBuffer);
TVector<std::shared_ptr<arrow::ArrayData>> arrays(width);
// save reminder of original offset for each column - it is needed to properly handle offset in bitmaps
@@ -1260,7 +1338,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
i64 offset = datum.array()->offset;
MKQL_ENSURE(offset >= 0, "Negative offset");
// all offsets should be equal
- MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data");
+ if (!IsOffsetExplicitStored(ValuePackerVersion_)) {
+ MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data");
+ }
reminder = offset % 8;
arrays[i] = datum.array();
} else {
@@ -1274,7 +1354,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
}
arrays[i] = ConvertedScalars_[i];
}
- PackData<false>(reminder, *metadataBuffer);
+ if (!IsOffsetExplicitStored(ValuePackerVersion_)) {
+ PackData<false>(reminder, *metadataBuffer);
+ }
}
// save count of metadata words
@@ -1291,7 +1373,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
}
}
- MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error");
+ MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error. Expected: " << totalMetadataCount << ", actual: " << savedMetadata);
BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer);
// save buffers
@@ -1316,17 +1398,21 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const
}
// unpack flags
- const ui64 metadataFlags = UnpackData<false, ui64>(chunked);
- MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags");
-
+ const auto metadataFlags = TBlockTransportFlags(UnpackData<false, ui64>(chunked));
+ MKQL_ENSURE(metadataFlags.HasFlag(TBlockTransportFlags::EFlag::ScalarsArePresentFlag), "Scalars are present flag must be set.");
+ if (IsOffsetExplicitStored(ValuePackerVersion_)) {
+ MKQL_ENSURE(metadataFlags.HasFlag(TBlockTransportFlags::EFlag::StoreOffsetsForEachChildData), "Offsets must be explicitly stored.");
+ }
// unpack array offsets
const ui32 width = BlockDeserializers_.size();
MKQL_ENSURE(width > 0, "Invalid width");
TVector<ui64> offsets(width);
- for (ui32 i = 0; i < width; ++i) {
- if (BlockDeserializers_[i]) {
- offsets[i] = UnpackData<false, ui8>(chunked);
- MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value");
+ if (!IsOffsetExplicitStored(ValuePackerVersion_)) {
+ for (ui32 i = 0; i < width; ++i) {
+ if (BlockDeserializers_[i]) {
+ offsets[i] = UnpackData<false, ui8>(chunked);
+ MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value. Actual offset is: " << offsets[i]);
+ }
}
}
@@ -1350,7 +1436,8 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const
if (i != BlockLenIndex_) {
MKQL_ENSURE(BlockDeserializers_[i], "Missing deserializer");
const bool isScalar = BlockReaders_[i] != nullptr;
- auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]);
+ TMaybe<size_t> offset = IsOffsetExplicitStored(ValuePackerVersion_) ? Nothing() : TMaybe<size_t>(offsets[i]);
+ auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offset);
if (isScalar) {
TBlockItem item = BlockReaders_[i]->GetItem(*array, 0);
const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) :
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.h b/yql/essentials/minikql/computation/mkql_computation_node_pack.h
index 12aac705bb9..6c200631bf2 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.h
@@ -70,17 +70,33 @@ private:
mutable NDetails::TPackerState State_;
};
+// This version specifies exactly how data will be packed and unpacked.
+enum class EValuePackerVersion {
+ V0 = 0, // Initial version.
+ V1 = 1, // Fixes Block type |child_data| serialization/deserialization:
+ // removes the invariant that offsets are equal for all recursive children.
+};
+
template<bool Fast>
class TValuePackerTransport {
public:
using TSelf = TValuePackerTransport<Fast>;
+ explicit TValuePackerTransport(const TType* type, EValuePackerVersion valuePackerVersion,
+ TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+
+ // Deprecated: For YDB sync only.
explicit TValuePackerTransport(const TType* type,
- TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+ TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+
// for compatibility with TValuePackerGeneric - stable packing is not supported
- TValuePackerTransport(bool stable, const TType* type,
+ TValuePackerTransport(bool stable, const TType* type, EValuePackerVersion valuePackerVersion,
TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+ // Deprecated: For YDB sync only.
+ TValuePackerTransport(bool stable, const TType* type,
+ TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* ppol = nullptr, TMaybe<ui8> minFillPercentage = Nothing());
+
// AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout
TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count);
@@ -125,7 +141,7 @@ private:
bool IsBlock_ = false;
bool IsLegacyBlock_ = false;
ui32 BlockLenIndex_ = 0;
-
+ EValuePackerVersion ValuePackerVersion_;
TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_;
TVector<std::unique_ptr<IBlockReader>> BlockReaders_;
TVector<std::shared_ptr<arrow::ArrayData>> ConvertedScalars_;
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
index b13acb40a62..60b23aebcfe 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -57,6 +57,16 @@ using NDetails::TChunkedInputBuffer;
template<bool Fast, bool Transport>
class TMiniKQLComputationNodePackTest: public TTestBase {
using TValuePackerType = typename TPackerTraits<Fast, Transport>::TPackerType;
+
+ TValuePackerType MakeValuePacker(bool stable, const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize = Nothing(), arrow::MemoryPool* pool = nullptr, TMaybe<ui8> minFillPercentage = Nothing()) { // Test helper: constructs the packer under test for either value of Transport.
+ if constexpr (Transport) { // Transport packer: forward the version, page size, pool and fill percentage.
+ return TValuePackerType(stable, type, valuePackerVersion, bufferPageAllocSize, pool, minFillPercentage);
+ } else {
+ Y_UNUSED(valuePackerVersion); // This branch constructs the packer from (stable, type) only.
+ return TValuePackerType(stable, type);
+ }
+ }
+
protected:
TMiniKQLComputationNodePackTest()
: FunctionRegistry_(CreateFunctionRegistry(CreateBuiltinRegistry()))
@@ -343,7 +353,7 @@ protected:
void TestPackPerformance(TType* type, const NUdf::TUnboxedValuePod& uValue)
{
- TValuePackerType packer(false, type);
+ auto packer = MakeValuePacker(false, type, EValuePackerVersion::V0);
const THPTimer timer;
for (size_t i = 0U; i < PERFORMANCE_COUNT; ++i)
packer.Pack(uValue);
@@ -372,7 +382,7 @@ protected:
NUdf::TUnboxedValue TestPackUnpack(TType* type, const NUdf::TUnboxedValuePod& uValue, const TString& additionalMsg,
const std::optional<ui32>& expectedLength = {})
{
- TValuePackerType packer(false, type);
+ auto packer = MakeValuePacker(false, type, EValuePackerVersion::V0);
return TestPackUnpack(packer, uValue, additionalMsg, expectedLength);
}
@@ -386,7 +396,7 @@ protected:
template <typename T>
void TestNumericType(NUdf::TDataTypeId schemeType) {
TString typeDesc = TStringBuilder() << ", Type:" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name;
- TValuePackerType packer(false, PgmBuilder_.NewDataType(schemeType));
+ auto packer = MakeValuePacker(false, PgmBuilder_.NewDataType(schemeType), EValuePackerVersion::V0);
TestNumericValue<T>(Max<T>(), packer, typeDesc);
TestNumericValue<T>(Min<T>(), packer, typeDesc);
@@ -411,7 +421,7 @@ protected:
template <typename T>
void TestOptionalNumericType(NUdf::TDataTypeId schemeType) {
TString typeDesc = TStringBuilder() << ", Type:Optional(" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name;
- TValuePackerType packer(false, PgmBuilder_.NewOptionalType(PgmBuilder_.NewDataType(schemeType)));
+ auto packer = MakeValuePacker(false, PgmBuilder_.NewOptionalType(PgmBuilder_.NewDataType(schemeType)), EValuePackerVersion::V0);
TestOptionalNumericValue<T>(std::optional<T>(Max<T>()), packer, typeDesc);
TestOptionalNumericValue<T>(std::optional<T>(Min<T>()), packer, typeDesc);
TestOptionalNumericValue<T>(std::optional<T>(), packer, typeDesc, 1);
@@ -428,7 +438,7 @@ protected:
void TestStringType(NUdf::TDataTypeId schemeType) {
TString typeDesc = TStringBuilder() << ", Type:" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name;
- TValuePackerType packer(false, PgmBuilder_.NewDataType(schemeType));
+ auto packer = MakeValuePacker(false, PgmBuilder_.NewDataType(schemeType), EValuePackerVersion::V0);
TestStringValue("0123456789012345678901234567890123456789", packer, typeDesc, 40 + 4);
TestStringValue("[]", packer, typeDesc, Fast ? (2 + 4) : (2 + 1));
TestStringValue("1234567", packer, typeDesc, Fast ? (7 + 4) : (7 + 1));
@@ -442,7 +452,7 @@ protected:
void TestUuidType() {
auto schemeType = NUdf::TDataType<NUdf::TUuid>::Id;
TString typeDesc = TStringBuilder() << ", Type:" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name;
- TValuePackerType packer(false, PgmBuilder_.NewDataType(schemeType));
+ auto packer = MakeValuePacker(false, PgmBuilder_.NewDataType(schemeType), EValuePackerVersion::V0);
TestStringValue("0123456789abcdef", packer, typeDesc, Fast ? 16 : (16 + 4));
}
@@ -459,7 +469,7 @@ protected:
void TestOptionalStringType(NUdf::TDataTypeId schemeType) {
TString typeDesc = TStringBuilder() << ", Type:Optional(" << NUdf::GetDataTypeInfo(NUdf::GetDataSlot(schemeType)).Name;
- TValuePackerType packer(false, PgmBuilder_.NewOptionalType(PgmBuilder_.NewDataType(schemeType)));
+ auto packer = MakeValuePacker(/*stable=*/false, PgmBuilder_.NewOptionalType(PgmBuilder_.NewDataType(schemeType)), EValuePackerVersion::V0);
TestOptionalStringValue("0123456789012345678901234567890123456789", packer, typeDesc, Fast ? (40 + 4 + 1) : (40 + 4));
TestOptionalStringValue(std::nullopt, packer, typeDesc, 1);
TestOptionalStringValue("[]", packer, typeDesc, Fast ? (2 + 4 + 1) : (2 + 1));
@@ -591,7 +601,7 @@ protected:
TType* tupleType;
const auto value = MakeTupleValue(tupleType);
- TValuePackerType packer(false, tupleType);
+ auto packer = MakeValuePacker(false, tupleType, EValuePackerVersion::V0);
auto buffer = packer.Pack(value);
@@ -625,8 +635,8 @@ protected:
if constexpr (Transport) {
auto itemType = PgmBuilder_.NewDataType(NUdf::TDataType<char *>::Id);
auto listType = PgmBuilder_.NewListType(itemType);
- TValuePackerType packer(false, itemType);
- TValuePackerType listPacker(false, listType);
+ auto packer = MakeValuePacker(false, itemType, EValuePackerVersion::V0);
+ auto listPacker = MakeValuePacker(false, listType, EValuePackerVersion::V0);
TStringBuf str = "01234567890ABCDEFG";
@@ -663,12 +673,15 @@ protected:
bool LegacyStruct = false;
bool TrimBlock = false;
TMaybe<ui8> MinFillPercentage;
+ EValuePackerVersion PackerVersion;
TString ToString() const {
auto result = TStringBuilder() << "Offset: " << Offset << ", Len: " << Len << ", LegacyStruct: " << LegacyStruct << ", TrimBlock: " << TrimBlock;
if (MinFillPercentage) {
result << ", MinFillPercentage: " << ui64(*MinFillPercentage);
}
+ result << ", PackerVersion: " << static_cast<int>(PackerVersion);
+
return result;
}
};
@@ -709,202 +722,228 @@ protected:
bool legacyStruct = args.LegacyStruct;
ui64 offset = args.Offset;
ui64 len = args.Len;
- if constexpr (Transport) {
- auto strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id);
- auto ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id);
- auto ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
- auto optStrType = PgmBuilder_.NewOptionalType(strType);
- auto optUi32Type = PgmBuilder_.NewOptionalType(ui32Type);
-
- auto tupleOptUi32StrType = PgmBuilder_.NewTupleType({ optUi32Type, strType });
- auto optTupleOptUi32StrType = PgmBuilder_.NewOptionalType(tupleOptUi32StrType);
-
- auto blockUi32Type = PgmBuilder_.NewBlockType(ui32Type, TBlockType::EShape::Many);
- auto blockOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Many);
- auto scalarOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Scalar);
- auto blockOptTupleOptUi32StrType = PgmBuilder_.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many);
- auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
-
- auto tzDateType = PgmBuilder_.NewDataType(NUdf::EDataSlot::TzDate);
- auto blockTzDateType = PgmBuilder_.NewBlockType(tzDateType, TBlockType::EShape::Many);
- auto nullType = PgmBuilder_.NewNullType();
- auto blockNullType = PgmBuilder_.NewBlockType(nullType, TBlockType::EShape::Many);
-
- auto rowType =
- legacyStruct
- ? PgmBuilder_.NewStructType({
- {"A", blockUi32Type},
- {"B", blockOptStrType},
- {"_yql_block_length", scalarUi64Type},
- {"a", scalarOptStrType},
- {"b", blockOptTupleOptUi32StrType},
- {"c", blockTzDateType},
- {"nill", blockNullType},
- })
- : PgmBuilder_.NewMultiType(
- {blockUi32Type, blockOptStrType, scalarOptStrType,
- blockOptTupleOptUi32StrType, blockTzDateType, blockNullType, scalarUi64Type});
-
- ui64 blockLen = 1000;
- UNIT_ASSERT_LE(offset + len, blockLen);
-
- auto builder1 = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr);
- auto builder2 = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr);
- auto builder3 = MakeArrayBuilder(TTypeInfoHelper(), optTupleOptUi32StrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleOptUi32StrType)), nullptr);
- auto builder4 = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr);
- auto builder5 = MakeArrayBuilder(TTypeInfoHelper(), nullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(nullType)), nullptr);
-
- for (ui32 i = 0; i < blockLen; ++i) {
- TBlockItem b1(i);
- builder1->Add(b1);
-
- TString a = "a string " + ToString(i);
- TBlockItem b2 = (i % 2) ? TBlockItem(a) : TBlockItem();
- builder2->Add(b2);
-
- TBlockItem b3items[] = { (i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a) };
- TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem();
- builder3->Add(b3);
-
- TBlockItem tzDate {i};
- tzDate.SetTimezoneId(i % 100);
- builder4->Add(tzDate);
- builder5->Add(TBlockItem::Zero());
+ auto strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id);
+ auto ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id);
+ auto ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto optStrType = PgmBuilder_.NewOptionalType(strType);
+ auto optUi32Type = PgmBuilder_.NewOptionalType(ui32Type);
+ auto optOptUi32Type = PgmBuilder_.NewOptionalType(optUi32Type);
+
+ auto tupleOptUi32StrType = PgmBuilder_.NewTupleType({optUi32Type, strType});
+ auto optTupleOptUi32StrType = PgmBuilder_.NewOptionalType(tupleOptUi32StrType);
+
+ auto blockUi32Type = PgmBuilder_.NewBlockType(ui32Type, TBlockType::EShape::Many);
+ auto blockOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Many);
+ auto scalarOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Scalar);
+ auto blockOptTupleOptUi32StrType = PgmBuilder_.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many);
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockOptOptUi32Type = PgmBuilder_.NewBlockType(optOptUi32Type, TBlockType::EShape::Many);
+ auto tzDateType = PgmBuilder_.NewDataType(NUdf::EDataSlot::TzDate);
+ auto blockTzDateType = PgmBuilder_.NewBlockType(tzDateType, TBlockType::EShape::Many);
+ auto nullType = PgmBuilder_.NewNullType();
+ auto blockNullType = PgmBuilder_.NewBlockType(nullType, TBlockType::EShape::Many);
+
+ auto rowType =
+ legacyStruct
+ ? PgmBuilder_.NewStructType({{"A", blockUi32Type},
+ {"B", blockOptStrType},
+ {"_yql_block_length", scalarUi64Type},
+ {"a", scalarOptStrType},
+ {"b", blockOptTupleOptUi32StrType},
+ {"c", blockTzDateType},
+ {"nill", blockNullType},
+ {"optOptInt32", blockOptOptUi32Type}})
+ : PgmBuilder_.NewMultiType(
+ {blockUi32Type, blockOptStrType, scalarOptStrType,
+ blockOptTupleOptUi32StrType, blockTzDateType, blockNullType, blockOptOptUi32Type, scalarUi64Type});
+
+ ui64 blockLen = 1000;
+ UNIT_ASSERT_LE(offset + len, blockLen);
+
+ auto builder1 = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr);
+ auto builder2 = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr);
+ auto builder3 = MakeArrayBuilder(TTypeInfoHelper(), optTupleOptUi32StrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleOptUi32StrType)), nullptr);
+ auto builder4 = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr);
+ auto builder5 = MakeArrayBuilder(TTypeInfoHelper(), nullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(nullType)), nullptr);
+ auto builder6 = MakeArrayBuilder(TTypeInfoHelper(), optOptUi32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optOptUi32Type)), nullptr);
+
+ for (ui32 i = 0; i < blockLen; ++i) {
+ TBlockItem b1(i);
+ builder1->Add(b1);
+
+ TString a = "a string " + ToString(i);
+ TBlockItem b2 = (i % 2) ? TBlockItem(a) : TBlockItem();
+ builder2->Add(b2);
+
+ TBlockItem b3items[] = {(i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a)};
+ TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem();
+ builder3->Add(b3);
+
+ TBlockItem tzDate{i};
+ tzDate.SetTimezoneId(i % 100);
+ builder4->Add(tzDate);
+ builder5->Add(TBlockItem::Zero());
+ switch (i % 3) {
+ case 0:
+ builder6->Add(TBlockItem(i));
+ break;
+ case 1:
+ builder6->Add(TBlockItem().MakeOptional());
+ break;
+ case 2:
+ builder6->Add(TBlockItem());
+ break;
}
+ }
- std::string_view testScalarString = "foobar";
- auto strbuf = std::make_shared<arrow::Buffer>((const ui8*)testScalarString.data(), testScalarString.size());
-
- TVector<arrow::Datum> datums;
- if (legacyStruct) {
- datums.emplace_back(builder1->Build(true));
- datums.emplace_back(builder2->Build(true));
- datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
- datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf)));
- datums.emplace_back(builder3->Build(true));
- datums.emplace_back(builder4->Build(true));
- datums.emplace_back(builder5->Build(true));
- } else {
- datums.emplace_back(builder1->Build(true));
- datums.emplace_back(builder2->Build(true));
- datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf)));
- datums.emplace_back(builder3->Build(true));
- datums.emplace_back(builder4->Build(true));
- datums.emplace_back(builder5->Build(true));
- datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
- }
+ std::string_view testScalarString = "foobar";
+ auto strbuf = std::make_shared<arrow::Buffer>((const ui8*)testScalarString.data(), testScalarString.size());
+
+ TVector<arrow::Datum> datums;
+ if (legacyStruct) {
+ datums.emplace_back(builder1->Build(true));
+ datums.emplace_back(builder2->Build(true));
+ datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
+ datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf)));
+ datums.emplace_back(builder3->Build(true));
+ datums.emplace_back(builder4->Build(true));
+ datums.emplace_back(builder5->Build(true));
+ datums.emplace_back(builder6->Build(true));
+ } else {
+ datums.emplace_back(builder1->Build(true));
+ datums.emplace_back(builder2->Build(true));
+ datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf)));
+ datums.emplace_back(builder3->Build(true));
+ datums.emplace_back(builder4->Build(true));
+ datums.emplace_back(builder5->Build(true));
+ datums.emplace_back(builder6->Build(true));
+ datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
+ }
- const ui32 blockLenIndex = legacyStruct ? 2 : 6;
- if (offset != 0 || len != blockLen) {
- for (auto& datum : datums) {
- if (datum.is_array()) {
- datum = NYql::NUdf::DeepSlice(datum.array(), offset, len);
- }
+ const ui32 blockLenIndex = legacyStruct ? 2 : datums.size() - 1;
+ if (offset != 0 || len != blockLen) {
+ for (auto& datum : datums) {
+ if (datum.is_array()) {
+ datum = NYql::NUdf::DeepSlice(datum.array(), offset, len);
}
- datums[blockLenIndex] = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(len));
}
+ datums[blockLenIndex] = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(len));
+ }
- const auto trimmerFactory = [&](ui32 index) {
- const TType* columnType = legacyStruct ? static_cast<const TStructType*>(rowType)->GetMemberType(index)
- : static_cast<const TMultiType*>(rowType)->GetElementType(index);
- return MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), static_cast<const TBlockType*>(columnType)->GetItemType(), ArrowPool_);
- };
- if (args.TrimBlock) {
- for (ui32 index = 0; index < datums.size(); ++index) {
- auto& datum = datums[index];
- if (!datum.is_array()) {
- continue;
- }
- datum = trimmerFactory(index)->Trim(datum.array());
+ const auto trimmerFactory = [&](ui32 index) {
+ const TType* columnType = legacyStruct ? static_cast<const TStructType*>(rowType)->GetMemberType(index)
+ : static_cast<const TMultiType*>(rowType)->GetElementType(index);
+ return MakeBlockTrimmer(NMiniKQL::TTypeInfoHelper(), static_cast<const TBlockType*>(columnType)->GetItemType(), ArrowPool_);
+ };
+ if (args.TrimBlock) {
+ for (ui32 index = 0; index < datums.size(); ++index) {
+ auto& datum = datums[index];
+ if (!datum.is_array()) {
+ continue;
}
+ datum = trimmerFactory(index)->Trim(datum.array());
}
- TUnboxedValueVector columns;
- for (auto& datum : datums) {
- columns.emplace_back(HolderFactory_.CreateArrowBlock(std::move(datum)));
- }
+ }
+ TUnboxedValueVector columns;
+ for (auto& datum : datums) {
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(std::move(datum)));
+ }
- TValuePackerType packer(false, rowType, {}, ArrowPool_, args.MinFillPercentage);
- if (legacyStruct) {
- TUnboxedValueVector columnsCopy = columns;
- NUdf::TUnboxedValue row = HolderFactory_.VectorAsArray(columnsCopy);
- packer.AddItem(row);
- } else {
- packer.AddWideItem(columns.data(), columns.size());
- }
- TChunkedBuffer packed = packer.Finish();
+ auto senderPacker = MakeValuePacker(false, rowType, args.PackerVersion, {}, ArrowPool_, args.MinFillPercentage);
+ if (legacyStruct) {
+ TUnboxedValueVector columnsCopy = columns;
+ NUdf::TUnboxedValue row = HolderFactory_.VectorAsArray(columnsCopy);
+ senderPacker.AddItem(row);
+ } else {
+ senderPacker.AddWideItem(columns.data(), columns.size());
+ }
+ TChunkedBuffer packed = senderPacker.Finish();
- TUnboxedValueBatch unpacked(rowType);
- packer.UnpackBatch(std::move(packed), HolderFactory_, unpacked);
+ TUnboxedValueBatch unpacked(rowType);
+ auto receiverPacker = MakeValuePacker(false, rowType, args.PackerVersion, {}, ArrowPool_, args.MinFillPercentage);
+ receiverPacker.UnpackBatch(std::move(packed), HolderFactory_, unpacked);
- UNIT_ASSERT_VALUES_EQUAL(unpacked.RowCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(unpacked.RowCount(), 1);
- TUnboxedValueVector unpackedColumns;
- if (legacyStruct) {
- auto elements = unpacked.Head()->GetElements();
- unpackedColumns.insert(unpackedColumns.end(), elements, elements + columns.size());
- } else {
- unpacked.ForEachRowWide([&](const NYql::NUdf::TUnboxedValue* values, ui32 count) {
- unpackedColumns.insert(unpackedColumns.end(), values, values + count);
- });
- }
+ TUnboxedValueVector unpackedColumns;
+ if (legacyStruct) {
+ auto elements = unpacked.Head()->GetElements();
+ unpackedColumns.insert(unpackedColumns.end(), elements, elements + columns.size());
+ } else {
+ unpacked.ForEachRowWide([&](const NYql::NUdf::TUnboxedValue* values, ui32 count) {
+ unpackedColumns.insert(unpackedColumns.end(), values, values + count);
+ });
+ }
- UNIT_ASSERT_VALUES_EQUAL(unpackedColumns.size(), columns.size());
- if (legacyStruct) {
- UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::UInt64Scalar>().value, len);
- UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[3]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value, len);
- UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString);
- }
+ UNIT_ASSERT_VALUES_EQUAL(unpackedColumns.size(), columns.size());
+ if (legacyStruct) {
+ UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::UInt64Scalar>().value, len);
+ UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[3]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value, len);
+ UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[2]).GetDatum().scalar_as<arrow::BinaryScalar>().value->ToString(), testScalarString);
+ }
- if (args.MinFillPercentage) {
- for (size_t i = 0; i < unpackedColumns.size(); ++i) {
- auto datum = TArrowBlock::From(unpackedColumns[i]).GetDatum();
- if (datum.is_scalar()) {
- continue;
- }
- const auto unpackedSize = NUdf::GetSizeOfArrayDataInBytes(*datum.array());
- const auto trimmedSize = NUdf::GetSizeOfArrayDataInBytes(*trimmerFactory(i)->Trim(datum.array()));
- UNIT_ASSERT_GE_C(trimmedSize, unpackedSize * *args.MinFillPercentage / 100, "column: " << i);
+ if (args.MinFillPercentage) {
+ for (size_t i = 0; i < unpackedColumns.size(); ++i) {
+ auto datum = TArrowBlock::From(unpackedColumns[i]).GetDatum();
+ if (datum.is_scalar()) {
+ continue;
}
+ const auto unpackedSize = NUdf::GetSizeOfArrayDataInBytes(*datum.array());
+ const auto trimmedSize = NUdf::GetSizeOfArrayDataInBytes(*trimmerFactory(i)->Trim(datum.array()));
+ UNIT_ASSERT_GE_C(trimmedSize, unpackedSize * *args.MinFillPercentage / 100, "column: " << i);
}
+ }
- auto reader1 = MakeBlockReader(TTypeInfoHelper(), ui32Type);
- auto reader2 = MakeBlockReader(TTypeInfoHelper(), optStrType);
- auto reader3 = MakeBlockReader(TTypeInfoHelper(), optTupleOptUi32StrType);
- auto reader4 = MakeBlockReader(TTypeInfoHelper(), tzDateType);
- auto reader5 = MakeBlockReader(TTypeInfoHelper(), nullType);
+ auto reader1 = MakeBlockReader(TTypeInfoHelper(), ui32Type);
+ auto reader2 = MakeBlockReader(TTypeInfoHelper(), optStrType);
+ auto reader3 = MakeBlockReader(TTypeInfoHelper(), optTupleOptUi32StrType);
+ auto reader4 = MakeBlockReader(TTypeInfoHelper(), tzDateType);
+ auto reader5 = MakeBlockReader(TTypeInfoHelper(), nullType);
+ auto reader6 = MakeBlockReader(TTypeInfoHelper(), optOptUi32Type);
- for (ui32 i = offset; i < len; ++i) {
- TBlockItem b1 = reader1->GetItem(*TArrowBlock::From(unpackedColumns[0]).GetDatum().array(), i - offset);
- UNIT_ASSERT_VALUES_EQUAL(b1.As<ui32>(), i);
+ for (ui32 i = offset; i < len; ++i) {
+ TBlockItem b1 = reader1->GetItem(*TArrowBlock::From(unpackedColumns[0]).GetDatum().array(), i - offset);
+ UNIT_ASSERT_VALUES_EQUAL(b1.As<ui32>(), i);
- TString a = "a string " + ToString(i);
- TBlockItem b2 = reader2->GetItem(*TArrowBlock::From(unpackedColumns[1]).GetDatum().array(), i - offset);
- if (i % 2) {
- UNIT_ASSERT_VALUES_EQUAL(std::string_view(b2.AsStringRef()), a);
- } else {
- UNIT_ASSERT(!b2);
- }
+ TString a = "a string " + ToString(i);
+ TBlockItem b2 = reader2->GetItem(*TArrowBlock::From(unpackedColumns[1]).GetDatum().array(), i - offset);
+ if (i % 2) {
+ UNIT_ASSERT_VALUES_EQUAL(std::string_view(b2.AsStringRef()), a);
+ } else {
+ UNIT_ASSERT(!b2);
+ }
- TBlockItem b3 = reader3->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 4 : 3]).GetDatum().array(), i - offset);
- if (i % 7) {
- auto elements = b3.GetElements();
- if (i % 2) {
- UNIT_ASSERT_VALUES_EQUAL(elements[0].As<ui32>(), i);
- } else {
- UNIT_ASSERT(!elements[0]);
- }
- UNIT_ASSERT_VALUES_EQUAL(std::string_view(elements[1].AsStringRef()), a);
+ TBlockItem b3 = reader3->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 4 : 3]).GetDatum().array(), i - offset);
+ if (i % 7) {
+ auto elements = b3.GetElements();
+ if (i % 2) {
+ UNIT_ASSERT_VALUES_EQUAL(elements[0].As<ui32>(), i);
} else {
- UNIT_ASSERT(!b3);
+ UNIT_ASSERT(!elements[0]);
}
+ UNIT_ASSERT_VALUES_EQUAL(std::string_view(elements[1].AsStringRef()), a);
+ } else {
+ UNIT_ASSERT(!b3);
+ }
- TBlockItem b4 = reader4->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 5 : 4]).GetDatum().array(), i - offset);
- UNIT_ASSERT(b4.Get<ui16>() == i);
- UNIT_ASSERT(b4.GetTimezoneId() == (i % 100));
- TBlockItem b5 = reader5->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 6 : 5]).GetDatum().array(), i - offset);
- UNIT_ASSERT(b5);
+ TBlockItem b4 = reader4->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 5 : 4]).GetDatum().array(), i - offset);
+ UNIT_ASSERT(b4.Get<ui16>() == i);
+ UNIT_ASSERT(b4.GetTimezoneId() == (i % 100));
+ TBlockItem b5 = reader5->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 6 : 5]).GetDatum().array(), i - offset);
+ UNIT_ASSERT(b5);
+ TBlockItem b6 = reader6->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 7 : 6]).GetDatum().array(), i - offset);
+ switch (i % 3) {
+ case 0:
+ UNIT_ASSERT_EQUAL(b6.Get<ui64>(), i);
+ break;
+ case 1:
+ UNIT_ASSERT(!b6.GetOptionalValue());
+ break;
+ case 2:
+ UNIT_ASSERT(!b6);
+ break;
}
}
}
@@ -929,13 +968,425 @@ protected:
TBlockTestArgs args;
TestBlockPackingCases(args, {
MakeIntrusive<TArgsDispatcher<TBlockTestArgs>>(args, std::vector<TBlockTestArgs>{
+ {.Offset = 0, .Len = 3},
{.Offset = 0, .Len = 1000},
{.Offset = 19, .Len = 623}
}),
MakeIntrusive<TArgsDispatcher<bool>>(args.LegacyStruct, std::vector<bool>{false, true}),
MakeIntrusive<TArgsDispatcher<bool>>(args.TrimBlock, std::vector<bool>{false, true}),
- MakeIntrusive<TArgsDispatcher<TMaybe<ui8>>>(args.MinFillPercentage, std::vector<TMaybe<ui8>>{Nothing(), 90})
+ MakeIntrusive<TArgsDispatcher<TMaybe<ui8>>>(args.MinFillPercentage, std::vector<TMaybe<ui8>>{Nothing(), 90}),
+ MakeIntrusive<TArgsDispatcher<EValuePackerVersion>>(args.PackerVersion, std::vector<EValuePackerVersion>{EValuePackerVersion::V0, EValuePackerVersion::V1})
+ });
+ }
+
+ void TestBlockPackingSerializeUi32Impl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (ui32 block {1, 2, 3} + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+
+ auto blockUi32Type = PgmBuilder_.NewBlockType(ui32Type, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockUi32Type, scalarUi64Type}); // row layout: [Many-shaped ui32 block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr);
+ builder->Add(NYql::NUdf::TBlockItem(1));
+ builder->Add(NYql::NUdf::TBlockItem(2));
+ builder->Add(NYql::NUdf::TBlockItem(3));
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeUi32() { // Checks the serialized-bytes hash of a plain ui32 block against fixed expected values.
+ TestBlockPackingSerializeUi32Impl(EValuePackerVersion::V0, 12952134869926058643U);
+ TestBlockPackingSerializeUi32Impl(EValuePackerVersion::V0, 12952134869926058643U); // NOTE(review): identical to the previous call; sibling tests run V0 then V1, so this looks like a copy-paste slip that leaves V1 unchecked here - confirm and supply the V1 hash
+ }
+
+ void TestBlockPackingSerializeOptionalUi32Impl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (Optional<ui32> block {1, empty, 3} + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* optUi32Type = PgmBuilder_.NewOptionalType(ui32Type);
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockOptUi32Type = PgmBuilder_.NewBlockType(optUi32Type, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockOptUi32Type, scalarUi64Type}); // row layout: [Many-shaped Optional<ui32> block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), optUi32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optUi32Type)), nullptr);
+ builder->Add(NYql::NUdf::TBlockItem(ui32(1)));
+ builder->Add(NYql::NUdf::TBlockItem()); // default-constructed item in the middle; presumably the empty optional - TODO confirm
+ builder->Add(NYql::NUdf::TBlockItem(ui32(3)));
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeOptionalUi32() { // Checks the serialized-bytes hash of an Optional<ui32> block for packer versions V0 and V1.
+ TestBlockPackingSerializeOptionalUi32Impl(EValuePackerVersion::V0, 2597716339394977815U);
+ TestBlockPackingSerializeOptionalUi32Impl(EValuePackerVersion::V1, 13473152532547735594U);
+ }
+
+ void TestBlockPackingSerializeStringImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (string block {"str1", "str2", "str3"} + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockStrType = PgmBuilder_.NewBlockType(strType, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockStrType, scalarUi64Type}); // row layout: [Many-shaped string block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), strType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(strType)), nullptr);
+ builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str1")));
+ builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str2")));
+ builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str3")));
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeString() { // Checks the serialized-bytes hash of a string block for packer versions V0 and V1.
+ TestBlockPackingSerializeStringImpl(EValuePackerVersion::V0, 2530494095874447064U);
+ TestBlockPackingSerializeStringImpl(EValuePackerVersion::V1, 10045743204568850498U);
+ }
+
+ void TestBlockPackingSerializeOptionalStringImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (Optional<string> block {"str1", empty, "str3"} + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* optStrType = PgmBuilder_.NewOptionalType(strType);
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockOptStrType = PgmBuilder_.NewBlockType(optStrType, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockOptStrType, scalarUi64Type}); // row layout: [Many-shaped Optional<string> block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr);
+ builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str1")));
+ builder->Add(NYql::NUdf::TBlockItem()); // default-constructed item in the middle; presumably the empty optional - TODO confirm
+ builder->Add(NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("str3")));
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeOptionalString() { // Checks the serialized-bytes hash of an Optional<string> block for packer versions V0 and V1.
+ TestBlockPackingSerializeOptionalStringImpl(EValuePackerVersion::V0, 15589836522266239526U);
+ TestBlockPackingSerializeOptionalStringImpl(EValuePackerVersion::V1, 7371349434798600169U);
+ }
+
+ void TestBlockPackingSerializeOptionalOptionalUi32Impl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (Optional<Optional<ui32>> block of three items + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* ui32Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui32>::Id);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* optUi32Type = PgmBuilder_.NewOptionalType(ui32Type);
+ auto* optOptUi32Type = PgmBuilder_.NewOptionalType(optUi32Type); // optional nested inside another optional
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockOptOptUi32Type = PgmBuilder_.NewBlockType(optOptUi32Type, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockOptOptUi32Type, scalarUi64Type}); // row layout: [Many-shaped Optional<Optional<ui32>> block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), optOptUi32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optOptUi32Type)), nullptr);
+ builder->Add(NYql::NUdf::TBlockItem(ui32(1)));
+ builder->Add(NYql::NUdf::TBlockItem()); // default-constructed item in the middle; presumably the outer empty optional - TODO confirm
+ builder->Add(NYql::NUdf::TBlockItem(ui32(3)));
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeOptionalOptionalUi32() { // Checks the serialized-bytes hash of an Optional<Optional<ui32>> block for packer versions V0 and V1.
+ TestBlockPackingSerializeOptionalOptionalUi32Impl(EValuePackerVersion::V0, 8466504593757236838U);
+ TestBlockPackingSerializeOptionalOptionalUi32Impl(EValuePackerVersion::V1, 7467770574734535989U);
+ }
+
+ void TestBlockPackingSerializeSingularTypeImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (Null-typed block of three items + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* nullType = PgmBuilder_.NewNullType();
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockNullType = PgmBuilder_.NewBlockType(nullType, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockNullType, scalarUi64Type}); // row layout: [Many-shaped Null block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), nullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(nullType)), nullptr);
+ builder->Add(NYql::NUdf::TBlockItem::Zero());
+ builder->Add(NYql::NUdf::TBlockItem::Zero());
+ builder->Add(NYql::NUdf::TBlockItem::Zero());
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeSingularType() { // Checks the serialized-bytes hash of a Null-typed block for packer versions V0 and V1.
+ TestBlockPackingSerializeSingularTypeImpl(EValuePackerVersion::V0, 2342694087331559075U);
+ TestBlockPackingSerializeSingularTypeImpl(EValuePackerVersion::V1, 814315123753111618U);
+ }
+
+ void TestBlockPackingSerializeOptionalSingularTypeImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (Optional<Null> block of three items + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* nullType = PgmBuilder_.NewNullType();
+ auto* optNullType = PgmBuilder_.NewOptionalType(nullType);
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockOptNullType = PgmBuilder_.NewBlockType(optNullType, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockOptNullType, scalarUi64Type}); // row layout: [Many-shaped Optional<Null> block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), optNullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optNullType)), nullptr);
+ builder->Add(NYql::NUdf::TBlockItem::Zero());
+ builder->Add(NYql::NUdf::TBlockItem()); // default-constructed item in the middle; presumably the empty optional - TODO confirm
+ builder->Add(NYql::NUdf::TBlockItem::Zero());
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeOptionalSingularType() { // Checks the serialized-bytes hash of an Optional<Null> block for packer versions V0 and V1.
+ TestBlockPackingSerializeOptionalSingularTypeImpl(EValuePackerVersion::V0, 7314801330449120945U);
+ TestBlockPackingSerializeOptionalSingularTypeImpl(EValuePackerVersion::V1, 13506722731499795140U);
+ }
+
+ void TestBlockPackingSerializeTTzDateImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (TzDate block of three items + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* tzDateType = PgmBuilder_.NewDataType(NUdf::EDataSlot::TzDate);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockTzDateType = PgmBuilder_.NewBlockType(tzDateType, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockTzDateType, scalarUi64Type}); // row layout: [Many-shaped TzDate block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr);
+ NYql::NUdf::TBlockItem tzDate1{ui16(1)}; // each item carries a ui16 value plus a timezone id set separately
+ tzDate1.SetTimezoneId(100);
+ builder->Add(tzDate1);
+ NYql::NUdf::TBlockItem tzDate2{ui16(2)};
+ tzDate2.SetTimezoneId(200);
+ builder->Add(tzDate2);
+ NYql::NUdf::TBlockItem tzDate3{ui16(3)};
+ tzDate3.SetTimezoneId(300);
+ builder->Add(tzDate3);
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeTTzDate() { // Checks the serialized-bytes hash of a TzDate block for packer versions V0 and V1.
+ TestBlockPackingSerializeTTzDateImpl(EValuePackerVersion::V0, 17903070954188109601U);
+ TestBlockPackingSerializeTTzDateImpl(EValuePackerVersion::V1, 2802122333798714691U); // unsigned suffix added for consistency with the other expected-hash literals; the value is unchanged
+ }
+
+ void TestBlockPackingSerializeOptionalTTzDateImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (Optional<TzDate> block of three items + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* tzDateType = PgmBuilder_.NewDataType(NUdf::EDataSlot::TzDate);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* optTzDateType = PgmBuilder_.NewOptionalType(tzDateType);
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockOptTzDateType = PgmBuilder_.NewBlockType(optTzDateType, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockOptTzDateType, scalarUi64Type}); // row layout: [Many-shaped Optional<TzDate> block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), optTzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTzDateType)), nullptr);
+ NYql::NUdf::TBlockItem tzDate1{ui16(1)}; // each non-empty item carries a ui16 value plus a timezone id set separately
+ tzDate1.SetTimezoneId(100);
+ builder->Add(tzDate1);
+ builder->Add(NYql::NUdf::TBlockItem()); // default-constructed item in the middle; presumably the empty optional - TODO confirm
+ NYql::NUdf::TBlockItem tzDate3{ui16(3)};
+ tzDate3.SetTimezoneId(300);
+ builder->Add(tzDate3);
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeOptionalTTzDate() { // Checks the serialized-bytes hash of an Optional<TzDate> block for packer versions V0 and V1.
+ TestBlockPackingSerializeOptionalTTzDateImpl(EValuePackerVersion::V0, 18400739887390114022U);
+ TestBlockPackingSerializeOptionalTTzDateImpl(EValuePackerVersion::V1, 11315052524575174297U);
+ }
+
+ void TestBlockPackingSerializeTupleInt32StringImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (Tuple<i32, string> block {(1,"a"), (2,"b"), (3,"c")} + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* i32Type = PgmBuilder_.NewDataType(NUdf::TDataType<i32>::Id);
+ auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* tupleType = PgmBuilder_.NewTupleType({i32Type, strType});
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockTupleType = PgmBuilder_.NewBlockType(tupleType, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockTupleType, scalarUi64Type}); // row layout: [Many-shaped tuple block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), tupleType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tupleType)), nullptr);
+ NYql::NUdf::TBlockItem tuple1Items[] = {NYql::NUdf::TBlockItem(i32(1)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("a"))}; // tuple elements live in a local array that the tuple item is built from
+ builder->Add(NYql::NUdf::TBlockItem(tuple1Items));
+ NYql::NUdf::TBlockItem tuple2Items[] = {NYql::NUdf::TBlockItem(i32(2)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("b"))};
+ builder->Add(NYql::NUdf::TBlockItem(tuple2Items));
+ NYql::NUdf::TBlockItem tuple3Items[] = {NYql::NUdf::TBlockItem(i32(3)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("c"))};
+ builder->Add(NYql::NUdf::TBlockItem(tuple3Items));
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeTupleInt32String() { // Checks the serialized-bytes hash of a Tuple<i32, string> block for packer versions V0 and V1.
+ TestBlockPackingSerializeTupleInt32StringImpl(EValuePackerVersion::V0, 6542528838520568576U);
+ TestBlockPackingSerializeTupleInt32StringImpl(EValuePackerVersion::V1, 13820338994079843620U);
+ }
+
+ void TestBlockPackingSerializeOptionalTupleInt32StringImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash) { // Packs one wide row (Optional<Tuple<i32, string>> block {(1,"a"), empty, (3,"c")} + ui64 scalar) with the given packer version and compares the hash of the serialized bytes with expectedHash.
+ auto* i32Type = PgmBuilder_.NewDataType(NUdf::TDataType<i32>::Id);
+ auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* tupleType = PgmBuilder_.NewTupleType({i32Type, strType});
+ auto* optTupleType = PgmBuilder_.NewOptionalType(tupleType);
+
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+ auto blockOptTupleType = PgmBuilder_.NewBlockType(optTupleType, TBlockType::EShape::Many);
+ auto* rowType = PgmBuilder_.NewMultiType({blockOptTupleType, scalarUi64Type}); // row layout: [Many-shaped optional tuple block, Scalar-shaped ui64]
+
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), optTupleType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleType)), nullptr);
+ NYql::NUdf::TBlockItem tuple1Items[] = {NYql::NUdf::TBlockItem(i32(1)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("a"))}; // tuple elements live in a local array that the tuple item is built from
+ builder->Add(NYql::NUdf::TBlockItem(tuple1Items));
+ builder->Add(NYql::NUdf::TBlockItem()); // default-constructed item in the middle; presumably the empty optional - TODO confirm
+ NYql::NUdf::TBlockItem tuple3Items[] = {NYql::NUdf::TBlockItem(i32(3)), NYql::NUdf::TBlockItem(NYql::NUdf::TStringRef("c"))};
+ builder->Add(NYql::NUdf::TBlockItem(tuple3Items));
+ TUnboxedValueVector columns;
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(builder->Build(/*finish=*/true)));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(3)))); // scalar 3 matches the number of items added above; presumably the block length column - TODO confirm
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+ }
+
+ void TestBlockPackingSerializeOptionalTupleInt32String() { // Checks the serialized-bytes hash of an Optional<Tuple<i32, string>> block for packer versions V0 and V1.
+ TestBlockPackingSerializeOptionalTupleInt32StringImpl(EValuePackerVersion::V0, 15729296940258124779U);
+ TestBlockPackingSerializeOptionalTupleInt32StringImpl(EValuePackerVersion::V1, 8613360264621805090U);
+ }
+
+ void TestBlockPackingSerializeTupleShiftedOffsetImpl(EValuePackerVersion valuePackerVersion, size_t expectedHash, bool expectedToFail) { // Packs a Tuple<i32, string> chunk whose child arrays have different offsets. With expectedToFail the pack call must throw yexception; otherwise the serialized hash is compared with expectedHash and the unpacked values are verified.
+ auto* i32Type = PgmBuilder_.NewDataType(NUdf::TDataType<i32>::Id);
+ auto* strType = PgmBuilder_.NewDataType(NUdf::TDataType<char*>::Id);
+ auto* ui64Type = PgmBuilder_.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto* tupleType = PgmBuilder_.NewTupleType({i32Type, strType});
+ auto* blockTupleType = PgmBuilder_.NewBlockType(tupleType, TBlockType::EShape::Many);
+ auto scalarUi64Type = PgmBuilder_.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+
+ auto* rowType = PgmBuilder_.NewMultiType({blockTupleType, scalarUi64Type}); // row layout: [Many-shaped tuple block, Scalar-shaped ui64]
+
+ auto bigString = (TString("Very long string") * 1000000).substr(0, 2000000); // 2,000,000-character payload; presumably sized to make the builder start a second chunk (asserted below) - TODO confirm
+ auto builder = MakeArrayBuilder(TTypeInfoHelper(), tupleType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tupleType)), nullptr);
+ {
+ TBlockItem tuple[] = {TBlockItem(1), TBlockItem(NYql::NUdf::TStringRef("Short string"))};
+ builder->Add(TBlockItem(tuple));
+ }
+ {
+ TBlockItem tuple[] = {TBlockItem(2), TBlockItem(NYql::NUdf::TStringRef("Short string"))};
+ builder->Add(TBlockItem(tuple));
+ }
+ {
+ TBlockItem tuple[] = {TBlockItem(3), TBlockItem(NYql::NUdf::TStringRef(bigString))};
+ builder->Add(TBlockItem(tuple));
+ }
+ auto buildResult = builder->Build(/*finish=*/true);
+ UNIT_ASSERT_EQUAL(buildResult.kind(), arrow::Datum::CHUNKED_ARRAY); // precondition: the builder returned a chunked result
+ UNIT_ASSERT_EQUAL(buildResult.chunks().size(), 2); // precondition: exactly two chunks were produced
+
+ auto children = buildResult.chunks()[1]->data()->child_data; // child arrays (i32, string) of the second chunk
+ // Check that children's offsets are different.
+ // It is the main invariant for this test.
+ UNIT_ASSERT_UNEQUAL(children[0]->offset, children[1]->offset);
+
+ TUnboxedValueVector columns;
+ // Here we take only second chunk with exactly one element.
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(buildResult.chunks()[1]));
+ columns.emplace_back(HolderFactory_.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(1)))); // scalar 1 matches the single element of the chosen chunk
+
+ if (expectedToFail) {
+ UNIT_ASSERT_EXCEPTION(MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish(), yexception);
+ return; // nothing was serialized, so the hash and round-trip checks are skipped
+ }
+ TChunkedBuffer serialized = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()).AddWideItem(columns.data(), columns.size()).Finish();
+ TStringStream stream;
+ serialized.CopyTo(stream);
+ auto actualHash = THash<TStringBuf>{}(stream.Str()); // hash is taken over the full serialized byte stream
+
+ UNIT_ASSERT_EQUAL_C(actualHash, expectedHash, TStringBuilder() << "Actual hash: " << actualHash);
+
+ // Test deserialization result
+ TUnboxedValueBatch unpacked(rowType);
+ auto receiverPacker = MakeValuePacker(false, rowType, valuePackerVersion, {}, ArrowPool_, Nothing()); // a separate packer instance with the same version does the unpacking
+ receiverPacker.UnpackBatch(std::move(serialized), HolderFactory_, unpacked);
+
+ UNIT_ASSERT_VALUES_EQUAL(unpacked.RowCount(), 1);
+
+ TUnboxedValueVector unpackedColumns;
+ unpacked.ForEachRowWide([&](const NYql::NUdf::TUnboxedValue* values, ui32 count) {
+ unpackedColumns.insert(unpackedColumns.end(), values, values + count);
 });
+
+ UNIT_ASSERT_VALUES_EQUAL(unpackedColumns.size(), 2);
+
+ auto tupleReader = MakeBlockReader(TTypeInfoHelper(), tupleType);
+ auto tupleArray = TArrowBlock::From(unpackedColumns[0]).GetDatum().array();
+ UNIT_ASSERT_VALUES_EQUAL(tupleArray->length, 1);
+ TBlockItem deserializedTuple = tupleReader->GetItem(*tupleArray, 0);
+ auto elements = deserializedTuple.GetElements();
+
+ UNIT_ASSERT_VALUES_EQUAL(elements[0].As<i32>(), 3); // the third tuple added above is the one that landed in the second chunk
+ UNIT_ASSERT_VALUES_EQUAL(TString(elements[1].AsStringRef()), bigString);
+ UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(unpackedColumns[1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value, 1);
+ }
+
+ void TestBlockPackingSerializeTupleShiftedOffset() { // V0 is expected to throw on children with different offsets (its hash argument is unused); V1 must serialize and round-trip the data.
+ TestBlockPackingSerializeTupleShiftedOffsetImpl(EValuePackerVersion::V0, 0U, /*expectedToFail=*/true);
+ TestBlockPackingSerializeTupleShiftedOffsetImpl(EValuePackerVersion::V1, 4876856722251275868U, /*expectedToFail=*/false);
 }
private:
@@ -1015,6 +1466,19 @@ class TMiniKQLComputationNodeTransportPackTest: public TMiniKQLComputationNodePa
UNIT_TEST(TestRopeSplit);
UNIT_TEST(TestIncrementalPacking);
UNIT_TEST(TestBlockPacking);
+
+ UNIT_TEST(TestBlockPackingSerializeUi32);
+ UNIT_TEST(TestBlockPackingSerializeOptionalUi32);
+ UNIT_TEST(TestBlockPackingSerializeString);
+ UNIT_TEST(TestBlockPackingSerializeOptionalString);
+ UNIT_TEST(TestBlockPackingSerializeOptionalOptionalUi32);
+ UNIT_TEST(TestBlockPackingSerializeSingularType);
+ UNIT_TEST(TestBlockPackingSerializeOptionalSingularType);
+ UNIT_TEST(TestBlockPackingSerializeTTzDate);
+ UNIT_TEST(TestBlockPackingSerializeOptionalTTzDate);
+ UNIT_TEST(TestBlockPackingSerializeTupleInt32String);
+ UNIT_TEST(TestBlockPackingSerializeOptionalTupleInt32String);
+ UNIT_TEST(TestBlockPackingSerializeTupleShiftedOffset);
UNIT_TEST_SUITE_END();
};
@@ -1040,6 +1504,19 @@ class TMiniKQLComputationNodeTransportFastPackTest: public TMiniKQLComputationNo
UNIT_TEST(TestRopeSplit);
UNIT_TEST(TestIncrementalPacking);
UNIT_TEST(TestBlockPacking);
+
+ UNIT_TEST(TestBlockPackingSerializeUi32);
+ UNIT_TEST(TestBlockPackingSerializeOptionalUi32);
+ UNIT_TEST(TestBlockPackingSerializeString);
+ UNIT_TEST(TestBlockPackingSerializeOptionalString);
+ UNIT_TEST(TestBlockPackingSerializeOptionalOptionalUi32);
+ UNIT_TEST(TestBlockPackingSerializeSingularType);
+ UNIT_TEST(TestBlockPackingSerializeOptionalSingularType);
+ UNIT_TEST(TestBlockPackingSerializeTTzDate);
+ UNIT_TEST(TestBlockPackingSerializeOptionalTTzDate);
+ UNIT_TEST(TestBlockPackingSerializeTupleInt32String);
+ UNIT_TEST(TestBlockPackingSerializeOptionalTupleInt32String);
+ UNIT_TEST(TestBlockPackingSerializeTupleShiftedOffset);
UNIT_TEST_SUITE_END();
};
diff --git a/yql/essentials/minikql/computation/mkql_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_spiller_adapter.h
index 8ddcfe46be4..a5c3c56c122 100644
--- a/yql/essentials/minikql/computation/mkql_spiller_adapter.h
+++ b/yql/essentials/minikql/computation/mkql_spiller_adapter.h
@@ -17,7 +17,7 @@ public:
: Spiller(spiller)
, ItemType(type)
, SizeLimit(sizeLimit)
- , Packer(type)
+ , Packer(type, EValuePackerVersion::V1)
{
}