diff options
author | aneporada <aneporada@ydb.tech> | 2023-08-01 17:40:38 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-08-01 17:40:38 +0300 |
commit | 20efd2f292b02a79125f61b2b5790a89625d674a (patch) | |
tree | b4b1bcc45120e31c32cfc8faa71083b9f5d19d1f | |
parent | a5535be48cfd26688ac23f30cd8f6dfb587cb353 (diff) | |
download | ydb-20efd2f292b02a79125f61b2b5790a89625d674a.tar.gz |
Fix block serialization with different array offsets
initial
3 files changed, 80 insertions, 83 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp index 2ccc5d3a82d..d96b08e090f 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -49,13 +49,14 @@ bool NeedStoreBitmap(const arrow::ArrayData& data) { return nullCount != 0 && nullCount != data.length; } -void StoreNullsSizes(const arrow::ArrayData& data, ui64 desiredOffset, const IBlockSerializer::TMetadataSink& metaSink) { +void StoreNullsSizes(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) { metaSink(data.GetNullCount()); if (!NeedStoreBitmap(data)) { metaSink(0); return; } + const ui64 desiredOffset = data.offset % 8; size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3; metaSink(nullBytes); } @@ -66,10 +67,11 @@ void LoadNullsSizes(const IBlockDeserializer::TMetadataSource& metaSource, TMayb nullsSize = metaSource(); } -void StoreNulls(const arrow::ArrayData& data, ui64 desiredOffset, TRope& dst) { +void StoreNulls(const arrow::ArrayData& data, TRope& dst) { if (!NeedStoreBitmap(data)) { return; } + const ui64 desiredOffset = data.offset % 8; size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3; YQL_ENSURE(desiredOffset <= data.offset); YQL_ENSURE((data.offset - desiredOffset) % 8 == 0); @@ -177,30 +179,30 @@ private: return Nullable ? 3 : 1; } - void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final { - YQL_ENSURE(offset <= data.offset); + void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { if constexpr (Nullable) { - StoreNullsSizes(data, offset, metaSink); + StoreNullsSizes(data, metaSink); if (data.GetNullCount() == data.length) { metaSink(0); return; } } - size_t dataBytes = ((size_t)data.length + offset) * ObjectSize; + const ui64 desiredOffset = data.offset % 8; + size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize; metaSink(dataBytes); } - void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const final { - YQL_ENSURE(offset <= data.offset); + void StoreArray(const arrow::ArrayData& data, TRope& dst) const final { if constexpr (Nullable) { - StoreNulls(data, offset, dst); + StoreNulls(data, dst); if (data.GetNullCount() == data.length) { return; } } - const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - offset) * ObjectSize; - size_t dataBytes = ((size_t)data.length + offset) * ObjectSize; + const ui64 desiredOffset = data.offset % 8; + const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize; + size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize; dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes)); } }; @@ -246,10 +248,9 @@ private: return Nullable ? 4 : 2; } - void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final { - YQL_ENSURE(offset <= data.offset); + void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { if constexpr (Nullable) { - StoreNullsSizes(data, offset, metaSink); + StoreNullsSizes(data, metaSink); if (data.GetNullCount() == data.length) { metaSink(0); metaSink(0); @@ -257,22 +258,23 @@ private: } } - size_t offsetsSize = ((size_t)data.length + 1 + offset) * sizeof(TOffset); + const ui64 desiredOffset = data.offset % 8; + size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset); metaSink(offsetsSize); metaSink(data.buffers[2]->size()); } - void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const { - YQL_ENSURE(offset <= data.offset); + void StoreArray(const arrow::ArrayData& data, TRope& dst) const final { if constexpr (Nullable) { - StoreNulls(data, offset, dst); + StoreNulls(data, dst); if (data.GetNullCount() == data.length) { return; } } - const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - offset); - size_t offsetsSize = ((size_t)data.length + 1 + offset) * sizeof(TOffset); + const ui64 desiredOffset = data.offset % 8; + const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset); + size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset); dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize)); const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data()); @@ -327,24 +329,22 @@ private: return 2 + Inner_->ArrayMetadataCount(); } - void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final { - YQL_ENSURE(offset <= data.offset); - StoreNullsSizes(data, offset, metaSink); + void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { + StoreNullsSizes(data, metaSink); if (data.GetNullCount() == data.length) { auto innerCount = Inner_->ArrayMetadataCount(); for (size_t i = 0; i < innerCount; ++i) { metaSink(0); } } else { - Inner_->StoreMetadata(*data.child_data[0], offset, metaSink); + Inner_->StoreMetadata(*data.child_data[0], metaSink); } } - void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const { - YQL_ENSURE(offset <= data.offset); - StoreNulls(data, offset, dst); + void StoreArray(const arrow::ArrayData& data, TRope& dst) const final { + StoreNulls(data, dst); if (data.GetNullCount() != data.length) { - Inner_->StoreArray(*data.child_data[0], offset, dst); + Inner_->StoreArray(*data.child_data[0], dst); } } @@ -403,10 +403,9 @@ private: return result; } - void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final { - YQL_ENSURE(offset <= data.offset); + void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final { if constexpr (Nullable) { - StoreNullsSizes(data, offset, metaSink); + StoreNullsSizes(data, metaSink); } if (data.GetNullCount() == data.length) { auto childCount = GetChildMetaCount(); @@ -415,19 +414,18 @@ private: } } else { for (size_t i = 0; i < Children_.size(); ++i) { - Children_[i]->StoreMetadata(*data.child_data[i], offset, metaSink); + Children_[i]->StoreMetadata(*data.child_data[i], metaSink); } } } - void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const { - YQL_ENSURE(offset <= data.offset); + void StoreArray(const arrow::ArrayData& data, TRope& dst) const final { if constexpr (Nullable) { - StoreNulls(data, offset, dst); + StoreNulls(data, dst); } if (data.GetNullCount() != data.length) { for (size_t i = 0; i < Children_.size(); ++i) { - Children_[i]->StoreArray(*data.child_data[i], offset, dst); + Children_[i]->StoreArray(*data.child_data[i], dst); } } } diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.h b/ydb/library/yql/minikql/computation/mkql_block_transport.h index 97f35241682..1cefc2e3a7a 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.h +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.h @@ -18,8 +18,8 @@ public: virtual size_t ArrayMetadataCount() const = 0; using TMetadataSink = std::function<void(ui64 meta)>; - virtual void StoreMetadata(const arrow::ArrayData& data, ui64 desiredOffset, const TMetadataSink& metaSink) const = 0; - virtual void StoreArray(const arrow::ArrayData& data, ui64 desiredOffset, TRope& dst) const = 0; + virtual void StoreMetadata(const arrow::ArrayData& data, const TMetadataSink& metaSink) const = 0; + virtual void StoreArray(const arrow::ArrayData& data, TRope& dst) const = 0; }; class IBlockDeserializer { diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp index 745feeccd5e..1a4dea8d754 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp @@ -861,17 +861,11 @@ void PackImpl(const TType* type, TBuf& buffer, const NUdf::TUnboxedValuePod& val } } -bool HasNonTrivialNulls(const arrow::ArrayData& array, i64 expectedOffset) { - MKQL_ENSURE(array.offset == expectedOffset, "Unexpected offset in child arrays"); - i64 nulls = array.GetNullCount(); - if (nulls > 0 && nulls != array.length) { - return true; - } - - return AnyOf(array.child_data, [&](const auto& child) { return HasNonTrivialNulls(*child, expectedOffset); }); +bool HasOffset(const arrow::ArrayData& array, i64 expectedOffset) { + return array.offset == expectedOffset && + AllOf(array.child_data, [&](const auto& child) { return HasOffset(*child, expectedOffset); }); } - } // namespace template<bool Fast> @@ -1106,59 +1100,52 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons const ui64 len = TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; auto metadataBuffer = std::make_shared<TPagedBuffer>(); + // save block length PackData<false>(len, *metadataBuffer); - TMaybe<ui64> originalOffset; - bool hasNonTrivialNulls = false; - // all offsets should be equal - for (size_t i = 0; i < width - 1; ++i) { - arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum(); - if (datum.is_array()) { - i64 offset = datum.array()->offset; - MKQL_ENSURE(offset >= 0, "Negative offset"); - if (!originalOffset.Defined()) { - originalOffset = offset; - } else { - MKQL_ENSURE(*originalOffset == offset, "All columns should have equal offsets"); - } - hasNonTrivialNulls = hasNonTrivialNulls || HasNonTrivialNulls(*datum.array(), offset); - } - } - - const ui64 desiredOffset = (originalOffset.Defined() && hasNonTrivialNulls) ? (*originalOffset % 8) : 0; - PackData<false>(desiredOffset, *metadataBuffer); - - // "scalars are present" + // save feature flags + // 1 = "scalars are present" const ui64 metadataFlags = 1 << 0; PackData<false>(metadataFlags, *metadataBuffer); - ui32 totalMetadataCount = 0; - for (size_t i = 0; i < width - 1; ++i) { - totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount(); - } - PackData<false>(totalMetadataCount, *metadataBuffer); - TVector<std::shared_ptr<arrow::ArrayData>> arrays; arrays.reserve(width - 1); - for (ui32 i = 0; i < width - 1; ++i) { + // save reminder of original offset for each column - it is needed to properly handle offset in bitmaps + for (size_t i = 0; i < width - 1; ++i) { arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum(); - if (datum.is_scalar()) { + ui8 reminder = 0; + if (datum.is_array()) { + i64 offset = datum.array()->offset; + MKQL_ENSURE(offset >= 0, "Negative offset"); + // all offsets should be equal + MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data"); + reminder = offset % 8; + arrays.emplace_back(datum.array()); + } else { + MKQL_ENSURE(datum.is_scalar(), "Expecting array or scalar"); if (!ConvertedScalars_[i]) { const TType* itemType = static_cast<const TMultiType*>(Type_)->GetElementType(i); datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast<const TBlockType*>(itemType)->GetItemType(), ArrowPool_); + MKQL_ENSURE(HasOffset(*datum.array(), 0), "Expected zero array offset after scalar is converted to array"); ConvertedScalars_[i] = datum.array(); } arrays.emplace_back(ConvertedScalars_[i]); - } else { - MKQL_ENSURE(datum.is_array(), "Expecting array or scalar"); - arrays.emplace_back(datum.array()); } + PackData<false>(reminder, *metadataBuffer); } + // save count of metadata words + ui32 totalMetadataCount = 0; + for (size_t i = 0; i < width - 1; ++i) { + totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount(); + } + PackData<false>(totalMetadataCount, *metadataBuffer); + + // save metadata itself ui32 savedMetadata = 0; for (size_t i = 0; i < width - 1; ++i) { const bool isScalar = BlockReaders_[i] != nullptr; - BlockSerializers_[i]->StoreMetadata(*arrays[i], isScalar ? 0 : desiredOffset, [&](ui64 meta) { + BlockSerializers_[i]->StoreMetadata(*arrays[i], [&](ui64 meta) { PackData<false>(meta, *metadataBuffer); ++savedMetadata; }); @@ -1167,9 +1154,10 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error"); BlockBuffer_.Insert(BlockBuffer_.End(), TPagedBuffer::AsRope(metadataBuffer)); + // save buffers for (size_t i = 0; i < width - 1; ++i) { const bool isScalar = BlockReaders_[i] != nullptr; - BlockSerializers_[i]->StoreArray(*arrays[i], isScalar ? 0 : desiredOffset, BlockBuffer_); + BlockSerializers_[i]->StoreArray(*arrays[i], BlockBuffer_); } ++ItemCount_; return *this; @@ -1179,15 +1167,25 @@ template<bool Fast> void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { while (!buf.empty()) { TChunkedInputBuffer chunked(std::move(buf)); + + // unpack block length const ui64 len = UnpackData<false, ui64>(chunked); if (len == 0) { continue; } - const ui64 offset = UnpackData<false, ui64>(chunked); + // unpack flags const ui64 metadataFlags = UnpackData<false, ui64>(chunked); MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags"); + // unpack array offsets + TVector<ui64> offsets; + for (size_t i = 0; i < BlockDeserializers_.size(); ++i) { + offsets.emplace_back(UnpackData<false, ui8>(chunked)); + MKQL_ENSURE(offsets.back() < 8, "Unexpected offset value"); + } + + // unpack metadata ui32 metaCount = UnpackData<false, ui32>(chunked); for (auto& deserializer : BlockDeserializers_) { deserializer->LoadMetadata([&]() -> ui64 { @@ -1198,10 +1196,11 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa } MKQL_ENSURE(metaCount == 0, "Partial buffers read"); TRope ropeTail = chunked.ReleaseRope(); + // unpack buffers result.PushRow([&](ui32 i) { if (i < BlockDeserializers_.size()) { const bool isScalar = BlockReaders_[i] != nullptr; - auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, isScalar ? 0 : offset); + auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]); if (isScalar) { TBlockItem item = BlockReaders_[i]->GetItem(*array, 0); const TBlockType* itemType = static_cast<const TBlockType*>(static_cast<const TMultiType*>(Type_)->GetElementType(i)); |