aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-08-01 17:40:38 +0300
committeraneporada <aneporada@ydb.tech>2023-08-01 17:40:38 +0300
commit20efd2f292b02a79125f61b2b5790a89625d674a (patch)
treeb4b1bcc45120e31c32cfc8faa71083b9f5d19d1f
parenta5535be48cfd26688ac23f30cd8f6dfb587cb353 (diff)
downloadydb-20efd2f292b02a79125f61b2b5790a89625d674a.tar.gz
Fix block serialization with different array offsets
initial
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_transport.cpp70
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_transport.h4
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp89
3 files changed, 80 insertions, 83 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
index 2ccc5d3a82d..d96b08e090f 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
@@ -49,13 +49,14 @@ bool NeedStoreBitmap(const arrow::ArrayData& data) {
return nullCount != 0 && nullCount != data.length;
}
-void StoreNullsSizes(const arrow::ArrayData& data, ui64 desiredOffset, const IBlockSerializer::TMetadataSink& metaSink) {
+void StoreNullsSizes(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) {
metaSink(data.GetNullCount());
if (!NeedStoreBitmap(data)) {
metaSink(0);
return;
}
+ const ui64 desiredOffset = data.offset % 8;
size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3;
metaSink(nullBytes);
}
@@ -66,10 +67,11 @@ void LoadNullsSizes(const IBlockDeserializer::TMetadataSource& metaSource, TMayb
nullsSize = metaSource();
}
-void StoreNulls(const arrow::ArrayData& data, ui64 desiredOffset, TRope& dst) {
+void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
if (!NeedStoreBitmap(data)) {
return;
}
+ const ui64 desiredOffset = data.offset % 8;
size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3;
YQL_ENSURE(desiredOffset <= data.offset);
YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
@@ -177,30 +179,30 @@ private:
return Nullable ? 3 : 1;
}
- void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final {
- YQL_ENSURE(offset <= data.offset);
+ void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
if constexpr (Nullable) {
- StoreNullsSizes(data, offset, metaSink);
+ StoreNullsSizes(data, metaSink);
if (data.GetNullCount() == data.length) {
metaSink(0);
return;
}
}
- size_t dataBytes = ((size_t)data.length + offset) * ObjectSize;
+ const ui64 desiredOffset = data.offset % 8;
+ size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
metaSink(dataBytes);
}
- void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const final {
- YQL_ENSURE(offset <= data.offset);
+ void StoreArray(const arrow::ArrayData& data, TRope& dst) const final {
if constexpr (Nullable) {
- StoreNulls(data, offset, dst);
+ StoreNulls(data, dst);
if (data.GetNullCount() == data.length) {
return;
}
}
- const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - offset) * ObjectSize;
- size_t dataBytes = ((size_t)data.length + offset) * ObjectSize;
+ const ui64 desiredOffset = data.offset % 8;
+ const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize;
+ size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes));
}
};
@@ -246,10 +248,9 @@ private:
return Nullable ? 4 : 2;
}
- void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final {
- YQL_ENSURE(offset <= data.offset);
+ void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
if constexpr (Nullable) {
- StoreNullsSizes(data, offset, metaSink);
+ StoreNullsSizes(data, metaSink);
if (data.GetNullCount() == data.length) {
metaSink(0);
metaSink(0);
@@ -257,22 +258,23 @@ private:
}
}
- size_t offsetsSize = ((size_t)data.length + 1 + offset) * sizeof(TOffset);
+ const ui64 desiredOffset = data.offset % 8;
+ size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
metaSink(offsetsSize);
metaSink(data.buffers[2]->size());
}
- void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const {
- YQL_ENSURE(offset <= data.offset);
+ void StoreArray(const arrow::ArrayData& data, TRope& dst) const final {
if constexpr (Nullable) {
- StoreNulls(data, offset, dst);
+ StoreNulls(data, dst);
if (data.GetNullCount() == data.length) {
return;
}
}
- const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - offset);
- size_t offsetsSize = ((size_t)data.length + 1 + offset) * sizeof(TOffset);
+ const ui64 desiredOffset = data.offset % 8;
+ const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset);
+ size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize));
const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data());
@@ -327,24 +329,22 @@ private:
return 2 + Inner_->ArrayMetadataCount();
}
- void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final {
- YQL_ENSURE(offset <= data.offset);
- StoreNullsSizes(data, offset, metaSink);
+ void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ StoreNullsSizes(data, metaSink);
if (data.GetNullCount() == data.length) {
auto innerCount = Inner_->ArrayMetadataCount();
for (size_t i = 0; i < innerCount; ++i) {
metaSink(0);
}
} else {
- Inner_->StoreMetadata(*data.child_data[0], offset, metaSink);
+ Inner_->StoreMetadata(*data.child_data[0], metaSink);
}
}
- void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const {
- YQL_ENSURE(offset <= data.offset);
- StoreNulls(data, offset, dst);
+ void StoreArray(const arrow::ArrayData& data, TRope& dst) const final {
+ StoreNulls(data, dst);
if (data.GetNullCount() != data.length) {
- Inner_->StoreArray(*data.child_data[0], offset, dst);
+ Inner_->StoreArray(*data.child_data[0], dst);
}
}
@@ -403,10 +403,9 @@ private:
return result;
}
- void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final {
- YQL_ENSURE(offset <= data.offset);
+ void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
if constexpr (Nullable) {
- StoreNullsSizes(data, offset, metaSink);
+ StoreNullsSizes(data, metaSink);
}
if (data.GetNullCount() == data.length) {
auto childCount = GetChildMetaCount();
@@ -415,19 +414,18 @@ private:
}
} else {
for (size_t i = 0; i < Children_.size(); ++i) {
- Children_[i]->StoreMetadata(*data.child_data[i], offset, metaSink);
+ Children_[i]->StoreMetadata(*data.child_data[i], metaSink);
}
}
}
- void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const {
- YQL_ENSURE(offset <= data.offset);
+ void StoreArray(const arrow::ArrayData& data, TRope& dst) const final {
if constexpr (Nullable) {
- StoreNulls(data, offset, dst);
+ StoreNulls(data, dst);
}
if (data.GetNullCount() != data.length) {
for (size_t i = 0; i < Children_.size(); ++i) {
- Children_[i]->StoreArray(*data.child_data[i], offset, dst);
+ Children_[i]->StoreArray(*data.child_data[i], dst);
}
}
}
diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.h b/ydb/library/yql/minikql/computation/mkql_block_transport.h
index 97f35241682..1cefc2e3a7a 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_transport.h
+++ b/ydb/library/yql/minikql/computation/mkql_block_transport.h
@@ -18,8 +18,8 @@ public:
virtual size_t ArrayMetadataCount() const = 0;
using TMetadataSink = std::function<void(ui64 meta)>;
- virtual void StoreMetadata(const arrow::ArrayData& data, ui64 desiredOffset, const TMetadataSink& metaSink) const = 0;
- virtual void StoreArray(const arrow::ArrayData& data, ui64 desiredOffset, TRope& dst) const = 0;
+ virtual void StoreMetadata(const arrow::ArrayData& data, const TMetadataSink& metaSink) const = 0;
+ virtual void StoreArray(const arrow::ArrayData& data, TRope& dst) const = 0;
};
class IBlockDeserializer {
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
index 745feeccd5e..1a4dea8d754 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
@@ -861,17 +861,11 @@ void PackImpl(const TType* type, TBuf& buffer, const NUdf::TUnboxedValuePod& val
}
}
-bool HasNonTrivialNulls(const arrow::ArrayData& array, i64 expectedOffset) {
- MKQL_ENSURE(array.offset == expectedOffset, "Unexpected offset in child arrays");
- i64 nulls = array.GetNullCount();
- if (nulls > 0 && nulls != array.length) {
- return true;
- }
-
- return AnyOf(array.child_data, [&](const auto& child) { return HasNonTrivialNulls(*child, expectedOffset); });
+bool HasOffset(const arrow::ArrayData& array, i64 expectedOffset) {
+ return array.offset == expectedOffset &&
+ AllOf(array.child_data, [&](const auto& child) { return HasOffset(*child, expectedOffset); });
}
-
} // namespace
template<bool Fast>
@@ -1106,59 +1100,52 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
const ui64 len = TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
auto metadataBuffer = std::make_shared<TPagedBuffer>();
+ // save block length
PackData<false>(len, *metadataBuffer);
- TMaybe<ui64> originalOffset;
- bool hasNonTrivialNulls = false;
- // all offsets should be equal
- for (size_t i = 0; i < width - 1; ++i) {
- arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum();
- if (datum.is_array()) {
- i64 offset = datum.array()->offset;
- MKQL_ENSURE(offset >= 0, "Negative offset");
- if (!originalOffset.Defined()) {
- originalOffset = offset;
- } else {
- MKQL_ENSURE(*originalOffset == offset, "All columns should have equal offsets");
- }
- hasNonTrivialNulls = hasNonTrivialNulls || HasNonTrivialNulls(*datum.array(), offset);
- }
- }
-
- const ui64 desiredOffset = (originalOffset.Defined() && hasNonTrivialNulls) ? (*originalOffset % 8) : 0;
- PackData<false>(desiredOffset, *metadataBuffer);
-
- // "scalars are present"
+ // save feature flags
+ // 1 = "scalars are present"
const ui64 metadataFlags = 1 << 0;
PackData<false>(metadataFlags, *metadataBuffer);
- ui32 totalMetadataCount = 0;
- for (size_t i = 0; i < width - 1; ++i) {
- totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount();
- }
- PackData<false>(totalMetadataCount, *metadataBuffer);
-
TVector<std::shared_ptr<arrow::ArrayData>> arrays;
arrays.reserve(width - 1);
- for (ui32 i = 0; i < width - 1; ++i) {
+    // save remainder of original offset for each column - it is needed to properly handle offset in bitmaps
+ for (size_t i = 0; i < width - 1; ++i) {
arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum();
- if (datum.is_scalar()) {
+ ui8 reminder = 0;
+ if (datum.is_array()) {
+ i64 offset = datum.array()->offset;
+ MKQL_ENSURE(offset >= 0, "Negative offset");
+ // all offsets should be equal
+ MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data");
+ reminder = offset % 8;
+ arrays.emplace_back(datum.array());
+ } else {
+ MKQL_ENSURE(datum.is_scalar(), "Expecting array or scalar");
if (!ConvertedScalars_[i]) {
const TType* itemType = static_cast<const TMultiType*>(Type_)->GetElementType(i);
datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast<const TBlockType*>(itemType)->GetItemType(), ArrowPool_);
+ MKQL_ENSURE(HasOffset(*datum.array(), 0), "Expected zero array offset after scalar is converted to array");
ConvertedScalars_[i] = datum.array();
}
arrays.emplace_back(ConvertedScalars_[i]);
- } else {
- MKQL_ENSURE(datum.is_array(), "Expecting array or scalar");
- arrays.emplace_back(datum.array());
}
+ PackData<false>(reminder, *metadataBuffer);
}
+ // save count of metadata words
+ ui32 totalMetadataCount = 0;
+ for (size_t i = 0; i < width - 1; ++i) {
+ totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount();
+ }
+ PackData<false>(totalMetadataCount, *metadataBuffer);
+
+ // save metadata itself
ui32 savedMetadata = 0;
for (size_t i = 0; i < width - 1; ++i) {
const bool isScalar = BlockReaders_[i] != nullptr;
- BlockSerializers_[i]->StoreMetadata(*arrays[i], isScalar ? 0 : desiredOffset, [&](ui64 meta) {
+ BlockSerializers_[i]->StoreMetadata(*arrays[i], [&](ui64 meta) {
PackData<false>(meta, *metadataBuffer);
++savedMetadata;
});
@@ -1167,9 +1154,10 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error");
BlockBuffer_.Insert(BlockBuffer_.End(), TPagedBuffer::AsRope(metadataBuffer));
+ // save buffers
for (size_t i = 0; i < width - 1; ++i) {
const bool isScalar = BlockReaders_[i] != nullptr;
- BlockSerializers_[i]->StoreArray(*arrays[i], isScalar ? 0 : desiredOffset, BlockBuffer_);
+ BlockSerializers_[i]->StoreArray(*arrays[i], BlockBuffer_);
}
++ItemCount_;
return *this;
@@ -1179,15 +1167,25 @@ template<bool Fast>
void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
while (!buf.empty()) {
TChunkedInputBuffer chunked(std::move(buf));
+
+ // unpack block length
const ui64 len = UnpackData<false, ui64>(chunked);
if (len == 0) {
continue;
}
- const ui64 offset = UnpackData<false, ui64>(chunked);
+ // unpack flags
const ui64 metadataFlags = UnpackData<false, ui64>(chunked);
MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags");
+ // unpack array offsets
+ TVector<ui64> offsets;
+ for (size_t i = 0; i < BlockDeserializers_.size(); ++i) {
+ offsets.emplace_back(UnpackData<false, ui8>(chunked));
+ MKQL_ENSURE(offsets.back() < 8, "Unexpected offset value");
+ }
+
+ // unpack metadata
ui32 metaCount = UnpackData<false, ui32>(chunked);
for (auto& deserializer : BlockDeserializers_) {
deserializer->LoadMetadata([&]() -> ui64 {
@@ -1198,10 +1196,11 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa
}
MKQL_ENSURE(metaCount == 0, "Partial buffers read");
TRope ropeTail = chunked.ReleaseRope();
+ // unpack buffers
result.PushRow([&](ui32 i) {
if (i < BlockDeserializers_.size()) {
const bool isScalar = BlockReaders_[i] != nullptr;
- auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, isScalar ? 0 : offset);
+ auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]);
if (isScalar) {
TBlockItem item = BlockReaders_[i]->GetItem(*array, 0);
const TBlockType* itemType = static_cast<const TBlockType*>(static_cast<const TMultiType*>(Type_)->GetElementType(i));