// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "contrib/libs/apache/arrow_next/cpp/src/parquet/encoding.h" #include #include #include #include #include #include #include #include #include #include "contrib/libs/apache/arrow_next/cpp/src/arrow/array.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/stl_allocator.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/type_traits.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/bit_stream_utils_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/bit_util.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/bitmap_ops.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/byte_stream_split_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/checked_cast.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/hashing.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/int_util_overflow.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/rle_encoding_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/spaced.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/ubsan.h" 
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/visit_data_inline.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/exception.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/platform.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/schema.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/types.h"

// NOTE(review): in this copy of the file the text between angle brackets
// (template parameter lists and template arguments) appears to be missing,
// e.g. `template class PlainEncoder`, `static_cast(length)`,
// `std::shared_ptr FlushValues()`. The code below is reproduced token for
// token as found; those lists must be restored before this can compile.

namespace bit_util = arrow20::bit_util;

using arrow20::Status;
using arrow20::internal::AddWithOverflow;
using arrow20::internal::checked_cast;
using arrow20::internal::SafeSignedSubtract;
using arrow20::util::SafeLoad;
using arrow20::util::SafeLoadAs;

// Vector alias used for encoder-internal buffers.
// NOTE(review): presumably a std::vector parameterised with Arrow's
// pool-backed STL allocator (it is constructed from
// `::arrow20::stl::allocator(pool)` further down) -- confirm once the
// template arguments are restored.
template using ArrowPoolVector = std::vector>;

namespace parquet20 {
namespace {

// The Parquet spec isn't very clear whether ByteArray lengths are signed or
// unsigned, but the Java implementation uses signed ints.
//
// Used below as the upper bound on the size of a single binary value; larger
// values are rejected with a "2GB or more" error.
constexpr size_t kMaxByteArraySize = std::numeric_limits::max();

/// State shared by the encoders in this file: the column descriptor, the
/// encoding id, the memory pool, the type length taken from the descriptor,
/// and a running count of unencoded BYTE_ARRAY payload bytes.
class EncoderImpl : virtual public Encoder {
 public:
  /// \param descr column descriptor; may be null, in which case type_length_
  ///        is set to -1
  /// \param encoding encoding id reported by encoding()
  /// \param pool memory pool reported by memory_pool()
  EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool)
      : descr_(descr),
        encoding_(encoding),
        pool_(pool),
        type_length_(descr ? descr->type_length() : -1) {}

  Encoding::type encoding() const override { return encoding_; }

  MemoryPool* memory_pool() const override { return pool_; }

  /// Returns the number of unencoded byte-array bytes accumulated since the
  /// previous call and resets the counter to zero.
  ///
  /// Throws ParquetException when the column's physical type is not
  /// BYTE_ARRAY.
  ///
  /// NOTE(review): descr_ is dereferenced unconditionally here although the
  /// constructor accepts a null descriptor -- confirm callers never hit this
  /// with descr_ == nullptr.
  int64_t ReportUnencodedDataBytes() override {
    if (descr_->physical_type() != Type::BYTE_ARRAY) {
      throw ParquetException("ReportUnencodedDataBytes is only supported for BYTE_ARRAY");
    }
    int64_t bytes = unencoded_byte_array_data_bytes_;
    unencoded_byte_array_data_bytes_ = 0;
    return bytes;
  }

 protected:
  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
  const ColumnDescriptor* descr_;
  const Encoding::type encoding_;
  MemoryPool* pool_;

  /// Type length from descr
  const int type_length_;
  /// Number of unencoded bytes written to the encoder. Used for ByteArray type only.
  int64_t unencoded_byte_array_data_bytes_ = 0;
};

// ----------------------------------------------------------------------
// PLAIN encoder

/// PLAIN encoder: values are appended to a growable byte sink and handed out
/// as a single buffer by FlushValues().
template class PlainEncoder : public EncoderImpl, virtual public TypedEncoder {
 public:
  using T = typename DType::c_type;

  explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}

  /// Current length of the sink, i.e. the bytes appended since the last flush.
  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }

  /// Finishes the sink and returns its contents as a buffer.
  std::shared_ptr FlushValues() override {
    std::shared_ptr buffer;
    PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
    return buffer;
  }

  using TypedEncoder::Put;

  void Put(const T* buffer, int num_values) override;

  void Put(const ::arrow20::Array& values) override;

  /// Encodes only the slots of `src` whose validity bit is set.
  ///
  /// When `valid_bits` is null every slot is treated as valid and `src` is
  /// passed straight to Put(). Otherwise the valid values are first packed
  /// into a temporary buffer of `num_values * sizeof(T)` bytes.
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    if (valid_bits != NULLPTR) {
      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow20::AllocateBuffer(num_values * sizeof(T),
                                                                   this->memory_pool()));
      T* data = buffer->template mutable_data_as();
      int num_valid_values = ::arrow20::util::internal::SpacedCompress(
          src, num_values, valid_bits, valid_bits_offset, data);
      Put(data, num_valid_values);
    } else {
      Put(src, num_values);
    }
  }

  /// Appends the raw bytes of the uint32_t `length` followed by `length`
  /// bytes from `data`, and adds `length` to the unencoded byte counter.
  ///
  /// Uses UnsafeAppend: the caller must have reserved enough room in sink_.
  void UnsafePutByteArray(const void* data, uint32_t length) {
    DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
    sink_.UnsafeAppend(&length, sizeof(uint32_t));
    sink_.UnsafeAppend(data, static_cast(length));
    unencoded_byte_array_data_bytes_ += length;
  }

  /// Appends one ByteArray, growing the sink first if the length prefix plus
  /// payload would not fit in the current capacity.
  void Put(const ByteArray& val) {
    // Write the result to the output stream
    const int64_t increment = static_cast(val.len + sizeof(uint32_t));
    if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
      PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
    }
    UnsafePutByteArray(val.ptr, val.len);
  }

 protected:
  /// Appends every non-null element of a binary-like Arrow array.
  ///
  /// Space is reserved once up front: the span of the value offsets plus one
  /// uint32_t prefix per array slot (null slots are counted too, so the
  /// reservation can be larger than what is written). Elements larger than
  /// kMaxByteArraySize produce Status::Invalid, which is rethrown.
  template void PutBinaryArray(const ArrayType& array) {
    const int64_t total_bytes = array.value_offset(array.length()) - array.value_offset(0);
    PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t)));

    PARQUET_THROW_NOT_OK(::arrow20::VisitArraySpanInline(
        *array.data(),
        [&](::std::string_view view) {
          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
            return Status::Invalid(
                "Parquet cannot store strings with size 2GB or more, got: ", view.size());
          }
          UnsafePutByteArray(view.data(), static_cast(view.size()));
          return Status::OK();
        },
        // Null slots contribute nothing to the output.
        []() { return Status::OK(); }));
  }

  ::arrow20::BufferBuilder sink_;
};

/// Generic Put: appends `num_values * sizeof(T)` raw bytes from `buffer`.
/// A non-positive `num_values` is a no-op.
template void PlainEncoder::Put(const T* buffer, int num_values) {
  if (num_values > 0) {
    PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
  }
}

/// ByteArray specialization: each element is written through
/// Put(const ByteArray&), i.e. with its own length prefix.
template <> inline void PlainEncoder::Put(const ByteArray* src, int num_values) {
  for (int i = 0; i < num_values; ++i) {
    Put(src[i]);
  }
}

/// Appends the values of a primitive Arrow array to `sink`.
///
/// Throws ParquetException if the array's type id differs from the one
/// expected by `ArrayType`. Without nulls the values are appended in one
/// call; with nulls, space for the non-null values is reserved and each valid
/// slot is appended individually.
template void DirectPutImpl(const ::arrow20::Array& values, ::arrow20::BufferBuilder* sink) {
  if (values.type_id() != ArrayType::TypeClass::type_id) {
    std::string type_name = ArrayType::TypeClass::type_name();
    throw ParquetException("direct put to " + type_name + " from " +
                           values.type()->ToString() + " not supported");
  }

  using value_type = typename ArrayType::value_type;
  constexpr auto value_size = sizeof(value_type);
  auto raw_values = checked_cast(values).raw_values();

  if (values.null_count() == 0) {
    // no nulls, just dump the data
    PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
  } else {
    PARQUET_THROW_NOT_OK(
        sink->Reserve((values.length() - values.null_count()) * value_size));

    for (int64_t i = 0; i < values.length(); i++) {
      if (values.IsValid(i)) {
        sink->UnsafeAppend(&raw_values[i], value_size);
      }
    }
  }
}

// Per-type Put(const Array&) specializations. The specialized type name is
// missing from each header in this copy; the Arrow array type handed to
// DirectPutImpl shows which one each is meant for.

// Int32Array input.
template <> void PlainEncoder::Put(const ::arrow20::Array& values) {
  DirectPutImpl<::arrow20::Int32Array>(values, &sink_);
}

// Int64Array input.
template <> void PlainEncoder::Put(const ::arrow20::Array& values) {
  DirectPutImpl<::arrow20::Int64Array>(values, &sink_);
}

// Int96: not implemented.
template <> void PlainEncoder::Put(const ::arrow20::Array& values) {
  ParquetException::NYI("direct put to Int96");
}

// FloatArray input.
template <> void PlainEncoder::Put(const ::arrow20::Array& values) {
  DirectPutImpl<::arrow20::FloatArray>(values, &sink_);
}

// DoubleArray input.
template <> void PlainEncoder::Put(const ::arrow20::Array& values) {
  DirectPutImpl<::arrow20::DoubleArray>(values, &sink_);
}

// Fallback for any type without a specialization: not implemented.
template void PlainEncoder::Put(const ::arrow20::Array& values) {
  ParquetException::NYI("direct put of " + values.type()->ToString());
}

/// Throws ParquetException unless `values` has a base-binary-like type id.
void AssertBaseBinary(const ::arrow20::Array& values) {
  if (!::arrow20::is_base_binary_like(values.type_id())) {
    throw ParquetException("Only BaseBinaryArray and subclasses supported");
  }
}

/// Put(const Array&) for variable-length binary input. The array is
/// downcast and forwarded to PutBinaryArray; one branch handles the
/// binary-like type ids, the other the large-binary-like ones.
template <> inline void PlainEncoder::Put(const ::arrow20::Array& values) {
  AssertBaseBinary(values);

  if (::arrow20::is_binary_like(values.type_id())) {
    PutBinaryArray(checked_cast(values));
  } else {
    DCHECK(::arrow20::is_large_binary_like(values.type_id()));
    PutBinaryArray(checked_cast(values));
  }
}

/// Throws ParquetException unless `values` is a fixed-size-binary array whose
/// byte width equals `type_length`.
void AssertFixedSizeBinary(const ::arrow20::Array& values, int type_length) {
  if (!::arrow20::is_fixed_size_binary(values.type_id())) {
    throw ParquetException("Only FixedSizeBinaryArray and subclasses supported");
  }
  if (checked_cast(*values.type()).byte_width() != type_length) {
    throw ParquetException("Size mismatch: " + values.type()->ToString() +
                           " should have been " + std::to_string(type_length) + " wide");
  }
}

/// Put(const Array&) for fixed-size binary input: the values are appended
/// back to back with no length prefix. With nulls present, only valid slots
/// are written.
template <> inline void PlainEncoder::Put(const ::arrow20::Array& values) {
  AssertFixedSizeBinary(values, descr_->type_length());
  const auto& data = checked_cast(values);

  if (data.null_count() == 0) {
    // no nulls, just dump the data
    PARQUET_THROW_NOT_OK(
        sink_.Append(data.raw_values(), data.length() * data.byte_width()));
  } else {
    const int64_t total_bytes =
        data.length() * data.byte_width() - data.null_count() * data.byte_width();
    PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
    for (int64_t i = 0; i < data.length(); i++) {
      if (data.IsValid(i)) {
        sink_.UnsafeAppend(data.Value(i), data.byte_width());
      }
    }
  }
}

/// Put for an array of FixedLenByteArray: appends type_length() bytes per
/// value. Nothing is written when the type length is zero.
template <> inline void PlainEncoder::Put(const FixedLenByteArray* src, int num_values) {
  if (descr_->type_length() == 0) {
    return;
  }
  for (int i = 0; i < num_values; ++i) {
    // Write the result to the output stream
    DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
    PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
  }
}

/// PLAIN encoder specialization implementing BooleanEncoder. Values go into a
/// typed buffer builder; the encoded size is reported as
/// BytesForBits(sink_.length()).
template <> class PlainEncoder : public EncoderImpl, virtual public BooleanEncoder {
 public:
  explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}

  int64_t EstimatedDataEncodedSize() override;
  std::shared_ptr FlushValues() override;

  void Put(const bool* src, int num_values) override;

  void Put(const std::vector& src, int num_values) override;

  /// Same contract as the generic PutSpaced: compacts the valid slots into a
  /// temporary buffer when `valid_bits` is non-null, otherwise encodes `src`
  /// directly.
  void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    if (valid_bits != NULLPTR) {
      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow20::AllocateBuffer(num_values * sizeof(T),
                                                                   this->memory_pool()));
      T* data = buffer->mutable_data_as();
      int num_valid_values = ::arrow20::util::internal::SpacedCompress(
          src, num_values, valid_bits, valid_bits_offset, data);
      Put(data, num_valid_values);
    } else {
      Put(src, num_values);
    }
  }

  /// Encodes an Arrow array whose type id is BOOL; any other type throws
  /// ParquetException.
  void Put(const ::arrow20::Array& values) override {
    if (values.type_id() != ::arrow20::Type::BOOL) {
      throw ParquetException("direct put to boolean from " + values.type()->ToString() +
                             " not supported");
    }
    const auto& data = checked_cast(values);

    if (data.null_count() == 0) {
      // no nulls, just dump the data
      PARQUET_THROW_NOT_OK(sink_.Reserve(data.length()));
      // Bulk append of data.length() values starting at data.offset() in the
      // array's values buffer (buffer index 1).
      sink_.UnsafeAppend(data.data()->GetValues(1, 0), data.offset(),
                         data.length());
    } else {
      PARQUET_THROW_NOT_OK(sink_.Reserve(data.length() - data.null_count()));
      for (int64_t i = 0; i < data.length(); i++) {
        if (data.IsValid(i)) {
          sink_.UnsafeAppend(data.Value(i));
        }
      }
    }
  }

 private:
  ::arrow20::TypedBufferBuilder sink_;

  /// Shared implementation of the two Put overloads: works on anything
  /// indexable with operator[].
  template void PutImpl(const SequenceType& src, int num_values);
};

/// Reserves room for `num_values` entries and appends src[0..num_values).
template void PlainEncoder::PutImpl(const SequenceType& src, int num_values) {
  PARQUET_THROW_NOT_OK(sink_.Reserve(num_values));
  for (int i = 0; i < num_values; ++i) {
    sink_.UnsafeAppend(src[i]);
  }
}

int64_t PlainEncoder::EstimatedDataEncodedSize() {
  return ::arrow20::bit_util::BytesForBits(sink_.length());
}

std::shared_ptr PlainEncoder::FlushValues() {
  std::shared_ptr buffer;
  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
  return buffer;
}

void PlainEncoder::Put(const bool* src, int num_values) {
  PutImpl(src, num_values);
}

void PlainEncoder::Put(const std::vector& src, int num_values) {
  PutImpl(src, num_values);
}

// ----------------------------------------------------------------------
// DictEncoder implementations

/// Selects the memo table type used by the dictionary encoder. The primary
/// template uses a ScalarMemoTable; the two specializations below use a
/// BinaryMemoTable backed by a BinaryBuilder.
/// NOTE(review): the specialized type names are missing in this copy --
/// presumably the byte-array and fixed-length byte-array types; confirm.
template struct DictEncoderTraits {
  using c_type = typename DType::c_type;
  using MemoTableType = ::arrow20::internal::ScalarMemoTable;
};

template <> struct DictEncoderTraits {
  using MemoTableType = ::arrow20::internal::BinaryMemoTable<::arrow20::BinaryBuilder>;
};

template <> struct DictEncoderTraits {
  using MemoTableType = ::arrow20::internal::BinaryMemoTable<::arrow20::BinaryBuilder>;
};

// Initially 1024 elements
static constexpr int32_t kInitialHashTableSize = 1 << 10;

/// Buffer size to reserve for RLE-encoding `num_values` values of
/// `bit_width` bits: RleEncoder::MaxBufferSize plus one extra
/// RleEncoder::MinBufferSize.
int RlePreserveBufferSize(int num_values, int bit_width) {
  // Note: because of the way RleEncoder::CheckBufferFull()
  // is called, we have to reserve an extra "RleEncoder::MinBufferSize"
  // bytes. These extra bytes won't be used but not reserving them
  // would cause the encoder to fail.
  return ::arrow20::util::RleEncoder::MaxBufferSize(bit_width, num_values) +
         ::arrow20::util::RleEncoder::MinBufferSize(bit_width);
}

/// See the dictionary encoding section of
/// https://github.com/Parquet/parquet-format.  The encoding supports
/// streaming encoding. Values are encoded as they are added while the
/// dictionary is being constructed. At any time, the buffered values
/// can be written out with the current dictionary size. More values
/// can then be added to the encoder, including new dictionary
/// entries.
// NOTE(review): in this copy of the file the text between angle brackets
// (template parameter lists and template arguments) appears to be missing,
// e.g. `template class DictEncoderImpl`, `static_cast(bit_width())`. The code
// is reproduced token for token as found; those lists must be restored before
// this can compile.
//
// Dictionary encoder. Distinct values are stored in memo_table_; for every
// encoded value the index of its dictionary entry is appended to
// buffered_indices_, which WriteIndices() later RLE-encodes.
template class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder {
  using MemoTableType = typename DictEncoderTraits::MemoTableType;

 public:
  typedef typename DType::c_type T;

  /// In data page, the bit width used to encode the entry
  /// ids stored as 1 byte (max bit width = 32).
  constexpr static int32_t kDataPageBitWidthBytes = 1;

  explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
      : EncoderImpl(desc, Encoding::RLE_DICTIONARY, pool),
        buffered_indices_(::arrow20::stl::allocator(pool)),
        dict_encoded_size_(0),
        memo_table_(pool, kInitialHashTableSize) {}

  ~DictEncoderImpl() override = default;

  int dict_encoded_size() const override { return dict_encoded_size_; }

  /// RLE-encodes the buffered indices into `buffer`.
  ///
  /// The first byte receives the bit width; the encoded indices follow.
  /// Returns the total number of bytes written (bit-width byte included), or
  /// -1 if the encoder rejects a value. On success the buffered indices are
  /// cleared; on the -1 path they are left untouched.
  int WriteIndices(uint8_t* buffer, int buffer_len) override {
    // Write bit width in first byte
    *buffer = static_cast(bit_width());
    ++buffer;
    --buffer_len;

    ::arrow20::util::RleEncoder encoder(buffer, buffer_len, bit_width());

    for (int32_t index : buffered_indices_) {
      if (ARROW_PREDICT_FALSE(!encoder.Put(index))) return -1;
    }
    encoder.Flush();

    ClearIndices();
    return kDataPageBitWidthBytes + encoder.len();
  }

  /// Returns a conservative estimate of the number of bytes needed to encode the buffered
  /// indices. Used to size the buffer passed to WriteIndices().
  int64_t EstimatedDataEncodedSize() override {
    return kDataPageBitWidthBytes +
           RlePreserveBufferSize(static_cast(buffered_indices_.size()), bit_width());
  }

  /// The minimum bit width required to encode the currently buffered indices.
  /// Yields 0 for an empty dictionary and 1 for a single entry; otherwise
  /// bit_util::Log2(num_entries()).
  int bit_width() const override {
    if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
    if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
    return bit_util::Log2(num_entries());
  }

  /// Encode value. Note that this does not actually write any data, just
  /// buffers the value's index to be written later.
  inline void Put(const T& value);

  // Not implemented for other data types
  inline void PutByteArray(const void* ptr, int32_t length);

  /// Encodes `num_values` values; each one is read through SafeLoad before
  /// being passed to the single-value Put.
  void Put(const T* src, int num_values) override {
    for (int32_t i = 0; i < num_values; i++) {
      Put(SafeLoad(src + i));
    }
  }

  /// Encodes only the slots of `src` covered by runs of set bits in
  /// `valid_bits` (starting at `valid_bits_offset`).
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    ::arrow20::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, num_values,
                                           [&](int64_t position, int64_t length) {
                                             for (int64_t i = 0; i < length; i++) {
                                               Put(SafeLoad(src + i + position));
                                             }
                                           });
  }

  using TypedEncoder::Put;

  void Put(const ::arrow20::Array& values) override;
  void PutDictionary(const ::arrow20::Array& values) override;

  /// Appends the non-null entries of an integer index array to
  /// buffered_indices_.
  ///
  /// buffered_indices_ is first grown by the number of non-null entries, then
  /// filled run by run from the validity bitmap. In the `if constexpr`
  /// branch the length of each referenced dictionary value is additionally
  /// added to unencoded_byte_array_data_bytes_.
  template void PutIndicesTyped(const ::arrow20::Array& data) {
    auto values = data.data()->GetValues(1);
    size_t buffer_position = buffered_indices_.size();
    buffered_indices_.resize(buffer_position +
                             static_cast(data.length() - data.null_count()));
    ::arrow20::internal::VisitSetBitRunsVoid(
        data.null_bitmap_data(), data.offset(), data.length(),
        [&](int64_t position, int64_t length) {
          for (int64_t i = 0; i < length; ++i) {
            buffered_indices_[buffer_position++] =
                static_cast(values[i + position]);
          }
        });

    // Track unencoded bytes based on dictionary value type
    if constexpr (std::is_same_v) {
      // For ByteArray, need to look up actual lengths from dictionary
      // (walks exactly the indices appended by the loop above).
      for (size_t idx =
               buffer_position - static_cast(data.length() - data.null_count());
           idx < buffer_position; ++idx) {
        memo_table_.VisitValue(buffered_indices_[idx], [&](std::string_view value) {
          unencoded_byte_array_data_bytes_ += value.length();
        });
      }
    }
  }

  /// Dispatches on the index array's integer type. Signed and unsigned types
  /// of the same width share the unsigned instantiation. Any non-integer
  /// type throws ParquetException.
  void PutIndices(const ::arrow20::Array& data) override {
    switch (data.type()->id()) {
      case ::arrow20::Type::UINT8:
      case ::arrow20::Type::INT8:
        return PutIndicesTyped<::arrow20::UInt8Type>(data);
      case ::arrow20::Type::UINT16:
      case ::arrow20::Type::INT16:
        return PutIndicesTyped<::arrow20::UInt16Type>(data);
      case ::arrow20::Type::UINT32:
      case ::arrow20::Type::INT32:
        return PutIndicesTyped<::arrow20::UInt32Type>(data);
      case ::arrow20::Type::UINT64:
      case ::arrow20::Type::INT64:
        return PutIndicesTyped<::arrow20::UInt64Type>(data);
      default:
        throw ParquetException("Passed non-integer array to PutIndices");
    }
  }

  /// Allocates a buffer of EstimatedDataEncodedSize() bytes, writes the
  /// indices into it and shrinks it to the number of bytes actually written.
  ///
  /// NOTE(review): WriteIndices() can return -1, which would be passed
  /// straight to Resize() here -- confirm the estimate makes that
  /// unreachable.
  std::shared_ptr FlushValues() override {
    std::shared_ptr buffer =
        AllocateBuffer(this->pool_, EstimatedDataEncodedSize());
    int result_size = WriteIndices(buffer->mutable_data(),
                                   static_cast(EstimatedDataEncodedSize()));
    PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
    return buffer;
  }

  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
  /// dict_encoded_size() bytes.
  void WriteDict(uint8_t* buffer) const override;

  /// The number of entries in the dictionary.
  int num_entries() const override { return memo_table_.size(); }

 private:
  /// Clears all the indices (but leaves the dictionary).
  void ClearIndices() { buffered_indices_.clear(); }

  /// Indices that have not yet been written out by WriteIndices().
  ArrowPoolVector buffered_indices_;

  /// Encodes every non-null element of a binary-like Arrow array through
  /// PutByteArray. Elements larger than kMaxByteArraySize produce
  /// Status::Invalid, which is rethrown.
  template void PutBinaryArray(const ArrayType& array) {
    PARQUET_THROW_NOT_OK(::arrow20::VisitArraySpanInline(
        *array.data(),
        [&](::std::string_view view) {
          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
            return Status::Invalid(
                "Parquet cannot store strings with size 2GB or more, got: ", view.size());
          }
          PutByteArray(view.data(), static_cast(view.size()));
          return Status::OK();
        },
        // Null slots are skipped.
        []() { return Status::OK(); }));
  }

  /// Inserts every element of `array` into the memo table without buffering
  /// any index. dict_encoded_size_ grows by (size + sizeof(uint32_t)) for
  /// each element, before the insert is attempted.
  template void PutBinaryDictionaryArray(const ArrayType& array) {
    DCHECK_EQ(array.null_count(), 0);
    for (int64_t i = 0; i < array.length(); i++) {
      auto v = array.GetView(i);
      if (ARROW_PREDICT_FALSE(v.size() > kMaxByteArraySize)) {
        throw ParquetException(
            "Parquet cannot store strings with size 2GB or more, got: ", v.size());
      }
      dict_encoded_size_ += static_cast(v.size() + sizeof(uint32_t));
      int32_t unused_memo_index;
      PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(
          v.data(), static_cast(v.size()), &unused_memo_index));
    }
  }

  /// The number of bytes needed to encode the dictionary.
  int dict_encoded_size_;

  MemoTableType memo_table_;
};

/// Generic WriteDict: copies the memo table's values into `buffer`.
template void DictEncoderImpl::WriteDict(uint8_t* buffer) const {
  // For primitive types, only a memcpy
  DCHECK_EQ(static_cast(dict_encoded_size_), sizeof(T) * memo_table_.size());
  memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast(buffer));
}

// ByteArray and FLBA already have the dictionary encoded in their data heaps

/// Writes each dictionary entry as a uint32_t length followed by its bytes.
template <> void DictEncoderImpl::WriteDict(uint8_t* buffer) const {
  memo_table_.VisitValues(0, [&buffer](::std::string_view v) {
    uint32_t len = static_cast(v.length());
    memcpy(buffer, &len, sizeof(len));
    buffer += sizeof(len);
    memcpy(buffer, v.data(), len);
    buffer += len;
  });
}

/// Writes each dictionary entry as exactly type_length_ bytes, with no
/// length prefix.
template <> void DictEncoderImpl::WriteDict(uint8_t* buffer) const {
  memo_table_.VisitValues(0, [&](::std::string_view v) {
    DCHECK_EQ(v.length(), static_cast(type_length_));
    memcpy(buffer, v.data(), type_length_);
    buffer += type_length_;
  });
}

/// Looks `v` up in the memo table, inserting it if absent, and buffers the
/// resulting index. dict_encoded_size_ grows by sizeof(T) only on insertion.
template inline void DictEncoderImpl::Put(const T& v) {
  // Put() implementation for primitive types
  auto on_found = [](int32_t memo_index) {};
  auto on_not_found = [this](int32_t memo_index) {
    dict_encoded_size_ += static_cast(sizeof(T));
  };

  int32_t memo_index;
  PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
}

/// Generic PutByteArray: must not be called (asserts in debug builds).
template inline void DictEncoderImpl::PutByteArray(const void* ptr, int32_t length) {
  DCHECK(false);
}

/// Specialized PutByteArray: looks the bytes up in the memo table, inserting
/// them if absent, and buffers the index.
///
/// dict_encoded_size_ grows by length + sizeof(uint32_t) only on insertion;
/// unencoded_byte_array_data_bytes_ grows by `length` on every call.
template <> inline void DictEncoderImpl::PutByteArray(const void* ptr, int32_t length) {
  static const uint8_t empty[] = {0};

  auto on_found = [](int32_t memo_index) {};
  auto on_not_found = [&](int32_t memo_index) {
    dict_encoded_size_ += static_cast(length + sizeof(uint32_t));
  };

  DCHECK(ptr != nullptr || length == 0);
  // A null pointer is replaced by a valid dummy address before the lookup.
  ptr = (ptr != nullptr) ? ptr : empty;
  int32_t memo_index;
  PARQUET_THROW_NOT_OK(
      memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
  unencoded_byte_array_data_bytes_ += length;
}

/// Put for a single ByteArray: forwards pointer and length to PutByteArray.
template <> inline void DictEncoderImpl::Put(const ByteArray& val) {
  return PutByteArray(val.ptr, static_cast(val.len));
}

/// Put for a single FixedLenByteArray: type_length_ bytes are looked up or
/// inserted in the memo table and the index is buffered.
/// dict_encoded_size_ grows by type_length_ only on insertion.
template <> inline void DictEncoderImpl::Put(const FixedLenByteArray& v) {
  static const uint8_t empty[] = {0};

  auto on_found = [](int32_t memo_index) {};
  auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; };

  DCHECK(v.ptr != nullptr || type_length_ == 0);
  // A null pointer is replaced by a valid dummy address before the lookup.
  const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
  int32_t memo_index;
  PARQUET_THROW_NOT_OK(
      memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
}

// Int96: Arrow-array input is not implemented for either entry point.
template <> void DictEncoderImpl::Put(const ::arrow20::Array& values) {
  ParquetException::NYI("Direct put to Int96");
}

template <> void DictEncoderImpl::PutDictionary(const ::arrow20::Array& values) {
  ParquetException::NYI("Direct put to Int96");
}

/// Generic Put(const Array&): downcasts to the matching Arrow array type and
/// encodes every valid element through the single-value Put.
template void DictEncoderImpl::Put(const ::arrow20::Array& values) {
  using ArrayType = typename ::arrow20::CTypeTraits::ArrayType;
  const auto& data = checked_cast(values);
  if (data.null_count() == 0) {
    // no nulls, just dump the data
    for (int64_t i = 0; i < data.length(); i++) {
      Put(data.Value(i));
    }
  } else {
    for (int64_t i = 0; i < data.length(); i++) {
      if (data.IsValid(i)) {
        Put(data.Value(i));
      }
    }
  }
}

/// Put(const Array&) for fixed-size binary input: the width is validated
/// against type_length_, then every valid element is encoded.
template <> void DictEncoderImpl::Put(const ::arrow20::Array& values) {
  AssertFixedSizeBinary(values, type_length_);
  const auto& data = checked_cast(values);
  if (data.null_count() == 0) {
    // no nulls, just dump the data
    for (int64_t i = 0; i < data.length(); i++) {
      Put(FixedLenByteArray(data.Value(i)));
    }
  } else {
    // NOTE(review): `empty` is constructed but never read in this branch --
    // looks like a leftover that can be removed.
    std::vector empty(type_length_, 0);
    for (int64_t i = 0; i < data.length(); i++) {
      if (data.IsValid(i)) {
        Put(FixedLenByteArray(data.Value(i)));
      }
    }
  }
}

/// Put(const Array&) for variable-length binary input: the array is
/// downcast and forwarded to PutBinaryArray; one branch handles the
/// binary-like type ids, the other the large-binary-like ones.
template <> void DictEncoderImpl::Put(const ::arrow20::Array& values) {
  AssertBaseBinary(values);
  if (::arrow20::is_binary_like(values.type_id())) {
    PutBinaryArray(checked_cast(values));
  } else {
    DCHECK(::arrow20::is_large_binary_like(values.type_id()));
    PutBinaryArray(checked_cast(values));
  }
}

/// Preconditions for PutDictionary: the dictionary array must not contain
/// nulls and the encoder's dictionary must still be empty. Throws
/// ParquetException otherwise.
template void AssertCanPutDictionary(DictEncoderImpl* encoder, const ::arrow20::Array& dict) {
  if (dict.null_count() > 0) {
    throw ParquetException("Inserted dictionary cannot contain nulls");
  }

  if (encoder->num_entries() > 0) {
    throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
  }
}

/// Generic PutDictionary: inserts every value of `values` into the memo
/// table without buffering any index. dict_encoded_size_ grows by
/// sizeof(c_type) * length up front.
template void DictEncoderImpl::PutDictionary(const ::arrow20::Array& values) {
  AssertCanPutDictionary(this, values);

  using ArrayType = typename ::arrow20::CTypeTraits::ArrayType;
  const auto& data = checked_cast(values);

  dict_encoded_size_ += static_cast(sizeof(typename DType::c_type) * data.length());
  for (int64_t i = 0; i < data.length(); i++) {
    int32_t unused_memo_index;
    PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(data.Value(i), &unused_memo_index));
  }
}

/// PutDictionary for fixed-size binary input: validates the width, then
/// inserts each value as type_length_ bytes. dict_encoded_size_ grows by
/// type_length_ * length up front.
template <> void DictEncoderImpl::PutDictionary(const ::arrow20::Array& values) {
  AssertFixedSizeBinary(values, type_length_);
  AssertCanPutDictionary(this, values);

  const auto& data = checked_cast(values);

  dict_encoded_size_ += static_cast(type_length_ * data.length());
  for (int64_t i = 0; i < data.length(); i++) {
    int32_t unused_memo_index;
    PARQUET_THROW_NOT_OK(
        memo_table_.GetOrInsert(data.Value(i), type_length_, &unused_memo_index));
  }
}

/// PutDictionary for variable-length binary input: forwards to
/// PutBinaryDictionaryArray for the binary-like or large-binary-like array
/// type.
template <> void DictEncoderImpl::PutDictionary(const ::arrow20::Array& values) {
  AssertBaseBinary(values);
  AssertCanPutDictionary(this, values);

  if (::arrow20::is_binary_like(values.type_id())) {
    PutBinaryDictionaryArray(checked_cast(values));
  } else {
    DCHECK(::arrow20::is_large_binary_like(values.type_id()));
    PutBinaryDictionaryArray(checked_cast(values));
  }
}

// ----------------------------------------------------------------------
// BYTE_STREAM_SPLIT encoder

// Common base class for all
// types
//
// NOTE(review): in this copy of the file the text between angle brackets
// (template parameter lists and template arguments) appears to be missing,
// e.g. `template class ByteStreamSplitEncoderBase`,
// `static_cast(sizeof(T))`. The code is reproduced token for token as found;
// those lists must be restored before this can compile.
//
// Values are buffered unmodified in sink_; the byte-stream-split transform
// is applied in FlushValues().
template class ByteStreamSplitEncoderBase : public EncoderImpl, virtual public TypedEncoder {
 public:
  using T = typename DType::c_type;
  using TypedEncoder::Put;

  /// \param descr column descriptor, forwarded to EncoderImpl
  /// \param byte_width width in bytes of one value, used when splitting
  /// \param pool memory pool used for the sink and the output buffer
  ByteStreamSplitEncoderBase(const ColumnDescriptor* descr, int byte_width,
                             ::arrow20::MemoryPool* pool)
      : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
        sink_{pool},
        byte_width_(byte_width),
        num_values_in_buffer_{0} {}

  /// Number of bytes currently buffered in the sink.
  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }

  /// Returns the buffered values in byte-stream-split layout and resets the
  /// encoder.
  ///
  /// For byte_width_ == 1 the buffered bytes are returned as they are.
  /// NOTE(review): that fast path does not reset num_values_in_buffer_;
  /// confirm this is intentional (the counter is only read on the path
  /// below).
  std::shared_ptr FlushValues() override {
    if (byte_width_ == 1) {
      // Special-cased fast path
      PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish());
      return buf;
    }
    auto output_buffer = AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
    uint8_t* output_buffer_raw = output_buffer->mutable_data();
    const uint8_t* raw_values = sink_.data();
    ::arrow20::util::internal::ByteStreamSplitEncode(
        raw_values, /*width=*/byte_width_, num_values_in_buffer_, output_buffer_raw);
    sink_.Reset();
    num_values_in_buffer_ = 0;
    return output_buffer;
  }

  /// Encodes only the slots of `src` whose validity bit is set.
  ///
  /// When `valid_bits` is null every slot is treated as valid and `src` is
  /// passed straight to Put(). Otherwise the valid values are first packed
  /// into a temporary buffer of `num_values * sizeof(T)` bytes.
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    if (valid_bits != NULLPTR) {
      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow20::AllocateBuffer(num_values * sizeof(T),
                                                                   this->memory_pool()));
      T* data = buffer->template mutable_data_as();
      int num_valid_values = ::arrow20::util::internal::SpacedCompress(
          src, num_values, valid_bits, valid_bits_offset, data);
      Put(data, num_valid_values);
    } else {
      Put(src, num_values);
    }
  }

 protected:
  ::arrow20::BufferBuilder sink_;
  // Required because type_length_ is only filled in for FLBA
  const int byte_width_;
  /// Number of values appended to sink_ since the last reset.
  int64_t num_values_in_buffer_;
};

// BYTE_STREAM_SPLIT encoder implementation for FLOAT, DOUBLE, INT32, INT64
template class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase {
 public:
  using T = typename DType::c_type;
  using ArrowType = typename EncodingTraits::ArrowType;

  /// The byte width is sizeof(T).
  ByteStreamSplitEncoder(const ColumnDescriptor* descr,
                         ::arrow20::MemoryPool* pool = ::arrow20::default_memory_pool())
      : ByteStreamSplitEncoderBase(descr,
                                          /*byte_width=*/static_cast(sizeof(T)), pool) {}

  // Inherit Put(const std::vector&...)
  using TypedEncoder::Put;

  /// Appends `num_values * sizeof(T)` raw bytes to the sink and bumps the
  /// value counter. A non-positive `num_values` is a no-op.
  void Put(const T* buffer, int num_values) override {
    if (num_values > 0) {
      PARQUET_THROW_NOT_OK(
          this->sink_.Append(reinterpret_cast(buffer),
                             num_values * static_cast(sizeof(T))));
      this->num_values_in_buffer_ += num_values;
    }
  }

  /// Encodes an Arrow array whose type id matches ArrowType; any other type
  /// throws ParquetException. Nulls are handled by delegating to PutSpaced
  /// with the array's values buffer, validity buffer and offset.
  void Put(const ::arrow20::Array& values) override {
    if (values.type_id() != ArrowType::type_id) {
      throw ParquetException(std::string() + "direct put from " +
                             values.type()->ToString() + " not supported");
    }
    const auto& data = *values.data();
    this->PutSpaced(data.GetValues(1),
                    static_cast(data.length), data.GetValues(0, 0), data.offset);
  }
};

// BYTE_STREAM_SPLIT encoder implementation for FLBA
template <> class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase {
 public:
  using DType = FLBAType;
  using T = FixedLenByteArray;
  using ArrowType = ::arrow20::FixedSizeBinaryArray;

  /// The byte width is the descriptor's type length.
  /// NOTE(review): `descr` is dereferenced here without a null check.
  ByteStreamSplitEncoder(const ColumnDescriptor* descr,
                         ::arrow20::MemoryPool* pool = ::arrow20::default_memory_pool())
      : ByteStreamSplitEncoderBase(descr,
                                   /*byte_width=*/descr->type_length(), pool) {}

  // Inherit Put(const std::vector&...)
  using TypedEncoder::Put;

  /// Appends byte_width_ bytes per value. With a zero byte width nothing is
  /// written, but the value counter is still advanced.
  void Put(const T* buffer, int num_values) override {
    if (byte_width_ > 0) {
      const int64_t total_bytes = static_cast(num_values) * byte_width_;
      PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
      for (int i = 0; i < num_values; ++i) {
        // Write the result to the output stream
        DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL";
        sink_.UnsafeAppend(buffer[i].ptr, byte_width_);
      }
    }
    this->num_values_in_buffer_ += num_values;
  }

  /// Encodes a fixed-size-binary Arrow array whose width equals byte_width_
  /// (validated by AssertFixedSizeBinary). With nulls present only the valid
  /// slots are buffered and counted.
  void Put(const ::arrow20::Array& values) override {
    AssertFixedSizeBinary(values, byte_width_);
    const auto& data = checked_cast(values);
    if (data.null_count() == 0) {
      // no nulls, just buffer the data
      PARQUET_THROW_NOT_OK(sink_.Append(data.raw_values(), data.length() * byte_width_));
      this->num_values_in_buffer_ += data.length();
    } else {
      const int64_t num_values = data.length() - data.null_count();
      const int64_t total_bytes = num_values * byte_width_;
      PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
      // TODO use VisitSetBitRunsVoid
      for (int64_t i = 0; i < data.length(); i++) {
        if (data.IsValid(i)) {
          sink_.UnsafeAppend(data.Value(i), byte_width_);
        }
      }
      this->num_values_in_buffer_ += num_values;
    }
  }
};

// ----------------------------------------------------------------------
// DELTA_BINARY_PACKED encoder

/// DeltaBitPackEncoder is an encoder for the DeltaBinary Packing format
/// as per the parquet spec. See:
/// https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
///
/// Consists of a header followed by blocks of delta encoded values binary packed.
///
///  Format
///    [header] [block 1] [block 2] ... [block N]
///
///  Header
///    [block size] [number of mini blocks per block] [total value count] [first value]
///
///  Block
///    [min delta] [list of bitwidths of the mini blocks] [miniblocks]
///
/// Sets aside bytes at the start of the internal buffer where the header will be written,
/// and only writes the header when FlushValues is called before returning it.
/// /// To encode a block, we will: /// /// 1. Compute the differences between consecutive elements. For the first element in the /// block, use the last element in the previous block or, in the case of the first block, /// use the first value of the whole sequence, stored in the header. /// /// 2. Compute the frame of reference (the minimum of the deltas in the block). Subtract /// this min delta from all deltas in the block. This guarantees that all values are /// non-negative. /// /// 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int followed by the /// bit widths of the mini blocks and the delta values (minus the min delta) bit packed /// per mini block. /// /// Supports only INT32 and INT64. template class DeltaBitPackEncoder : public EncoderImpl, virtual public TypedEncoder { // Maximum possible header size static constexpr uint32_t kMaxPageHeaderWriterSize = 32; static constexpr uint32_t kValuesPerBlock = std::is_same_v ? 128 : 256; static constexpr uint32_t kMiniBlocksPerBlock = 4; public: using T = typename DType::c_type; using UT = std::make_unsigned_t; using TypedEncoder::Put; explicit DeltaBitPackEncoder(const ColumnDescriptor* descr, MemoryPool* pool, const uint32_t values_per_block = kValuesPerBlock, const uint32_t mini_blocks_per_block = kMiniBlocksPerBlock) : EncoderImpl(descr, Encoding::DELTA_BINARY_PACKED, pool), values_per_block_(values_per_block), mini_blocks_per_block_(mini_blocks_per_block), values_per_mini_block_(values_per_block / mini_blocks_per_block), deltas_(values_per_block, ::arrow20::stl::allocator(pool)), bits_buffer_( AllocateBuffer(pool, (kMiniBlocksPerBlock + values_per_block) * sizeof(T))), sink_(pool), bit_writer_(bits_buffer_->mutable_data(), static_cast(bits_buffer_->size())) { if (values_per_block_ % 128 != 0) { throw ParquetException( "the number of values in a block must be multiple of 128, but it's " + std::to_string(values_per_block_)); } if (values_per_mini_block_ % 32 != 0) { throw ParquetException( 
"the number of values in a miniblock must be multiple of 32, but it's " + std::to_string(values_per_mini_block_)); } if (values_per_block % mini_blocks_per_block != 0) { throw ParquetException( "the number of values per block % number of miniblocks per block must be 0, " "but it's " + std::to_string(values_per_block % mini_blocks_per_block)); } // Reserve enough space at the beginning of the buffer for largest possible header. PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize)); } std::shared_ptr FlushValues() override; int64_t EstimatedDataEncodedSize() override { return sink_.length(); } void Put(const ::arrow20::Array& values) override; void Put(const T* buffer, int num_values) override; void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override; void FlushBlock(); private: const uint32_t values_per_block_; const uint32_t mini_blocks_per_block_; const uint32_t values_per_mini_block_; uint32_t values_current_block_{0}; uint32_t total_value_count_{0}; T first_value_{0}; T current_value_{0}; ArrowPoolVector deltas_; std::shared_ptr bits_buffer_; ::arrow20::BufferBuilder sink_; ::arrow20::bit_util::BitWriter bit_writer_; }; template void DeltaBitPackEncoder::Put(const T* src, int num_values) { if (num_values == 0) { return; } int idx = 0; if (total_value_count_ == 0) { current_value_ = src[0]; first_value_ = current_value_; idx = 1; } total_value_count_ += num_values; while (idx < num_values) { T value = src[idx]; // Calculate deltas. The possible overflow is handled by use of unsigned integers // making subtraction operations well-defined and correct even in case of overflow. // Encoded integers will wrap back around on decoding. 
// See http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n deltas_[values_current_block_] = SafeSignedSubtract(value, current_value_); current_value_ = value; idx++; values_current_block_++; if (values_current_block_ == values_per_block_) { FlushBlock(); } } } template void DeltaBitPackEncoder::FlushBlock() { if (values_current_block_ == 0) { return; } // Calculate the frame of reference for this miniblock. This value will be subtracted // from all deltas to guarantee all deltas are positive for encoding. const T min_delta = *std::min_element(deltas_.begin(), deltas_.begin() + values_current_block_); bit_writer_.PutZigZagVlqInt(min_delta); // Call to GetNextBytePtr reserves mini_blocks_per_block_ bytes of space to write // bit widths of miniblocks as they become known during the encoding. uint8_t* bit_width_data = bit_writer_.GetNextBytePtr(mini_blocks_per_block_); DCHECK(bit_width_data != nullptr); const uint32_t num_miniblocks = static_cast(std::ceil(static_cast(values_current_block_) / static_cast(values_per_mini_block_))); for (uint32_t i = 0; i < num_miniblocks; i++) { const uint32_t values_current_mini_block = std::min(values_per_mini_block_, values_current_block_); const uint32_t start = i * values_per_mini_block_; const T max_delta = *std::max_element( deltas_.begin() + start, deltas_.begin() + start + values_current_mini_block); // The minimum number of bits required to write any of values in deltas_ vector. // See overflow comment above. const auto bit_width = bit_width_data[i] = bit_util::NumRequiredBits( static_cast(max_delta) - static_cast(min_delta)); for (uint32_t j = start; j < start + values_current_mini_block; j++) { // Convert delta to frame of reference. See overflow comment above. 
const UT value = static_cast(deltas_[j]) - static_cast(min_delta); bit_writer_.PutValue(value, bit_width); } // If there are not enough values to fill the last mini block, we pad the mini block // with zeroes so that its length is the number of values in a full mini block // multiplied by the bit width. for (uint32_t j = values_current_mini_block; j < values_per_mini_block_; j++) { bit_writer_.PutValue(0, bit_width); } values_current_block_ -= values_current_mini_block; } // If, in the last block, less than miniblocks are // needed to store the values, the bytes storing the bit widths of the unneeded // miniblocks are still present, their value should be zero, but readers must accept // arbitrary values as well. for (uint32_t i = num_miniblocks; i < mini_blocks_per_block_; i++) { bit_width_data[i] = 0; } DCHECK_EQ(values_current_block_, 0); bit_writer_.Flush(); PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); bit_writer_.Clear(); } template std::shared_ptr DeltaBitPackEncoder::FlushValues() { if (values_current_block_ > 0) { FlushBlock(); } PARQUET_ASSIGN_OR_THROW(auto buffer, sink_.Finish(/*shrink_to_fit=*/true)); uint8_t header_buffer_[kMaxPageHeaderWriterSize] = {}; bit_util::BitWriter header_writer(header_buffer_, sizeof(header_buffer_)); if (!header_writer.PutVlqInt(values_per_block_) || !header_writer.PutVlqInt(mini_blocks_per_block_) || !header_writer.PutVlqInt(total_value_count_) || !header_writer.PutZigZagVlqInt(static_cast(first_value_))) { throw ParquetException("header writing error"); } header_writer.Flush(); // We reserved enough space at the beginning of the buffer for largest possible header // and data was written immediately after. We now write the header data immediately // before the end of reserved space. 
const size_t offset_bytes = kMaxPageHeaderWriterSize - header_writer.bytes_written(); std::memcpy(buffer->mutable_data() + offset_bytes, header_buffer_, header_writer.bytes_written()); // Reset counter of cached values total_value_count_ = 0; // Reserve enough space at the beginning of the buffer for largest possible header. PARQUET_THROW_NOT_OK(sink_.Advance(kMaxPageHeaderWriterSize)); // Excess bytes at the beginning are sliced off and ignored. return SliceBuffer(std::move(buffer), offset_bytes); } template <> void DeltaBitPackEncoder::Put(const ::arrow20::Array& values) { const ::arrow20::ArrayData& data = *values.data(); if (values.type_id() != ::arrow20::Type::INT32) { throw ParquetException("Expected Int32TArray, got ", values.type()->ToString()); } if (data.length > std::numeric_limits::max()) { throw ParquetException("Array cannot be longer than ", std::numeric_limits::max()); } if (values.null_count() == 0) { Put(data.GetValues(1), static_cast(data.length)); } else { PutSpaced(data.GetValues(1), static_cast(data.length), data.GetValues(0, 0), data.offset); } } template <> void DeltaBitPackEncoder::Put(const ::arrow20::Array& values) { const ::arrow20::ArrayData& data = *values.data(); if (values.type_id() != ::arrow20::Type::INT64) { throw ParquetException("Expected Int64TArray, got ", values.type()->ToString()); } if (data.length > std::numeric_limits::max()) { throw ParquetException("Array cannot be longer than ", std::numeric_limits::max()); } if (values.null_count() == 0) { Put(data.GetValues(1), static_cast(data.length)); } else { PutSpaced(data.GetValues(1), static_cast(data.length), data.GetValues(0, 0), data.offset); } } template void DeltaBitPackEncoder::PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow20::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); T* data = buffer->template mutable_data_as(); int 
num_valid_values = ::arrow20::util::internal::SpacedCompress( src, num_values, valid_bits, valid_bits_offset, data); Put(data, num_valid_values); } else { Put(src, num_values); } } // ---------------------------------------------------------------------- // DELTA_LENGTH_BYTE_ARRAY encoder class DeltaLengthByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder { public: explicit DeltaLengthByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool) : EncoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY, pool = ::arrow20::default_memory_pool()), sink_(pool), length_encoder_(nullptr, pool) {} std::shared_ptr FlushValues() override; int64_t EstimatedDataEncodedSize() override { return sink_.length() + length_encoder_.EstimatedDataEncodedSize(); } using TypedEncoder::Put; void Put(const ::arrow20::Array& values) override; void Put(const T* buffer, int num_values) override; void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override; protected: template void PutBinaryArray(const ArrayType& array) { PARQUET_THROW_NOT_OK(::arrow20::VisitArraySpanInline( *array.data(), [&](::std::string_view view) { if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) { return Status::Invalid( "Parquet cannot store strings with size 2GB or more, got: ", view.size()); } if (ARROW_PREDICT_FALSE( view.size() + sink_.length() > static_cast(std::numeric_limits::max()))) { return Status::Invalid("excess expansion in DELTA_LENGTH_BYTE_ARRAY"); } length_encoder_.Put({static_cast(view.length())}, 1); PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length())); unencoded_byte_array_data_bytes_ += view.size(); return Status::OK(); }, []() { return Status::OK(); })); } ::arrow20::BufferBuilder sink_; DeltaBitPackEncoder length_encoder_; }; void DeltaLengthByteArrayEncoder::Put(const ::arrow20::Array& values) { AssertBaseBinary(values); if (::arrow20::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast(values)); } else 
{ PutBinaryArray(checked_cast(values)); } } void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { if (num_values == 0) { return; } constexpr int kBatchSize = 256; std::array lengths; uint32_t total_increment_size = 0; for (int idx = 0; idx < num_values; idx += kBatchSize) { const int batch_size = std::min(kBatchSize, num_values - idx); for (int j = 0; j < batch_size; ++j) { const int32_t len = src[idx + j].len; if (ARROW_PREDICT_FALSE( AddWithOverflow(total_increment_size, len, &total_increment_size))) { throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY"); } lengths[j] = len; } length_encoder_.Put(lengths.data(), batch_size); } if (sink_.length() + total_increment_size > std::numeric_limits::max()) { throw ParquetException("excess expansion in DELTA_LENGTH_BYTE_ARRAY"); } PARQUET_THROW_NOT_OK(sink_.Reserve(total_increment_size)); for (int idx = 0; idx < num_values; idx++) { sink_.UnsafeAppend(src[idx].ptr, src[idx].len); } unencoded_byte_array_data_bytes_ += total_increment_size; } void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow20::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); T* data = buffer->template mutable_data_as(); int num_valid_values = ::arrow20::util::internal::SpacedCompress( src, num_values, valid_bits, valid_bits_offset, data); Put(data, num_valid_values); } else { Put(src, num_values); } } std::shared_ptr DeltaLengthByteArrayEncoder::FlushValues() { std::shared_ptr encoded_lengths = length_encoder_.FlushValues(); std::shared_ptr data; PARQUET_THROW_NOT_OK(sink_.Finish(&data)); sink_.Reset(); PARQUET_THROW_NOT_OK(sink_.Resize(encoded_lengths->size() + data->size())); PARQUET_THROW_NOT_OK(sink_.Append(encoded_lengths->data(), encoded_lengths->size())); PARQUET_THROW_NOT_OK(sink_.Append(data->data(), data->size())); std::shared_ptr buffer; 
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); return buffer; } // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY encoder /// Delta Byte Array encoding also known as incremental encoding or front compression: /// for each element in a sequence of strings, store the prefix length of the previous /// entry plus the suffix. /// /// This is stored as a sequence of delta-encoded prefix lengths (DELTA_BINARY_PACKED), /// followed by the suffixes encoded as delta length byte arrays /// (DELTA_LENGTH_BYTE_ARRAY). template class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder { static constexpr std::string_view kEmpty = ""; public: using T = typename DType::c_type; explicit DeltaByteArrayEncoder(const ColumnDescriptor* descr, MemoryPool* pool = ::arrow20::default_memory_pool()) : EncoderImpl(descr, Encoding::DELTA_BYTE_ARRAY, pool), sink_(pool), prefix_length_encoder_(/*descr=*/nullptr, pool), suffix_encoder_(descr, pool), last_value_(""), empty_(static_cast(kEmpty.size()), reinterpret_cast(kEmpty.data())) {} std::shared_ptr FlushValues() override; int64_t EstimatedDataEncodedSize() override { return prefix_length_encoder_.EstimatedDataEncodedSize() + suffix_encoder_.EstimatedDataEncodedSize(); } using TypedEncoder::Put; void Put(const ::arrow20::Array& values) override; void Put(const T* buffer, int num_values) override; void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override { if (valid_bits != nullptr) { if (buffer_ == nullptr) { PARQUET_ASSIGN_OR_THROW(buffer_, ::arrow20::AllocateResizableBuffer(num_values * sizeof(T), this->memory_pool())); } else { PARQUET_THROW_NOT_OK(buffer_->Resize(num_values * sizeof(T), false)); } T* data = buffer_->mutable_data_as(); int num_valid_values = ::arrow20::util::internal::SpacedCompress( src, num_values, valid_bits, valid_bits_offset, data); Put(data, num_valid_values); } else { Put(src, num_values); } } 
protected: template void PutInternal(const T* src, int num_values, const VisitorType visitor) { if (num_values == 0) { return; } std::string_view last_value_view = last_value_; constexpr int kBatchSize = 256; std::array prefix_lengths; std::array suffixes; for (int i = 0; i < num_values; i += kBatchSize) { const int batch_size = std::min(kBatchSize, num_values - i); for (int j = 0; j < batch_size; ++j) { const int idx = i + j; const auto view = visitor[idx]; const auto len = static_cast(view.length()); uint32_t common_prefix_length = 0; const uint32_t maximum_common_prefix_length = std::min(len, static_cast(last_value_view.length())); while (common_prefix_length < maximum_common_prefix_length) { if (last_value_view[common_prefix_length] != view[common_prefix_length]) { break; } common_prefix_length++; } last_value_view = view; prefix_lengths[j] = common_prefix_length; const uint32_t suffix_length = len - common_prefix_length; const uint8_t* suffix_ptr = src[idx].ptr + common_prefix_length; // Convert to ByteArray, so it can be passed to the suffix_encoder_. 
const ByteArray suffix(suffix_length, suffix_ptr); suffixes[j] = suffix; unencoded_byte_array_data_bytes_ += len; } suffix_encoder_.Put(suffixes.data(), batch_size); prefix_length_encoder_.Put(prefix_lengths.data(), batch_size); } last_value_ = last_value_view; } template void PutBinaryArray(const ArrayType& array) { auto previous_len = static_cast(last_value_.length()); std::string_view last_value_view = last_value_; PARQUET_THROW_NOT_OK(::arrow20::VisitArraySpanInline( *array.data(), [&](::std::string_view view) { if (ARROW_PREDICT_FALSE(view.size() >= kMaxByteArraySize)) { return Status::Invalid( "Parquet cannot store strings with size 2GB or more, got: ", view.size()); } const ByteArray src{view}; uint32_t common_prefix_length = 0; const uint32_t len = src.len; const uint32_t maximum_common_prefix_length = std::min(previous_len, len); while (common_prefix_length < maximum_common_prefix_length) { if (last_value_view[common_prefix_length] != view[common_prefix_length]) { break; } common_prefix_length++; } previous_len = len; prefix_length_encoder_.Put({static_cast(common_prefix_length)}, 1); last_value_view = view; const auto suffix_length = static_cast(len - common_prefix_length); if (suffix_length == 0) { suffix_encoder_.Put(&empty_, 1); return Status::OK(); } const uint8_t* suffix_ptr = src.ptr + common_prefix_length; // Convert to ByteArray, so it can be passed to the suffix_encoder_. 
const ByteArray suffix(suffix_length, suffix_ptr); suffix_encoder_.Put(&suffix, 1); unencoded_byte_array_data_bytes_ += len; return Status::OK(); }, []() { return Status::OK(); })); last_value_ = last_value_view; } ::arrow20::BufferBuilder sink_; DeltaBitPackEncoder prefix_length_encoder_; DeltaLengthByteArrayEncoder suffix_encoder_; std::string last_value_; const ByteArray empty_; std::unique_ptr buffer_; }; struct ByteArrayVisitor { const ByteArray* src; std::string_view operator[](int i) const { if (ARROW_PREDICT_FALSE(src[i].len >= kMaxByteArraySize)) { throw ParquetException("Parquet cannot store strings with size 2GB or more, got: ", src[i].len); } return std::string_view{src[i]}; } uint32_t len(int i) const { return src[i].len; } }; struct FLBAVisitor { const FLBA* src; const uint32_t type_length; std::string_view operator[](int i) const { return std::string_view{reinterpret_cast(src[i].ptr), type_length}; } uint32_t len(int i) const { return type_length; } }; template <> void DeltaByteArrayEncoder::Put(const ByteArray* src, int num_values) { auto visitor = ByteArrayVisitor{src}; PutInternal(src, num_values, visitor); } template <> void DeltaByteArrayEncoder::Put(const FLBA* src, int num_values) { auto visitor = FLBAVisitor{src, static_cast(descr_->type_length())}; PutInternal(src, num_values, visitor); } template void DeltaByteArrayEncoder::Put(const ::arrow20::Array& values) { if (::arrow20::is_binary_like(values.type_id())) { PutBinaryArray(checked_cast(values)); } else if (::arrow20::is_large_binary_like(values.type_id())) { PutBinaryArray(checked_cast(values)); } else if (::arrow20::is_fixed_size_binary(values.type_id())) { PutBinaryArray(checked_cast(values)); } else { throw ParquetException("Only BaseBinaryArray and subclasses supported"); } } template std::shared_ptr DeltaByteArrayEncoder::FlushValues() { PARQUET_THROW_NOT_OK(sink_.Resize(EstimatedDataEncodedSize(), false)); std::shared_ptr prefix_lengths = prefix_length_encoder_.FlushValues(); 
PARQUET_THROW_NOT_OK(sink_.Append(prefix_lengths->data(), prefix_lengths->size())); std::shared_ptr suffixes = suffix_encoder_.FlushValues(); PARQUET_THROW_NOT_OK(sink_.Append(suffixes->data(), suffixes->size())); std::shared_ptr buffer; PARQUET_THROW_NOT_OK(sink_.Finish(&buffer, true)); last_value_.clear(); return buffer; } // ---------------------------------------------------------------------- // RLE encoder for BOOLEAN class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncoder { public: explicit RleBooleanEncoder(const ColumnDescriptor* descr, ::arrow20::MemoryPool* pool) : EncoderImpl(descr, Encoding::RLE, pool), buffered_append_values_(::arrow20::stl::allocator(pool)) {} int64_t EstimatedDataEncodedSize() override { return kRleLengthInBytes + MaxRleBufferSize(); } std::shared_ptr FlushValues() override; void Put(const T* buffer, int num_values) override; void Put(const ::arrow20::Array& values) override { if (values.type_id() != ::arrow20::Type::BOOL) { throw ParquetException("RleBooleanEncoder expects BooleanArray, got ", values.type()->ToString()); } const auto& boolean_array = checked_cast(values); if (values.null_count() == 0) { for (int i = 0; i < boolean_array.length(); ++i) { // null_count == 0, so just call Value directly is ok. 
buffered_append_values_.push_back(boolean_array.Value(i)); } } else { PARQUET_THROW_NOT_OK(::arrow20::VisitArraySpanInline<::arrow20::BooleanType>( *boolean_array.data(), [&](bool value) { buffered_append_values_.push_back(value); return Status::OK(); }, []() { return Status::OK(); })); } } void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override { if (valid_bits != NULLPTR) { PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow20::AllocateBuffer(num_values * sizeof(T), this->memory_pool())); T* data = buffer->mutable_data_as(); int num_valid_values = ::arrow20::util::internal::SpacedCompress( src, num_values, valid_bits, valid_bits_offset, data); Put(data, num_valid_values); } else { Put(src, num_values); } } void Put(const std::vector& src, int num_values) override; protected: template void PutImpl(const SequenceType& src, int num_values); int MaxRleBufferSize() const noexcept { return RlePreserveBufferSize(static_cast(buffered_append_values_.size()), kBitWidth); } constexpr static int32_t kBitWidth = 1; /// 4 bytes in little-endian, which indicates the length. constexpr static int32_t kRleLengthInBytes = 4; // std::vector in C++ is tricky, because it's a bitmap. // Here RleBooleanEncoder will only append values into it, and // dump values into Buffer, so using it here is ok. 
ArrowPoolVector buffered_append_values_; }; void RleBooleanEncoder::Put(const bool* src, int num_values) { PutImpl(src, num_values); } void RleBooleanEncoder::Put(const std::vector& src, int num_values) { PutImpl(src, num_values); } template void RleBooleanEncoder::PutImpl(const SequenceType& src, int num_values) { for (int i = 0; i < num_values; ++i) { buffered_append_values_.push_back(src[i]); } } std::shared_ptr RleBooleanEncoder::FlushValues() { int rle_buffer_size_max = MaxRleBufferSize(); std::shared_ptr buffer = AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes); ::arrow20::util::RleEncoder encoder(buffer->mutable_data() + kRleLengthInBytes, rle_buffer_size_max, /*bit_width*/ kBitWidth); for (bool value : buffered_append_values_) { encoder.Put(value ? 1 : 0); } encoder.Flush(); ::arrow20::util::SafeStore(buffer->mutable_data(), ::arrow20::bit_util::ToLittleEndian(encoder.len())); PARQUET_THROW_NOT_OK(buffer->Resize(kRleLengthInBytes + encoder.len())); buffered_append_values_.clear(); return buffer; } } // namespace // ---------------------------------------------------------------------- // Factory function std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encoding, bool use_dictionary, const ColumnDescriptor* descr, MemoryPool* pool) { if (use_dictionary) { switch (type_num) { case Type::INT32: return std::make_unique>(descr, pool); case Type::INT64: return std::make_unique>(descr, pool); case Type::INT96: return std::make_unique>(descr, pool); case Type::FLOAT: return std::make_unique>(descr, pool); case Type::DOUBLE: return std::make_unique>(descr, pool); case Type::BYTE_ARRAY: return std::make_unique>(descr, pool); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_unique>(descr, pool); default: DCHECK(false) << "Encoder not implemented"; break; } } else if (encoding == Encoding::PLAIN) { switch (type_num) { case Type::BOOLEAN: return std::make_unique>(descr, pool); case Type::INT32: return std::make_unique>(descr, pool); 
case Type::INT64: return std::make_unique>(descr, pool); case Type::INT96: return std::make_unique>(descr, pool); case Type::FLOAT: return std::make_unique>(descr, pool); case Type::DOUBLE: return std::make_unique>(descr, pool); case Type::BYTE_ARRAY: return std::make_unique>(descr, pool); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_unique>(descr, pool); default: DCHECK(false) << "Encoder not implemented"; break; } } else if (encoding == Encoding::BYTE_STREAM_SPLIT) { switch (type_num) { case Type::INT32: return std::make_unique>(descr, pool); case Type::INT64: return std::make_unique>(descr, pool); case Type::FLOAT: return std::make_unique>(descr, pool); case Type::DOUBLE: return std::make_unique>(descr, pool); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_unique>(descr, pool); default: throw ParquetException( "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 " "and FIXED_LEN_BYTE_ARRAY"); } } else if (encoding == Encoding::DELTA_BINARY_PACKED) { switch (type_num) { case Type::INT32: return std::make_unique>(descr, pool); case Type::INT64: return std::make_unique>(descr, pool); default: throw ParquetException( "DELTA_BINARY_PACKED encoder only supports INT32 and INT64"); } } else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { switch (type_num) { case Type::BYTE_ARRAY: return std::make_unique(descr, pool); default: throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); } } else if (encoding == Encoding::RLE) { switch (type_num) { case Type::BOOLEAN: return std::make_unique(descr, pool); default: throw ParquetException("RLE only supports BOOLEAN"); } } else if (encoding == Encoding::DELTA_BYTE_ARRAY) { switch (type_num) { case Type::BYTE_ARRAY: return std::make_unique>(descr, pool); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_unique>(descr, pool); default: throw ParquetException( "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); } } else { ParquetException::NYI("Selected encoding is not 
supported"); } DCHECK(false) << "Should not be able to reach this code"; return nullptr; } } // namespace parquet20