summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
diff options
context:
space:
mode:
author atarasov5 <[email protected]> 2025-07-23 11:24:03 +0300
committer atarasov5 <[email protected]> 2025-07-23 12:25:55 +0300
commit f531c2bc5154672d6ba28f691d3a4bdf7ff90c34 (patch)
tree ef0b71a8dd91f7719b54c95d5a01b3e4d729106a /yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
parent 9d67ed79ecd1a554c6606e8ae27ca146bd828b05 (diff)
YQL-19995: Fix block serialization for child data with different offsets
В данном пулреквесте я добавил поддержку сериализации tuple/struct-массивов с разными оффсетами. Сделал это через новый флаг `EValuePackerVersion::{V0, V1}`. Также нужно будет сделать коммит в contrib/ydb.
commit_hash:79709ad660a4295958e5488d3dd24d660f32ca9a
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_pack.cpp')
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_pack.cpp145
1 files changed, 116 insertions, 29 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
index 95e2148e728..d2527662575 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
@@ -75,6 +75,43 @@ T UnpackData(TChunkedInputBuffer& buf) {
return res;
}
+// Set of feature-flag bits stored as a single ui64. In this file the raw value
+// is packed into the block metadata buffer and wrapped back into this class
+// when the metadata is unpacked.
+class TBlockTransportFlags {
+public:
+    // Every enumerator occupies exactly one bit of the ui64 value.
+    enum class EFlag: ui64 {
+        ScalarsArePresentFlag = 1 << 0,
+        StoreOffsetsForEachChildData = 1 << 1,
+    };
+
+    // Creates an empty set: Data_ takes its in-class initializer (0).
+    TBlockTransportFlags() = default;
+
+    // Wraps an already-encoded bit mask. Explicit, so that a plain integer is
+    // never silently converted into a flag set.
+    explicit TBlockTransportFlags(ui64 data)
+        : Data_(data)
+    {
+    }
+
+    // Returns true when the bit of `flag` is set.
+    bool HasFlag(EFlag flag) const {
+        return (Data_ & ToIntegral(flag)) != 0;
+    }
+
+    // Sets the bit of `flag` (no-op if it is already set) and returns `this`.
+    TBlockTransportFlags* AddFlag(EFlag flag) {
+        Data_ |= ToIntegral(flag);
+        return this;
+    }
+
+    // Returns the raw bit mask.
+    ui64 Data() const {
+        return Data_;
+    }
+
+private:
+    // Converts an enumerator to its underlying bit value.
+    static constexpr ui64 ToIntegral(EFlag flag) {
+        return static_cast<ui64>(flag);
+    }
+
+    ui64 Data_ = 0;
+};
+
NUdf::TUnboxedValuePod UnpackString(TChunkedInputBuffer& buf, ui32 size) {
auto res = MakeStringNotFilled(size, 0);
NYql::NUdf::TMutableStringRef ref = res.AsStringRef();
@@ -984,6 +1021,17 @@ bool IsMultiBlock(const TType* type, ui32& blockLengthIndex, TVector<const TBloc
return true;
}
+// Returns true for EValuePackerVersion::V1 and every later version, false for
+// versions that precede V1.
+bool IsOffsetExplicitStored(EValuePackerVersion valuePackerVersion) {
+    const bool precedesV1 = valuePackerVersion < EValuePackerVersion::V1;
+    return !precedesV1;
+}
+
+// Returns how many top-level offset entries are accounted for when sizing the
+// metadata buffer: `width - 1` for versions that do not store offsets
+// explicitly, and 0 for versions that do.
+//
+// A `width` of 0 also yields 0. `width - 1` is unsigned arithmetic, so without
+// the guard it would wrap around to a huge value.
+size_t GetTopLevelOffsetsCount(EValuePackerVersion valuePackerVersion, ui32 width) {
+    if (IsOffsetExplicitStored(valuePackerVersion) || width == 0) {
+        return 0;
+    }
+    return width - 1;
+}
+
} // namespace
template<bool Fast>
@@ -1066,29 +1114,55 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value)
// Transport packer
-template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
+template <bool Fast> // Constructor taking both the `stable` flag and an explicit packer version.
+TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
: Type_(type)
// Use the caller-supplied page allocation size when given, the TBufferPage default otherwise.
, BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
// A null `pool` selects the pool returned by NYql::NUdf::GetYqlMemoryPool().
, ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
+ , ValuePackerVersion_(valuePackerVersion)
{
// Passing stable == true is rejected: the check below fails with the given message.
MKQL_ENSURE(!stable, "Stable packing is not supported");
// Called from the constructor body, i.e. after all members above are initialized.
InitBlocks(minFillPercentage);
}
-template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
+// Overload without an explicit packer version: delegates to the versioned
+// constructor, passing EValuePackerVersion::V0 and forwarding every other
+// argument unchanged.
+template <bool Fast>
+TValuePackerTransport<Fast>::TValuePackerTransport(
+    const TType* type,
+    TMaybe<size_t> bufferPageAllocSize,
+    arrow::MemoryPool* pool,
+    TMaybe<ui8> minFillPercentage)
+    : TValuePackerTransport(
+          type,
+          EValuePackerVersion::V0,
+          bufferPageAllocSize,
+          pool,
+          minFillPercentage)
+{
+}
+
+template <bool Fast> // Constructor taking an explicit packer version and no `stable` flag.
+TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, EValuePackerVersion valuePackerVersion, TMaybe<size_t> bufferPageAllocSize, arrow::MemoryPool* pool, TMaybe<ui8> minFillPercentage)
: Type_(type)
// Use the caller-supplied page allocation size when given, the TBufferPage default otherwise.
, BufferPageAllocSize_(bufferPageAllocSize ? *bufferPageAllocSize : TBufferPage::DefaultPageAllocSize)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
// A null `pool` selects the pool returned by NYql::NUdf::GetYqlMemoryPool().
, ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
+ , ValuePackerVersion_(valuePackerVersion)
{
// Called from the constructor body, i.e. after all members above are initialized.
InitBlocks(minFillPercentage);
}
+// Overload taking the `stable` flag but no explicit packer version: delegates
+// to the versioned `stable` constructor, passing EValuePackerVersion::V0 and
+// forwarding every other argument unchanged.
+template <bool Fast>
+TValuePackerTransport<Fast>::TValuePackerTransport(bool stable,
+                                                   const TType* type,
+                                                   TMaybe<size_t> bufferPageAllocSize,
+                                                   arrow::MemoryPool* pool,
+                                                   TMaybe<ui8> minFillPercentage)
+    : TValuePackerTransport(stable,
+                            type,
+                            EValuePackerVersion::V0,
+                            bufferPageAllocSize,
+                            pool,
+                            minFillPercentage)
+{
+}
+
template<bool Fast>
void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) {
TVector<const TBlockType*> items;
@@ -1098,10 +1172,10 @@ void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) {
return;
}
- const TBlockSerializerParams serializerParams = {
- .Pool = &ArrowPool_,
- .MinFillPercentage = minFillPercentage
- };
+ const TBlockSerializerParams serializerParams(
+ &ArrowPool_,
+ minFillPercentage,
+ IsOffsetExplicitStored(ValuePackerVersion_));
IsBlock_ = true;
ConvertedScalars_.resize(items.size());
@@ -1112,7 +1186,7 @@ void TValuePackerTransport<Fast>::InitBlocks(TMaybe<ui8> minFillPercentage) {
if (i != BlockLenIndex_) {
const TBlockType* itemType = items[i];
BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams);
- BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType());
+ BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType(), serializerParams);
if (itemType->GetShape() == TBlockType::EShape::Scalar) {
BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType());
}
@@ -1217,6 +1291,13 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
auto metadataBuffer = std::make_shared<TBuffer>();
+
+ TBlockTransportFlags flags;
+ flags.AddFlag(TBlockTransportFlags::EFlag::ScalarsArePresentFlag);
+ if (IsOffsetExplicitStored(ValuePackerVersion_)) {
+ flags.AddFlag(TBlockTransportFlags::EFlag::StoreOffsetsForEachChildData);
+ }
+
ui32 totalMetadataCount = 0;
for (size_t i = 0; i < width; ++i) {
if (i != BlockLenIndex_) {
@@ -1227,11 +1308,11 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
// calculate approximate metadata size
const size_t metadataReservedSize =
- MAX_PACKED64_SIZE + // block len
- MAX_PACKED64_SIZE + // feature flags
- (width - 1) + // 1-byte offsets
- MAX_PACKED32_SIZE + // metadata words count
- MAX_PACKED64_SIZE * totalMetadataCount; // metadata words
+ MAX_PACKED64_SIZE + // block len
+ MAX_PACKED64_SIZE + // feature flags
+ GetTopLevelOffsetsCount(ValuePackerVersion_, width) + // 1-byte offsets for V0, empty for higher versions.
+ MAX_PACKED32_SIZE + // metadata words count
+ MAX_PACKED64_SIZE * totalMetadataCount; // metadata words
metadataBuffer->Reserve(len ? metadataReservedSize : MAX_PACKED64_SIZE);
// save block length
@@ -1243,10 +1324,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
return *this;
}
- // save feature flags
- // 1 = "scalars are present"
- const ui64 metadataFlags = 1 << 0;
- PackData<false>(metadataFlags, *metadataBuffer);
+ PackData<false>(flags.Data(), *metadataBuffer);
TVector<std::shared_ptr<arrow::ArrayData>> arrays(width);
// save reminder of original offset for each column - it is needed to properly handle offset in bitmaps
@@ -1260,7 +1338,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
i64 offset = datum.array()->offset;
MKQL_ENSURE(offset >= 0, "Negative offset");
// all offsets should be equal
- MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data");
+ if (!IsOffsetExplicitStored(ValuePackerVersion_)) {
+ MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data");
+ }
reminder = offset % 8;
arrays[i] = datum.array();
} else {
@@ -1274,7 +1354,9 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
}
arrays[i] = ConvertedScalars_[i];
}
- PackData<false>(reminder, *metadataBuffer);
+ if (!IsOffsetExplicitStored(ValuePackerVersion_)) {
+ PackData<false>(reminder, *metadataBuffer);
+ }
}
// save count of metadata words
@@ -1291,7 +1373,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
}
}
- MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error");
+ MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error. Expected: " << totalMetadataCount << ", actual: " << savedMetadata);
BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer);
// save buffers
@@ -1316,17 +1398,21 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const
}
// unpack flags
- const ui64 metadataFlags = UnpackData<false, ui64>(chunked);
- MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags");
-
+ const auto metadataFlags = TBlockTransportFlags(UnpackData<false, ui64>(chunked));
+ MKQL_ENSURE(metadataFlags.HasFlag(TBlockTransportFlags::EFlag::ScalarsArePresentFlag), "Scalars are present flag must be set.");
+ if (IsOffsetExplicitStored(ValuePackerVersion_)) {
+ MKQL_ENSURE(metadataFlags.HasFlag(TBlockTransportFlags::EFlag::StoreOffsetsForEachChildData), "Offsets must be explicitly stored.");
+ }
// unpack array offsets
const ui32 width = BlockDeserializers_.size();
MKQL_ENSURE(width > 0, "Invalid width");
TVector<ui64> offsets(width);
- for (ui32 i = 0; i < width; ++i) {
- if (BlockDeserializers_[i]) {
- offsets[i] = UnpackData<false, ui8>(chunked);
- MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value");
+ if (!IsOffsetExplicitStored(ValuePackerVersion_)) {
+ for (ui32 i = 0; i < width; ++i) {
+ if (BlockDeserializers_[i]) {
+ offsets[i] = UnpackData<false, ui8>(chunked);
+ MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value. Actual offset is: " << offsets[i]);
+ }
}
}
@@ -1350,7 +1436,8 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const
if (i != BlockLenIndex_) {
MKQL_ENSURE(BlockDeserializers_[i], "Missing deserializer");
const bool isScalar = BlockReaders_[i] != nullptr;
- auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]);
+ TMaybe<size_t> offset = IsOffsetExplicitStored(ValuePackerVersion_) ? Nothing() : TMaybe<size_t>(offsets[i]);
+ auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offset);
if (isScalar) {
TBlockItem item = BlockReaders_[i]->GetItem(*array, 0);
const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) :