aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/apache/arrow/cpp/src/parquet/encoding.cc
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/apache/arrow/cpp/src/parquet/encoding.cc
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/apache/arrow/cpp/src/parquet/encoding.cc')
-rw-r--r--contrib/libs/apache/arrow/cpp/src/parquet/encoding.cc2547
1 files changed, 2547 insertions, 0 deletions
diff --git a/contrib/libs/apache/arrow/cpp/src/parquet/encoding.cc b/contrib/libs/apache/arrow/cpp/src/parquet/encoding.cc
new file mode 100644
index 0000000000..6e8f7ee549
--- /dev/null
+++ b/contrib/libs/apache/arrow/cpp/src/parquet/encoding.cc
@@ -0,0 +1,2547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/encoding.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <limits>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_dict.h"
+#include "arrow/stl_allocator.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/bit_stream_utils.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/bitmap_writer.h"
+#include "arrow/util/byte_stream_split.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/hashing.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/rle_encoding.h"
+#include "arrow/util/ubsan.h"
+#include "arrow/visitor_inline.h"
+
+#include "parquet/exception.h"
+#include "parquet/platform.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
namespace BitUtil = arrow::BitUtil;

using arrow::Status;
using arrow::VisitNullBitmapInline;
using arrow::internal::checked_cast;

// std::vector backed by an Arrow MemoryPool-aware allocator, so index buffers
// are accounted against the pool handed to the encoder.
template <typename T>
using ArrowPoolVector = std::vector<T, ::arrow::stl::allocator<T>>;

namespace parquet {
namespace {

// Initial scratch-buffer capacity (bytes) for the boolean PLAIN encoder.
constexpr int64_t kInMemoryDefaultCapacity = 1024;
// The Parquet spec isn't very clear whether ByteArray lengths are signed or
// unsigned, but the Java implementation uses signed ints.
constexpr size_t kMaxByteArraySize = std::numeric_limits<int32_t>::max();
+
// Shared base for all typed encoders: stores the column descriptor, the
// encoding kind and the memory pool used for output buffers.
class EncoderImpl : virtual public Encoder {
 public:
  EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool)
      : descr_(descr),
        encoding_(encoding),
        pool_(pool),
        // descr may be null; -1 marks "no fixed type length".
        type_length_(descr ? descr->type_length() : -1) {}

  Encoding::type encoding() const override { return encoding_; }

  MemoryPool* memory_pool() const override { return pool_; }

 protected:
  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
  const ColumnDescriptor* descr_;
  const Encoding::type encoding_;
  MemoryPool* pool_;

  /// Type length from descr
  int type_length_;
};
+
// ----------------------------------------------------------------------
// Plain encoder implementation

// PLAIN encoder for fixed-width value types: values are appended to an
// in-memory BufferBuilder in their raw representation. ByteArray/FLBA/bool
// behavior is provided by the specializations below.
template <typename DType>
class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
 public:
  using T = typename DType::c_type;

  explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}

  int64_t EstimatedDataEncodedSize() override { return sink_.length(); }

  // Hand off the accumulated bytes; the sink is reset by Finish().
  std::shared_ptr<Buffer> FlushValues() override {
    std::shared_ptr<Buffer> buffer;
    PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
    return buffer;
  }

  using TypedEncoder<DType>::Put;

  void Put(const T* buffer, int num_values) override;

  void Put(const ::arrow::Array& values) override;

  // Compacts the valid (non-null) values into a temporary dense buffer,
  // then encodes them via Put().
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    if (valid_bits != NULLPTR) {
      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
                                                                   this->memory_pool()));
      T* data = reinterpret_cast<T*>(buffer->mutable_data());
      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
          src, num_values, valid_bits, valid_bits_offset, data);
      Put(data, num_valid_values);
    } else {
      Put(src, num_values);
    }
  }

  // Appends <4-byte length><bytes>; caller must have reserved capacity
  // beforehand (see Put(const ByteArray&) / PutBinaryArray).
  void UnsafePutByteArray(const void* data, uint32_t length) {
    DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
    sink_.UnsafeAppend(&length, sizeof(uint32_t));
    sink_.UnsafeAppend(data, static_cast<int64_t>(length));
  }

  void Put(const ByteArray& val) {
    // Write the result to the output stream
    const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t));
    if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
      PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
    }
    UnsafePutByteArray(val.ptr, val.len);
  }

 protected:
  // Bulk-encode a (Large)Binary array: reserve once for all value bytes plus
  // one 4-byte length prefix per element, then append each non-null value.
  template <typename ArrayType>
  void PutBinaryArray(const ArrayType& array) {
    const int64_t total_bytes =
        array.value_offset(array.length()) - array.value_offset(0);
    PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + array.length() * sizeof(uint32_t)));

    PARQUET_THROW_NOT_OK(::arrow::VisitArrayDataInline<typename ArrayType::TypeClass>(
        *array.data(),
        [&](::arrow::util::string_view view) {
          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
          }
          UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
          return Status::OK();
        },
        []() { return Status::OK(); }));
  }

  ::arrow::BufferBuilder sink_;
};
+
+template <typename DType>
+void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
+ if (num_values > 0) {
+ PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
+ }
+}
+
+template <>
+inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
+ for (int i = 0; i < num_values; ++i) {
+ Put(src[i]);
+ }
+}
+
+template <typename ArrayType>
+void DirectPutImpl(const ::arrow::Array& values, ::arrow::BufferBuilder* sink) {
+ if (values.type_id() != ArrayType::TypeClass::type_id) {
+ std::string type_name = ArrayType::TypeClass::type_name();
+ throw ParquetException("direct put to " + type_name + " from " +
+ values.type()->ToString() + " not supported");
+ }
+
+ using value_type = typename ArrayType::value_type;
+ constexpr auto value_size = sizeof(value_type);
+ auto raw_values = checked_cast<const ArrayType&>(values).raw_values();
+
+ if (values.null_count() == 0) {
+ // no nulls, just dump the data
+ PARQUET_THROW_NOT_OK(sink->Append(raw_values, values.length() * value_size));
+ } else {
+ PARQUET_THROW_NOT_OK(
+ sink->Reserve((values.length() - values.null_count()) * value_size));
+
+ for (int64_t i = 0; i < values.length(); i++) {
+ if (values.IsValid(i)) {
+ sink->UnsafeAppend(&raw_values[i], value_size);
+ }
+ }
+ }
+}
+
template <>
void PlainEncoder<Int32Type>::Put(const ::arrow::Array& values) {
  DirectPutImpl<::arrow::Int32Array>(values, &sink_);
}

template <>
void PlainEncoder<Int64Type>::Put(const ::arrow::Array& values) {
  DirectPutImpl<::arrow::Int64Array>(values, &sink_);
}

template <>
void PlainEncoder<Int96Type>::Put(const ::arrow::Array& values) {
  // No Arrow array type maps onto the deprecated 96-bit timestamp layout.
  ParquetException::NYI("direct put to Int96");
}

template <>
void PlainEncoder<FloatType>::Put(const ::arrow::Array& values) {
  DirectPutImpl<::arrow::FloatArray>(values, &sink_);
}

template <>
void PlainEncoder<DoubleType>::Put(const ::arrow::Array& values) {
  DirectPutImpl<::arrow::DoubleArray>(values, &sink_);
}

// Fallback for any physical type without a dedicated specialization above.
template <typename DType>
void PlainEncoder<DType>::Put(const ::arrow::Array& values) {
  ParquetException::NYI("direct put of " + values.type()->ToString());
}
+
+void AssertBaseBinary(const ::arrow::Array& values) {
+ if (!::arrow::is_base_binary_like(values.type_id())) {
+ throw ParquetException("Only BaseBinaryArray and subclasses supported");
+ }
+}
+
// PLAIN-encode a binary-like Arrow array, dispatching on 32-bit vs 64-bit
// offset layouts.
template <>
inline void PlainEncoder<ByteArrayType>::Put(const ::arrow::Array& values) {
  AssertBaseBinary(values);

  if (::arrow::is_binary_like(values.type_id())) {
    PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
  } else {
    DCHECK(::arrow::is_large_binary_like(values.type_id()));
    PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
  }
}
+
// Throw unless `values` is fixed-size-binary-like (FIXED_SIZE_BINARY or
// DECIMAL) with exactly `type_length` bytes per value.
void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) {
  if (values.type_id() != ::arrow::Type::FIXED_SIZE_BINARY &&
      values.type_id() != ::arrow::Type::DECIMAL) {
    throw ParquetException("Only FixedSizeBinaryArray and subclasses supported");
  }
  if (checked_cast<const ::arrow::FixedSizeBinaryType&>(*values.type()).byte_width() !=
      type_length) {
    throw ParquetException("Size mismatch: " + values.type()->ToString() +
                           " should have been " + std::to_string(type_length) + " wide");
  }
}
+
// PLAIN-encode a FixedSizeBinaryArray: values are written back to back with
// no length prefixes (the width is fixed by the schema).
template <>
inline void PlainEncoder<FLBAType>::Put(const ::arrow::Array& values) {
  AssertFixedSizeBinary(values, descr_->type_length());
  const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);

  if (data.null_count() == 0) {
    // no nulls, just dump the data
    PARQUET_THROW_NOT_OK(
        sink_.Append(data.raw_values(), data.length() * data.byte_width()));
  } else {
    // Reserve only for the non-null values, then append them individually.
    const int64_t total_bytes =
        data.length() * data.byte_width() - data.null_count() * data.byte_width();
    PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes));
    for (int64_t i = 0; i < data.length(); i++) {
      if (data.IsValid(i)) {
        sink_.UnsafeAppend(data.Value(i), data.byte_width());
      }
    }
  }
}
+
template <>
inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
  // Zero-width columns encode to zero bytes.
  if (descr_->type_length() == 0) {
    return;
  }
  for (int i = 0; i < num_values; ++i) {
    // Write the result to the output stream
    DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
    PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
  }
}
+
// PLAIN encoder specialization for booleans: values are bit-packed through a
// BitWriter over a fixed scratch buffer (bits_buffer_) and spilled into the
// byte-oriented sink_ whenever the scratch fills up.
template <>
class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEncoder {
 public:
  explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
      : EncoderImpl(descr, Encoding::PLAIN, pool),
        bits_available_(kInMemoryDefaultCapacity * 8),
        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
        sink_(pool),
        bit_writer_(bits_buffer_->mutable_data(),
                    static_cast<int>(bits_buffer_->size())) {}

  int64_t EstimatedDataEncodedSize() override;
  std::shared_ptr<Buffer> FlushValues() override;

  void Put(const bool* src, int num_values) override;

  void Put(const std::vector<bool>& src, int num_values) override;

  // Compact the non-null values into a dense temporary, then encode densely.
  // T here is `bool`, inherited from BooleanEncoder (TypedEncoder<BooleanType>).
  void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    if (valid_bits != NULLPTR) {
      PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
                                                                   this->memory_pool()));
      T* data = reinterpret_cast<T*>(buffer->mutable_data());
      int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
          src, num_values, valid_bits, valid_bits_offset, data);
      Put(data, num_valid_values);
    } else {
      Put(src, num_values);
    }
  }

  void Put(const ::arrow::Array& values) override {
    if (values.type_id() != ::arrow::Type::BOOL) {
      throw ParquetException("direct put to boolean from " + values.type()->ToString() +
                             " not supported");
    }

    const auto& data = checked_cast<const ::arrow::BooleanArray&>(values);
    if (data.null_count() == 0) {
      PARQUET_THROW_NOT_OK(sink_.Reserve(BitUtil::BytesForBits(data.length())));
      // no nulls, just dump the data
      // NOTE(review): sink_.length() is passed as a *bit* offset here and the
      // builder is advanced by a value (bit) count below — this path treats
      // the sink as a bitmap rather than bytes; confirm against callers.
      ::arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(),
                                    data.length(), sink_.mutable_data(), sink_.length());
    } else {
      auto n_valid = BitUtil::BytesForBits(data.length() - data.null_count());
      PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid));
      ::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(),
                                                      sink_.length(), n_valid);

      // Copy only the valid bits, packing them densely.
      for (int64_t i = 0; i < data.length(); i++) {
        if (data.IsValid(i)) {
          if (data.Value(i)) {
            writer.Set();
          } else {
            writer.Clear();
          }
          writer.Next();
        }
      }
      writer.Finish();
    }
    sink_.UnsafeAdvance(data.length());
  }

 private:
  int bits_available_;  // bits left in the current scratch buffer
  std::shared_ptr<ResizableBuffer> bits_buffer_;  // scratch for bit_writer_
  ::arrow::BufferBuilder sink_;
  ::arrow::BitUtil::BitWriter bit_writer_;

  // Shared implementation for bool* and std::vector<bool> inputs.
  template <typename SequenceType>
  void PutImpl(const SequenceType& src, int num_values);
};
+
// Bit-packs `num_values` booleans from `src` (any random-access bool
// sequence). First tops up whatever bits remain in the current scratch
// buffer, then loops over full scratch buffers, spilling each into sink_
// when it fills.
template <typename SequenceType>
void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values) {
  int bit_offset = 0;  // index of the next value in src to encode
  if (bits_available_ > 0) {
    int bits_to_write = std::min(bits_available_, num_values);
    for (int i = 0; i < bits_to_write; i++) {
      bit_writer_.PutValue(src[i], 1);
    }
    bits_available_ -= bits_to_write;
    bit_offset = bits_to_write;

    // Scratch buffer is full: move its bytes into the sink and reset.
    if (bits_available_ == 0) {
      bit_writer_.Flush();
      PARQUET_THROW_NOT_OK(
          sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
      bit_writer_.Clear();
    }
  }

  int bits_remaining = num_values - bit_offset;
  while (bit_offset < num_values) {
    // Fresh scratch buffer each iteration.
    bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;

    int bits_to_write = std::min(bits_available_, bits_remaining);
    for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
      bit_writer_.PutValue(src[i], 1);
    }
    bit_offset += bits_to_write;
    bits_available_ -= bits_to_write;
    bits_remaining -= bits_to_write;

    // Only spill when completely full; a partial tail stays buffered until
    // the next Put or FlushValues.
    if (bits_available_ == 0) {
      bit_writer_.Flush();
      PARQUET_THROW_NOT_OK(
          sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
      bit_writer_.Clear();
    }
  }
}
+
+int64_t PlainEncoder<BooleanType>::EstimatedDataEncodedSize() {
+ int64_t position = sink_.length();
+ return position + bit_writer_.bytes_written();
+}
+
std::shared_ptr<Buffer> PlainEncoder<BooleanType>::FlushValues() {
  // Spill any partially-filled scratch buffer before finishing the sink.
  if (bits_available_ > 0) {
    bit_writer_.Flush();
    PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
    bit_writer_.Clear();
    bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
  }

  std::shared_ptr<Buffer> buffer;
  PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
  return buffer;
}
+
// Encode a contiguous array of booleans, one bit per value.
void PlainEncoder<BooleanType>::Put(const bool* src, int num_values) {
  PutImpl(src, num_values);
}

// Encode from std::vector<bool> (bit-packed proxy sequence).
void PlainEncoder<BooleanType>::Put(const std::vector<bool>& src, int num_values) {
  PutImpl(src, num_values);
}
+
// ----------------------------------------------------------------------
// DictEncoder<T> implementations

// Maps each physical type to the memo table used to build its dictionary:
// scalar types use ScalarMemoTable; variable- and fixed-length binary types
// share BinaryMemoTable.
template <typename DType>
struct DictEncoderTraits {
  using c_type = typename DType::c_type;
  using MemoTableType = ::arrow::internal::ScalarMemoTable<c_type>;
};

template <>
struct DictEncoderTraits<ByteArrayType> {
  using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
};

template <>
struct DictEncoderTraits<FLBAType> {
  using MemoTableType = ::arrow::internal::BinaryMemoTable<::arrow::BinaryBuilder>;
};

// Initially 1024 elements
static constexpr int32_t kInitialHashTableSize = 1 << 10;
+
/// See the dictionary encoding section of
/// https://github.com/Parquet/parquet-format. The encoding supports
/// streaming encoding. Values are encoded as they are added while the
/// dictionary is being constructed. At any time, the buffered values
/// can be written out with the current dictionary size. More values
/// can then be added to the encoder, including new dictionary
/// entries.
template <typename DType>
class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
  using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;

 public:
  typedef typename DType::c_type T;

  explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
      : EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool),
        buffered_indices_(::arrow::stl::allocator<int32_t>(pool)),
        dict_encoded_size_(0),
        memo_table_(pool, kInitialHashTableSize) {}

  // All buffered indices must have been flushed before destruction.
  ~DictEncoderImpl() override { DCHECK(buffered_indices_.empty()); }

  int dict_encoded_size() override { return dict_encoded_size_; }

  // Serializes the buffered indices as <1-byte bit width><RLE/bit-packed
  // runs> into `buffer`. Returns the number of bytes written, or -1 if
  // buffer_len is too small. Clears the index buffer on success.
  int WriteIndices(uint8_t* buffer, int buffer_len) override {
    // Write bit width in first byte
    *buffer = static_cast<uint8_t>(bit_width());
    ++buffer;
    --buffer_len;

    ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());

    for (int32_t index : buffered_indices_) {
      if (!encoder.Put(index)) return -1;
    }
    encoder.Flush();

    ClearIndices();
    return 1 + encoder.len();
  }

  void set_type_length(int type_length) { this->type_length_ = type_length; }

  /// Returns a conservative estimate of the number of bytes needed to encode the buffered
  /// indices. Used to size the buffer passed to WriteIndices().
  int64_t EstimatedDataEncodedSize() override {
    // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
    // reserve
    // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
    // but not reserving them would cause the encoder to fail.
    return 1 +
           ::arrow::util::RleEncoder::MaxBufferSize(
               bit_width(), static_cast<int>(buffered_indices_.size())) +
           ::arrow::util::RleEncoder::MinBufferSize(bit_width());
  }

  /// The minimum bit width required to encode the currently buffered indices.
  int bit_width() const override {
    if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
    if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
    return BitUtil::Log2(num_entries());
  }

  /// Encode value. Note that this does not actually write any data, just
  /// buffers the value's index to be written later.
  inline void Put(const T& value);

  // Not implemented for other data types
  inline void PutByteArray(const void* ptr, int32_t length);

  void Put(const T* src, int num_values) override {
    for (int32_t i = 0; i < num_values; i++) {
      Put(src[i]);
    }
  }

  // Encode only the values marked valid in the bitmap, skipping nulls.
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override {
    ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, num_values,
                                           [&](int64_t position, int64_t length) {
                                             for (int64_t i = 0; i < length; i++) {
                                               Put(src[i + position]);
                                             }
                                           });
  }

  using TypedEncoder<DType>::Put;

  void Put(const ::arrow::Array& values) override;
  void PutDictionary(const ::arrow::Array& values) override;

  // Buffer pre-computed dictionary indices from an integer array, honoring
  // the validity bitmap (only set-bit runs are visited).
  template <typename ArrowType, typename T = typename ArrowType::c_type>
  void PutIndicesTyped(const ::arrow::Array& data) {
    auto values = data.data()->GetValues<T>(1);
    size_t buffer_position = buffered_indices_.size();
    buffered_indices_.resize(buffer_position +
                             static_cast<size_t>(data.length() - data.null_count()));
    ::arrow::internal::VisitSetBitRunsVoid(
        data.null_bitmap_data(), data.offset(), data.length(),
        [&](int64_t position, int64_t length) {
          for (int64_t i = 0; i < length; ++i) {
            buffered_indices_[buffer_position++] =
                static_cast<int32_t>(values[i + position]);
          }
        });
  }

  void PutIndices(const ::arrow::Array& data) override {
    // Signed and unsigned variants of each width share a memory layout, so
    // both dispatch to the unsigned instantiation.
    switch (data.type()->id()) {
      case ::arrow::Type::UINT8:
      case ::arrow::Type::INT8:
        return PutIndicesTyped<::arrow::UInt8Type>(data);
      case ::arrow::Type::UINT16:
      case ::arrow::Type::INT16:
        return PutIndicesTyped<::arrow::UInt16Type>(data);
      case ::arrow::Type::UINT32:
      case ::arrow::Type::INT32:
        return PutIndicesTyped<::arrow::UInt32Type>(data);
      case ::arrow::Type::UINT64:
      case ::arrow::Type::INT64:
        return PutIndicesTyped<::arrow::UInt64Type>(data);
      default:
        throw ParquetException("Passed non-integer array to PutIndices");
    }
  }

  // NOTE(review): EstimatedDataEncodedSize() is evaluated twice here; the
  // result could be hoisted into a local (it is cheap but not free).
  std::shared_ptr<Buffer> FlushValues() override {
    std::shared_ptr<ResizableBuffer> buffer =
        AllocateBuffer(this->pool_, EstimatedDataEncodedSize());
    int result_size = WriteIndices(buffer->mutable_data(),
                                   static_cast<int>(EstimatedDataEncodedSize()));
    PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
    return std::move(buffer);
  }

  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
  /// dict_encoded_size() bytes.
  void WriteDict(uint8_t* buffer) override;

  /// The number of entries in the dictionary.
  int num_entries() const override { return memo_table_.size(); }

 private:
  /// Clears all the indices (but leaves the dictionary).
  void ClearIndices() { buffered_indices_.clear(); }

  /// Indices that have not yet be written out by WriteIndices().
  ArrowPoolVector<int32_t> buffered_indices_;

  // Memoize every non-null element of a (Large)Binary array, rejecting
  // values of 2GB or more.
  template <typename ArrayType>
  void PutBinaryArray(const ArrayType& array) {
    PARQUET_THROW_NOT_OK(::arrow::VisitArrayDataInline<typename ArrayType::TypeClass>(
        *array.data(),
        [&](::arrow::util::string_view view) {
          if (ARROW_PREDICT_FALSE(view.size() > kMaxByteArraySize)) {
            return Status::Invalid("Parquet cannot store strings with size 2GB or more");
          }
          PutByteArray(view.data(), static_cast<uint32_t>(view.size()));
          return Status::OK();
        },
        []() { return Status::OK(); }));
  }

  // Insert a caller-provided dictionary array: assumes no nulls (checked by
  // the caller) and charges every element to dict_encoded_size_.
  template <typename ArrayType>
  void PutBinaryDictionaryArray(const ArrayType& array) {
    DCHECK_EQ(array.null_count(), 0);
    for (int64_t i = 0; i < array.length(); i++) {
      auto v = array.GetView(i);
      if (ARROW_PREDICT_FALSE(v.size() > kMaxByteArraySize)) {
        throw ParquetException("Parquet cannot store strings with size 2GB or more");
      }
      dict_encoded_size_ += static_cast<int>(v.size() + sizeof(uint32_t));
      int32_t unused_memo_index;
      PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(
          v.data(), static_cast<int32_t>(v.size()), &unused_memo_index));
    }
  }

  /// The number of bytes needed to encode the dictionary.
  int dict_encoded_size_;

  MemoTableType memo_table_;
};
+
// For scalar types the dictionary page is the memo table's values verbatim.
template <typename DType>
void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) {
  // For primitive types, only a memcpy
  DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
  memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
}
+
// ByteArray and FLBA already have the dictionary encoded in their data heaps
template <>
void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) {
  // Each entry is written as <4-byte length><bytes>.
  memo_table_.VisitValues(0, [&buffer](const ::arrow::util::string_view& v) {
    uint32_t len = static_cast<uint32_t>(v.length());
    memcpy(buffer, &len, sizeof(len));
    buffer += sizeof(len);
    memcpy(buffer, v.data(), len);
    buffer += len;
  });
}
+
template <>
void DictEncoderImpl<FLBAType>::WriteDict(uint8_t* buffer) {
  // Fixed-width entries: raw bytes back to back, no length prefixes.
  memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) {
    DCHECK_EQ(v.length(), static_cast<size_t>(type_length_));
    memcpy(buffer, v.data(), type_length_);
    buffer += type_length_;
  });
}
+
+template <typename DType>
+inline void DictEncoderImpl<DType>::Put(const T& v) {
+ // Put() implementation for primitive types
+ auto on_found = [](int32_t memo_index) {};
+ auto on_not_found = [this](int32_t memo_index) {
+ dict_encoded_size_ += static_cast<int>(sizeof(T));
+ };
+
+ int32_t memo_index;
+ PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(v, on_found, on_not_found, &memo_index));
+ buffered_indices_.push_back(memo_index);
+}
+
// Only meaningful for ByteArrayType; any other instantiation is a bug.
template <typename DType>
inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
  DCHECK(false);
}
+
template <>
inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
                                                         int32_t length) {
  // Placeholder so zero-length values never hand a null pointer to the table.
  static const uint8_t empty[] = {0};

  auto on_found = [](int32_t memo_index) {};
  // New entries grow the dictionary page by <4-byte length><bytes>.
  auto on_not_found = [&](int32_t memo_index) {
    dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
  };

  DCHECK(ptr != nullptr || length == 0);
  ptr = (ptr != nullptr) ? ptr : empty;
  int32_t memo_index;
  PARQUET_THROW_NOT_OK(
      memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
}
+
// Single-value ByteArray Put: forwards to the length/pointer overload.
template <>
inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) {
  return PutByteArray(val.ptr, static_cast<int32_t>(val.len));
}
+
template <>
inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) {
  // Placeholder so zero-length values never hand a null pointer to the table.
  static const uint8_t empty[] = {0};

  auto on_found = [](int32_t memo_index) {};
  // New entries grow the dictionary page by the fixed type length.
  auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; };

  DCHECK(v.ptr != nullptr || type_length_ == 0);
  const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
  int32_t memo_index;
  PARQUET_THROW_NOT_OK(
      memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found, &memo_index));
  buffered_indices_.push_back(memo_index);
}
+
// INT96 is deprecated in Parquet; no Arrow array type maps onto it.
template <>
void DictEncoderImpl<Int96Type>::Put(const ::arrow::Array& values) {
  ParquetException::NYI("Direct put to Int96");
}

template <>
void DictEncoderImpl<Int96Type>::PutDictionary(const ::arrow::Array& values) {
  ParquetException::NYI("Direct put to Int96");
}
+
+template <typename DType>
+void DictEncoderImpl<DType>::Put(const ::arrow::Array& values) {
+ using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType;
+ const auto& data = checked_cast<const ArrayType&>(values);
+ if (data.null_count() == 0) {
+ // no nulls, just dump the data
+ for (int64_t i = 0; i < data.length(); i++) {
+ Put(data.Value(i));
+ }
+ } else {
+ for (int64_t i = 0; i < data.length(); i++) {
+ if (data.IsValid(i)) {
+ Put(data.Value(i));
+ }
+ }
+ }
+}
+
+template <>
+void DictEncoderImpl<FLBAType>::Put(const ::arrow::Array& values) {
+ AssertFixedSizeBinary(values, type_length_);
+ const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);
+ if (data.null_count() == 0) {
+ // no nulls, just dump the data
+ for (int64_t i = 0; i < data.length(); i++) {
+ Put(FixedLenByteArray(data.Value(i)));
+ }
+ } else {
+ std::vector<uint8_t> empty(type_length_, 0);
+ for (int64_t i = 0; i < data.length(); i++) {
+ if (data.IsValid(i)) {
+ Put(FixedLenByteArray(data.Value(i)));
+ }
+ }
+ }
+}
+
// Memoize every non-null element of a binary-like array, dispatching on
// 32-bit vs 64-bit offset layouts.
template <>
void DictEncoderImpl<ByteArrayType>::Put(const ::arrow::Array& values) {
  AssertBaseBinary(values);
  if (::arrow::is_binary_like(values.type_id())) {
    PutBinaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
  } else {
    DCHECK(::arrow::is_large_binary_like(values.type_id()));
    PutBinaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
  }
}
+
+template <typename DType>
+void AssertCanPutDictionary(DictEncoderImpl<DType>* encoder, const ::arrow::Array& dict) {
+ if (dict.null_count() > 0) {
+ throw ParquetException("Inserted dictionary cannot cannot contain nulls");
+ }
+
+ if (encoder->num_entries() > 0) {
+ throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
+ }
+}
+
// Seed the dictionary for scalar types from a caller-provided array of
// distinct values; indices are then supplied via PutIndices().
template <typename DType>
void DictEncoderImpl<DType>::PutDictionary(const ::arrow::Array& values) {
  AssertCanPutDictionary(this, values);

  using ArrayType = typename ::arrow::CTypeTraits<typename DType::c_type>::ArrayType;
  const auto& data = checked_cast<const ArrayType&>(values);

  // Every entry occupies one fixed-size slot in the dictionary page.
  dict_encoded_size_ += static_cast<int>(sizeof(typename DType::c_type) * data.length());
  for (int64_t i = 0; i < data.length(); i++) {
    int32_t unused_memo_index;
    PARQUET_THROW_NOT_OK(memo_table_.GetOrInsert(data.Value(i), &unused_memo_index));
  }
}
+
template <>
void DictEncoderImpl<FLBAType>::PutDictionary(const ::arrow::Array& values) {
  AssertFixedSizeBinary(values, type_length_);
  AssertCanPutDictionary(this, values);

  const auto& data = checked_cast<const ::arrow::FixedSizeBinaryArray&>(values);

  // Every entry occupies exactly type_length_ bytes in the dictionary page.
  dict_encoded_size_ += static_cast<int>(type_length_ * data.length());
  for (int64_t i = 0; i < data.length(); i++) {
    int32_t unused_memo_index;
    PARQUET_THROW_NOT_OK(
        memo_table_.GetOrInsert(data.Value(i), type_length_, &unused_memo_index));
  }
}
+
// Seed the dictionary from a binary-like array, dispatching on 32-bit vs
// 64-bit offset layouts.
template <>
void DictEncoderImpl<ByteArrayType>::PutDictionary(const ::arrow::Array& values) {
  AssertBaseBinary(values);
  AssertCanPutDictionary(this, values);

  if (::arrow::is_binary_like(values.type_id())) {
    PutBinaryDictionaryArray(checked_cast<const ::arrow::BinaryArray&>(values));
  } else {
    DCHECK(::arrow::is_large_binary_like(values.type_id()));
    PutBinaryDictionaryArray(checked_cast<const ::arrow::LargeBinaryArray&>(values));
  }
}
+
// ----------------------------------------------------------------------
// ByteStreamSplitEncoder<T> implementations

// Encoder for the BYTE_STREAM_SPLIT encoding: values are buffered raw and
// transposed byte-plane by byte-plane at flush time.
template <typename DType>
class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
 public:
  using T = typename DType::c_type;
  using TypedEncoder<DType>::Put;

  explicit ByteStreamSplitEncoder(
      const ColumnDescriptor* descr,
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

  int64_t EstimatedDataEncodedSize() override;
  std::shared_ptr<Buffer> FlushValues() override;

  void Put(const T* buffer, int num_values) override;
  void Put(const ::arrow::Array& values) override;
  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                 int64_t valid_bits_offset) override;

 protected:
  // Forward a typed Arrow array's raw values plus validity bitmap into
  // PutSpaced(); throws if the array's type does not match ArrowType.
  template <typename ArrowType>
  void PutImpl(const ::arrow::Array& values) {
    if (values.type_id() != ArrowType::type_id) {
      throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() +
                             " from " + values.type()->ToString() + " not supported");
    }
    const auto& data = *values.data();
    PutSpaced(data.GetValues<typename ArrowType::c_type>(1),
              static_cast<int>(data.length), data.GetValues<uint8_t>(0, 0), data.offset);
  }

  ::arrow::BufferBuilder sink_;  // raw (untransposed) value bytes
  int64_t num_values_in_buffer_;  // count of values currently in sink_
};
+
// Starts with an empty sink; values accumulate until FlushValues().
template <typename DType>
ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr,
                                                      ::arrow::MemoryPool* pool)
    : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool),
      sink_{pool},
      num_values_in_buffer_{0} {}
+
template <typename DType>
int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
  // Output size equals input size; the encoding only permutes bytes.
  return sink_.length();
}
+
// Transpose the buffered values into byte-stream-split layout (all first
// bytes, then all second bytes, ...) and return the encoded buffer.
template <typename DType>
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
  std::shared_ptr<ResizableBuffer> output_buffer =
      AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
  uint8_t* output_buffer_raw = output_buffer->mutable_data();
  const uint8_t* raw_values = sink_.data();
  ::arrow::util::internal::ByteStreamSplitEncode<T>(raw_values, num_values_in_buffer_,
                                                    output_buffer_raw);
  sink_.Reset();
  num_values_in_buffer_ = 0;
  return std::move(output_buffer);
}
+
+template <typename DType>
+void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
+ if (num_values > 0) {
+ PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
+ num_values_in_buffer_ += num_values;
+ }
+}
+
// BYTE_STREAM_SPLIT is only defined for FLOAT and DOUBLE.
template <>
void ByteStreamSplitEncoder<FloatType>::Put(const ::arrow::Array& values) {
  PutImpl<::arrow::FloatType>(values);
}

template <>
void ByteStreamSplitEncoder<DoubleType>::Put(const ::arrow::Array& values) {
  PutImpl<::arrow::DoubleType>(values);
}
+
// Compacts the valid values into a temporary dense buffer, then encodes them
// via Put(). Mirrors PlainEncoder::PutSpaced.
template <typename DType>
void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
                                              const uint8_t* valid_bits,
                                              int64_t valid_bits_offset) {
  if (valid_bits != NULLPTR) {
    PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T),
                                                                 this->memory_pool()));
    T* data = reinterpret_cast<T*>(buffer->mutable_data());
    int num_valid_values = ::arrow::util::internal::SpacedCompress<T>(
        src, num_values, valid_bits, valid_bits_offset, data);
    Put(data, num_valid_values);
  } else {
    Put(src, num_values);
  }
}
+
+// Common state shared by all decoder implementations: the raw page slice
+// (data_/len_), the number of values still to be decoded, and the encoding.
+class DecoderImpl : virtual public Decoder {
+ public:
+  // Points the decoder at a new page payload of 'len' bytes holding
+  // 'num_values' encoded values. The buffer is borrowed, not owned.
+  void SetData(int num_values, const uint8_t* data, int len) override {
+    num_values_ = num_values;
+    data_ = data;
+    len_ = len;
+  }
+
+  int values_left() const override { return num_values_; }
+  Encoding::type encoding() const override { return encoding_; }
+
+ protected:
+  explicit DecoderImpl(const ColumnDescriptor* descr, Encoding::type encoding)
+      : descr_(descr), encoding_(encoding), num_values_(0), data_(NULLPTR), len_(0) {}
+
+  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+  const ColumnDescriptor* descr_;
+
+  const Encoding::type encoding_;
+  // Number of values remaining in the current page.
+  int num_values_;
+  // Borrowed pointer into the current page, advanced as values are consumed.
+  const uint8_t* data_;
+  // Bytes remaining at data_.
+  int len_;
+  // Value width for FIXED_LEN_BYTE_ARRAY columns; -1 (the sentinel derived
+  // decoders use) for all other physical types. Previously left
+  // uninitialized here and only set by some subclasses.
+  int type_length_ = -1;
+};
+
+// Decoder for the PLAIN encoding: values are stored back-to-back in their
+// little-endian physical representation.
+template <typename DType>
+class PlainDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
+ public:
+  using T = typename DType::c_type;
+  explicit PlainDecoder(const ColumnDescriptor* descr);
+
+  // Decodes up to max_values into 'buffer'; returns the number decoded.
+  int Decode(T* buffer, int max_values) override;
+
+  // Decodes directly into an Arrow array builder, honoring the validity bitmap.
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::Accumulator* builder) override;
+
+  // Same, but appends into a dictionary builder.
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<DType>::DictAccumulator* builder) override;
+};
+
+// Unsupported combinations: Arrow has no Int96 type, and boolean dictionary
+// encoding does not exist in Parquet, so these specializations always throw.
+template <>
+inline int PlainDecoder<Int96Type>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<Int96Type>::Accumulator* builder) {
+  ParquetException::NYI("DecodeArrow not supported for Int96");
+}
+
+template <>
+inline int PlainDecoder<Int96Type>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<Int96Type>::DictAccumulator* builder) {
+  ParquetException::NYI("DecodeArrow not supported for Int96");
+}
+
+template <>
+inline int PlainDecoder<BooleanType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
+  ParquetException::NYI("dictionaries of BooleanType");
+}
+
+// Decodes 'num_values - null_count' PLAIN values into an Arrow builder,
+// appending nulls where the validity bitmap is unset. Advances data_/len_.
+template <typename DType>
+int PlainDecoder<DType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<DType>::Accumulator* builder) {
+  using value_type = typename DType::c_type;
+
+  constexpr int value_size = static_cast<int>(sizeof(value_type));
+  int values_decoded = num_values - null_count;
+  // Verify the page holds enough bytes before any unchecked appends below.
+  if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
+    ParquetException::EofException();
+  }
+
+  // Reserve up front so the per-value appends can be the Unsafe variants.
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        // SafeLoadAs avoids unaligned-access UB on the raw page bytes.
+        builder->UnsafeAppend(::arrow::util::SafeLoadAs<value_type>(data_));
+        data_ += sizeof(value_type);
+      },
+      [&]() { builder->UnsafeAppendNull(); });
+
+  num_values_ -= values_decoded;
+  len_ -= sizeof(value_type) * values_decoded;
+  return values_decoded;
+}
+
+// Same as the dense overload above, but targets a dictionary builder whose
+// Append can fail (memo-table growth), so the checked Append/AppendNull are
+// used instead of the Unsafe variants.
+template <typename DType>
+int PlainDecoder<DType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<DType>::DictAccumulator* builder) {
+  using value_type = typename DType::c_type;
+
+  constexpr int value_size = static_cast<int>(sizeof(value_type));
+  int values_decoded = num_values - null_count;
+  if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
+    ParquetException::EofException();
+  }
+
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        PARQUET_THROW_NOT_OK(
+            builder->Append(::arrow::util::SafeLoadAs<value_type>(data_)));
+        data_ += sizeof(value_type);
+      },
+      [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
+
+  num_values_ -= values_decoded;
+  len_ -= sizeof(value_type) * values_decoded;
+  return values_decoded;
+}
+
+// Decode routine templated on C++ type rather than type enum
+template <typename T>
+inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
+ int type_length, T* out) {
+ int64_t bytes_to_decode = num_values * static_cast<int64_t>(sizeof(T));
+ if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) {
+ ParquetException::EofException();
+ }
+ // If bytes_to_decode == 0, data could be null
+ if (bytes_to_decode > 0) {
+ memcpy(out, data, bytes_to_decode);
+ }
+ return static_cast<int>(bytes_to_decode);
+}
+
+// Records the fixed value width for FIXED_LEN_BYTE_ARRAY columns; every other
+// physical type gets the -1 sentinel meaning "width not applicable".
+template <typename DType>
+PlainDecoder<DType>::PlainDecoder(const ColumnDescriptor* descr)
+    : DecoderImpl(descr, Encoding::PLAIN) {
+  const bool is_flba =
+      (descr_ != NULLPTR) && (descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY);
+  type_length_ = is_flba ? descr_->type_length() : -1;
+}
+
+// Template specialization for BYTE_ARRAY. The written values do not own their
+// own data.
+
+// Reads one PLAIN-encoded BYTE_ARRAY value (a 4-byte little-endian length
+// prefix followed by that many bytes). The produced ByteArray borrows 'data';
+// it does not own it. Returns the total number of bytes consumed.
+static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size,
+                                    ByteArray* out) {
+  if (ARROW_PREDICT_FALSE(data_size < 4)) {
+    ParquetException::EofException();
+  }
+  const int32_t len = ::arrow::util::SafeLoadAs<int32_t>(data);
+  // A negative length means the on-disk prefix is corrupt.
+  if (len < 0) {
+    throw ParquetException("Invalid BYTE_ARRAY value");
+  }
+  // 64-bit arithmetic so len + 4 cannot overflow int32.
+  const int64_t consumed_length = static_cast<int64_t>(len) + 4;
+  if (ARROW_PREDICT_FALSE(data_size < consumed_length)) {
+    ParquetException::EofException();
+  }
+  *out = ByteArray{static_cast<uint32_t>(len), data + 4};
+  return consumed_length;
+}
+
+// BYTE_ARRAY specialization: walks the length-prefixed values one at a time.
+// The produced ByteArrays borrow the page buffer; they do not own their data.
+template <>
+inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
+                                  int type_length, ByteArray* out) {
+  int bytes_decoded = 0;
+  for (int i = 0; i < num_values; ++i) {
+    const auto increment = ReadByteArray(data, data_size, out + i);
+    // Guard the running int total before adding, so it cannot overflow.
+    if (ARROW_PREDICT_FALSE(increment > INT_MAX - bytes_decoded)) {
+      throw ParquetException("BYTE_ARRAY chunk too large");
+    }
+    data += increment;
+    data_size -= increment;
+    bytes_decoded += static_cast<int>(increment);
+  }
+  return bytes_decoded;
+}
+
+// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
+// own their own data.
+// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
+// own their own data.
+template <>
+inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
+                                          int num_values, int type_length,
+                                          FixedLenByteArray* out) {
+  // 64-bit product guards against type_length * num_values overflowing int.
+  int64_t bytes_to_decode = static_cast<int64_t>(type_length) * num_values;
+  if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) {
+    ParquetException::EofException();
+  }
+  // Each output entry just points 'type_length' bytes further into the page.
+  for (int i = 0; i < num_values; ++i) {
+    out[i].ptr = data;
+    data += type_length;
+    data_size -= type_length;
+  }
+  return static_cast<int>(bytes_to_decode);
+}
+
+// Decodes up to max_values (bounded by values remaining in the page) and
+// advances the internal cursor. Returns the number of values produced.
+template <typename DType>
+int PlainDecoder<DType>::Decode(T* buffer, int max_values) {
+  max_values = std::min(max_values, num_values_);
+  int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer);
+  data_ += bytes_consumed;
+  len_ -= bytes_consumed;
+  num_values_ -= max_values;
+  return max_values;
+}
+
+// PLAIN decoder for booleans, which are bit-packed rather than byte-sized,
+// hence the dedicated BitReader instead of the generic byte cursor.
+class PlainBooleanDecoder : public DecoderImpl,
+                            virtual public TypedDecoder<BooleanType>,
+                            virtual public BooleanDecoder {
+ public:
+  explicit PlainBooleanDecoder(const ColumnDescriptor* descr);
+  void SetData(int num_values, const uint8_t* data, int len) override;
+
+  // Two flavors of bool decoding
+  int Decode(uint8_t* buffer, int max_values) override;  // bitmap output
+  int Decode(bool* buffer, int max_values) override;     // byte-per-value output
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<BooleanType>::Accumulator* out) override;
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<BooleanType>::DictAccumulator* out) override;
+
+ private:
+  std::unique_ptr<::arrow::BitUtil::BitReader> bit_reader_;
+};
+
+PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr)
+    : DecoderImpl(descr, Encoding::PLAIN) {}
+
+// Unlike the base-class SetData, the raw pointer/length are handed to the
+// BitReader rather than stored in data_/len_, since booleans are bit-packed.
+void PlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) {
+  num_values_ = num_values;
+  bit_reader_.reset(new BitUtil::BitReader(data, len));
+}
+
+// Decodes bit-packed booleans into an Arrow boolean builder, appending nulls
+// where the validity bitmap is unset.
+int PlainBooleanDecoder::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<BooleanType>::Accumulator* builder) {
+  int values_decoded = num_values - null_count;
+  if (ARROW_PREDICT_FALSE(num_values_ < values_decoded)) {
+    ParquetException::EofException();
+  }
+
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        bool value;
+        // The count was validated above, so a short read is ignored here.
+        ARROW_IGNORE_EXPR(bit_reader_->GetValue(1, &value));
+        builder->UnsafeAppend(value);
+      },
+      [&]() { builder->UnsafeAppendNull(); });
+
+  num_values_ -= values_decoded;
+  return values_decoded;
+}
+
+// Parquet has no dictionary encoding for booleans.
+inline int PlainBooleanDecoder::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
+  ParquetException::NYI("dictionaries of BooleanType");
+}
+
+// Decodes booleans into a packed output bitmap (1 bit per value).
+int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) {
+  max_values = std::min(max_values, num_values_);
+  bool val;
+  ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values);
+  for (int i = 0; i < max_values; ++i) {
+    if (!bit_reader_->GetValue(1, &val)) {
+      ParquetException::EofException();
+    }
+    // BitmapWriter positions start cleared; only set bits need writing.
+    if (val) {
+      bit_writer.Set();
+    }
+    bit_writer.Next();
+  }
+  // Flush any partially-written trailing byte.
+  bit_writer.Finish();
+  num_values_ -= max_values;
+  return max_values;
+}
+
+// Decodes booleans into a byte-per-value output buffer.
+int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
+  const int batch = std::min(max_values, num_values_);
+  const int read = bit_reader_->GetBatch(1, buffer, batch);
+  if (read != batch) {
+    ParquetException::EofException();
+  }
+  num_values_ -= batch;
+  return batch;
+}
+
+// Helper that appends binary values into an Arrow BinaryBuilder, rolling the
+// builder over into a finished chunk whenever the 2 GiB binary-array offset
+// limit would be exceeded.
+struct ArrowBinaryHelper {
+  explicit ArrowBinaryHelper(typename EncodingTraits<ByteArrayType>::Accumulator* out) {
+    this->out = out;
+    this->builder = out->builder.get();
+    // Account for data already in the builder from a previous call.
+    this->chunk_space_remaining =
+        ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
+  }
+
+  // Finishes the current builder into out->chunks and starts a fresh chunk.
+  Status PushChunk() {
+    std::shared_ptr<::arrow::Array> result;
+    RETURN_NOT_OK(builder->Finish(&result));
+    out->chunks.push_back(result);
+    chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
+    return Status::OK();
+  }
+
+  bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
+
+  // Caller must have checked CanFit() and reserved builder capacity.
+  void UnsafeAppend(const uint8_t* data, int32_t length) {
+    chunk_space_remaining -= length;
+    builder->UnsafeAppend(data, length);
+  }
+
+  void UnsafeAppendNull() { builder->UnsafeAppendNull(); }
+
+  Status Append(const uint8_t* data, int32_t length) {
+    chunk_space_remaining -= length;
+    return builder->Append(data, length);
+  }
+
+  Status AppendNull() { return builder->AppendNull(); }
+
+  typename EncodingTraits<ByteArrayType>::Accumulator* out;
+  ::arrow::BinaryBuilder* builder;
+  // Bytes still appendable to the current chunk before hitting the limit.
+  int64_t chunk_space_remaining;
+};
+
+// BYTE_ARRAY Arrow decoding lives in PlainByteArrayDecoder below; the generic
+// PlainDecoder paths are deliberately unimplemented for this type.
+template <>
+inline int PlainDecoder<ByteArrayType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<ByteArrayType>::Accumulator* builder) {
+  ParquetException::NYI();
+}
+
+template <>
+inline int PlainDecoder<ByteArrayType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) {
+  ParquetException::NYI();
+}
+
+// FIXED_LEN_BYTE_ARRAY: each value is exactly descr_->type_length() bytes, so
+// decoding is a stride walk over the page buffer.
+template <>
+inline int PlainDecoder<FLBAType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<FLBAType>::Accumulator* builder) {
+  int values_decoded = num_values - null_count;
+  if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) {
+    ParquetException::EofException();
+  }
+
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        builder->UnsafeAppend(data_);
+        data_ += descr_->type_length();
+      },
+      [&]() { builder->UnsafeAppendNull(); });
+
+  num_values_ -= values_decoded;
+  len_ -= descr_->type_length() * values_decoded;
+  return values_decoded;
+}
+
+// Dictionary-builder variant of the FLBA path above; Append may fail on memo
+// table growth, so the checked calls are used.
+template <>
+inline int PlainDecoder<FLBAType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<FLBAType>::DictAccumulator* builder) {
+  int values_decoded = num_values - null_count;
+  if (ARROW_PREDICT_FALSE(len_ < descr_->type_length() * values_decoded)) {
+    ParquetException::EofException();
+  }
+
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        PARQUET_THROW_NOT_OK(builder->Append(data_));
+        data_ += descr_->type_length();
+      },
+      [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
+
+  num_values_ -= values_decoded;
+  len_ -= descr_->type_length() * values_decoded;
+  return values_decoded;
+}
+
+// PLAIN BYTE_ARRAY decoder with dedicated Arrow output paths. Values are
+// length-prefixed; the dense path chunks the output when a chunk would exceed
+// Arrow's 2 GiB binary limit.
+class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
+                              virtual public ByteArrayDecoder {
+ public:
+  using Base = PlainDecoder<ByteArrayType>;
+  using Base::DecodeSpaced;
+  using Base::PlainDecoder;
+
+  // ----------------------------------------------------------------------
+  // Dictionary read paths
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  ::arrow::BinaryDictionary32Builder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
+                                     valid_bits_offset, builder, &result));
+    return result;
+  }
+
+  // ----------------------------------------------------------------------
+  // Optimized dense binary read paths
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
+                                          valid_bits_offset, out, &result));
+    return result;
+  }
+
+ private:
+  // Appends values into a chunked BinaryBuilder, validating each length
+  // prefix and rolling over to a new chunk when the current one is full.
+  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
+                          int* out_values_decoded) {
+    ArrowBinaryHelper helper(out);
+    int values_decoded = 0;
+
+    RETURN_NOT_OK(helper.builder->Reserve(num_values));
+    // len_ bounds the total value bytes left, so it bounds this chunk too.
+    RETURN_NOT_OK(helper.builder->ReserveData(
+        std::min<int64_t>(len_, helper.chunk_space_remaining)));
+
+    int i = 0;
+    RETURN_NOT_OK(VisitNullBitmapInline(
+        valid_bits, valid_bits_offset, num_values, null_count,
+        [&]() {
+          if (ARROW_PREDICT_FALSE(len_ < 4)) {
+            ParquetException::EofException();
+          }
+          auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
+          // Upper bound keeps value_len + 4 from overflowing int32.
+          if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
+            return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
+          }
+          auto increment = value_len + 4;
+          if (ARROW_PREDICT_FALSE(len_ < increment)) {
+            ParquetException::EofException();
+          }
+          if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
+            // This element would exceed the capacity of a chunk
+            RETURN_NOT_OK(helper.PushChunk());
+            RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
+            RETURN_NOT_OK(helper.builder->ReserveData(
+                std::min<int64_t>(len_, helper.chunk_space_remaining)));
+          }
+          helper.UnsafeAppend(data_ + 4, value_len);
+          data_ += increment;
+          len_ -= increment;
+          ++values_decoded;
+          ++i;
+          return Status::OK();
+        },
+        [&]() {
+          helper.UnsafeAppendNull();
+          ++i;
+          return Status::OK();
+        }));
+
+    num_values_ -= values_decoded;
+    *out_values_decoded = values_decoded;
+    return Status::OK();
+  }
+
+  // Shared implementation for builder types with a checked Append(data, len)
+  // (e.g. BinaryDictionary32Builder); no chunk rollover is performed here.
+  template <typename BuilderType>
+  Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                     int64_t valid_bits_offset, BuilderType* builder,
+                     int* out_values_decoded) {
+    RETURN_NOT_OK(builder->Reserve(num_values));
+    int values_decoded = 0;
+
+    RETURN_NOT_OK(VisitNullBitmapInline(
+        valid_bits, valid_bits_offset, num_values, null_count,
+        [&]() {
+          if (ARROW_PREDICT_FALSE(len_ < 4)) {
+            ParquetException::EofException();
+          }
+          auto value_len = ::arrow::util::SafeLoadAs<int32_t>(data_);
+          if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) {
+            return Status::Invalid("Invalid or corrupted value_len '", value_len, "'");
+          }
+          auto increment = value_len + 4;
+          if (ARROW_PREDICT_FALSE(len_ < increment)) {
+            ParquetException::EofException();
+          }
+          RETURN_NOT_OK(builder->Append(data_ + 4, value_len));
+          data_ += increment;
+          len_ -= increment;
+          ++values_decoded;
+          return Status::OK();
+        },
+        [&]() { return builder->AppendNull(); }));
+
+    num_values_ -= values_decoded;
+    *out_values_decoded = values_decoded;
+    return Status::OK();
+  }
+};
+
+// FIXED_LEN_BYTE_ARRAY decoder: the generic PlainDecoder paths suffice, this
+// class only binds them to the FLBADecoder interface.
+class PlainFLBADecoder : public PlainDecoder<FLBAType>, virtual public FLBADecoder {
+ public:
+  using Base = PlainDecoder<FLBAType>;
+  using Base::PlainDecoder;
+};
+
+// ----------------------------------------------------------------------
+// Dictionary encoding and decoding
+
+// Decoder for RLE_DICTIONARY pages: a bit-width byte followed by RLE/bit-packed
+// indices into a dictionary that must be installed first via SetDict().
+template <typename Type>
+class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
+ public:
+  typedef typename Type::c_type T;
+
+  // Initializes the dictionary with values from 'dictionary'. The data in
+  // dictionary is not guaranteed to persist in memory after this call so the
+  // dictionary decoder needs to copy the data out if necessary.
+  explicit DictDecoderImpl(const ColumnDescriptor* descr,
+                           MemoryPool* pool = ::arrow::default_memory_pool())
+      : DecoderImpl(descr, Encoding::RLE_DICTIONARY),
+        dictionary_(AllocateBuffer(pool, 0)),
+        dictionary_length_(0),
+        byte_array_data_(AllocateBuffer(pool, 0)),
+        byte_array_offsets_(AllocateBuffer(pool, 0)),
+        indices_scratch_space_(AllocateBuffer(pool, 0)) {}
+
+  // Perform type-specific initiatialization
+  void SetDict(TypedDecoder<Type>* dictionary) override;
+
+  // First payload byte is the index bit width; the rest is the RLE stream.
+  void SetData(int num_values, const uint8_t* data, int len) override {
+    num_values_ = num_values;
+    if (len == 0) {
+      // Initialize dummy decoder to avoid crashes later on
+      idx_decoder_ = ::arrow::util::RleDecoder(data, len, /*bit_width=*/1);
+      return;
+    }
+    uint8_t bit_width = *data;
+    if (ARROW_PREDICT_FALSE(bit_width >= 64)) {
+      throw ParquetException("Invalid or corrupted bit_width");
+    }
+    idx_decoder_ = ::arrow::util::RleDecoder(++data, --len, bit_width);
+  }
+
+  // Decodes indices and materializes values through the dictionary in one go.
+  int Decode(T* buffer, int num_values) override {
+    num_values = std::min(num_values, num_values_);
+    int decoded_values =
+        idx_decoder_.GetBatchWithDict(reinterpret_cast<const T*>(dictionary_->data()),
+                                      dictionary_length_, buffer, num_values);
+    if (decoded_values != num_values) {
+      ParquetException::EofException();
+    }
+    num_values_ -= num_values;
+    return num_values;
+  }
+
+  int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
+                   int64_t valid_bits_offset) override {
+    num_values = std::min(num_values, num_values_);
+    if (num_values != idx_decoder_.GetBatchWithDictSpaced(
+                          reinterpret_cast<const T*>(dictionary_->data()),
+                          dictionary_length_, buffer, num_values, null_count, valid_bits,
+                          valid_bits_offset)) {
+      ParquetException::EofException();
+    }
+    num_values_ -= num_values;
+    return num_values;
+  }
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<Type>::Accumulator* out) override;
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  typename EncodingTraits<Type>::DictAccumulator* out) override;
+
+  void InsertDictionary(::arrow::ArrayBuilder* builder) override;
+
+  // NOTE(review): unlike DecodeIndices below, num_values is not clamped to
+  // num_values_ here — presumably callers pre-clamp; verify at call sites.
+  int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          ::arrow::ArrayBuilder* builder) override {
+    if (num_values > 0) {
+      // TODO(wesm): Refactor to batch reads for improved memory use. It is not
+      // trivial because the null_count is relative to the entire bitmap
+      PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
+          num_values, /*shrink_to_fit=*/false));
+    }
+
+    auto indices_buffer =
+        reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
+
+    if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits,
+                                                  valid_bits_offset, indices_buffer)) {
+      ParquetException::EofException();
+    }
+
+    /// XXX(wesm): Cannot append "valid bits" directly to the builder
+    std::vector<uint8_t> valid_bytes(num_values);
+    ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
+    for (int64_t i = 0; i < num_values; ++i) {
+      valid_bytes[i] = static_cast<uint8_t>(bit_reader.IsSet());
+      bit_reader.Next();
+    }
+
+    auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
+    PARQUET_THROW_NOT_OK(
+        binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
+    num_values_ -= num_values - null_count;
+    return num_values - null_count;
+  }
+
+  // Appends raw dictionary indices (no value materialization) to a
+  // BinaryDictionary32Builder.
+  int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override {
+    num_values = std::min(num_values, num_values_);
+    if (num_values > 0) {
+      // TODO(wesm): Refactor to batch reads for improved memory use. This is
+      // relatively simple here because we don't have to do any bookkeeping of
+      // nulls
+      PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
+          num_values, /*shrink_to_fit=*/false));
+    }
+    auto indices_buffer =
+        reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
+    if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
+      ParquetException::EofException();
+    }
+    auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
+    PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
+    num_values_ -= num_values;
+    return num_values;
+  }
+
+  // Decodes raw indices into a caller-provided buffer.
+  int DecodeIndices(int num_values, int32_t* indices) override {
+    if (num_values != idx_decoder_.GetBatch(indices, num_values)) {
+      ParquetException::EofException();
+    }
+    num_values_ -= num_values;
+    return num_values;
+  }
+
+  void GetDictionary(const T** dictionary, int32_t* dictionary_length) override {
+    *dictionary_length = dictionary_length_;
+    *dictionary = reinterpret_cast<T*>(dictionary_->mutable_data());
+  }
+
+ protected:
+  // Guards against corrupt index streams referencing out-of-range entries.
+  Status IndexInBounds(int32_t index) {
+    if (ARROW_PREDICT_TRUE(0 <= index && index < dictionary_length_)) {
+      return Status::OK();
+    }
+    return Status::Invalid("Index not in dictionary bounds");
+  }
+
+  // Copies the dictionary page's values into our own buffer (the source page
+  // may not outlive this decoder).
+  inline void DecodeDict(TypedDecoder<Type>* dictionary) {
+    dictionary_length_ = static_cast<int32_t>(dictionary->values_left());
+    PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T),
+                                             /*shrink_to_fit=*/false));
+    dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()),
+                       dictionary_length_);
+  }
+
+  // Only one is set.
+  std::shared_ptr<ResizableBuffer> dictionary_;
+
+  int32_t dictionary_length_;
+
+  // Data that contains the byte array data (byte_array_dictionary_ just has the
+  // pointers).
+  std::shared_ptr<ResizableBuffer> byte_array_data_;
+
+  // Arrow-style byte offsets for each dictionary value. We maintain two
+  // representations of the dictionary, one as ByteArray* for non-Arrow
+  // consumers and this one for Arrow consumers. Since dictionaries are
+  // generally pretty small to begin with this doesn't mean too much extra
+  // memory use in most cases
+  std::shared_ptr<ResizableBuffer> byte_array_offsets_;
+
+  // Reusable buffer for decoding dictionary indices to be appended to a
+  // BinaryDictionary32Builder
+  std::shared_ptr<ResizableBuffer> indices_scratch_space_;
+
+  ::arrow::util::RleDecoder idx_decoder_;
+};
+
+// Default: fixed-width types only need the dictionary values copied out.
+template <typename Type>
+void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) {
+  DecodeDict(dictionary);
+}
+
+// Parquet has no dictionary encoding for booleans.
+template <>
+void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) {
+  ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
+}
+
+// BYTE_ARRAY dictionaries: flatten the variable-length values into an owned
+// buffer and build Arrow-style int32 offsets alongside, repointing each
+// ByteArray at the owned copy.
+template <>
+void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
+  DecodeDict(dictionary);
+
+  auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());
+
+  // Accumulate in 64 bits: summing 32-bit lengths into a plain 'int' can
+  // overflow for dictionaries whose total payload exceeds 2 GiB.
+  int64_t total_size = 0;
+  for (int i = 0; i < dictionary_length_; ++i) {
+    total_size += dict_values[i].len;
+  }
+  // The offsets written below are int32_t, so the flattened data must stay
+  // addressable with 32-bit offsets; reject corrupt/oversized dictionaries.
+  if (total_size > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Dictionary data too large for 32-bit offsets");
+  }
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
+                                                /*shrink_to_fit=*/false));
+  PARQUET_THROW_NOT_OK(
+      byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t),
+                                  /*shrink_to_fit=*/false));
+
+  int32_t offset = 0;
+  uint8_t* bytes_data = byte_array_data_->mutable_data();
+  int32_t* bytes_offsets =
+      reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
+  for (int i = 0; i < dictionary_length_; ++i) {
+    memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
+    bytes_offsets[i] = offset;
+    // Repoint the dictionary entry at the owned copy; the source page may die.
+    dict_values[i].ptr = bytes_data + offset;
+    offset += dict_values[i].len;
+  }
+  bytes_offsets[dictionary_length_] = offset;
+}
+
+// FLBA dictionaries: copy the fixed-width values into an owned buffer and
+// repoint each entry, since the dictionary page may not outlive the decoder.
+template <>
+inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) {
+  DecodeDict(dictionary);
+
+  auto dict_values = reinterpret_cast<FLBA*>(dictionary_->mutable_data());
+
+  int fixed_len = descr_->type_length();
+  // Multiply in 64 bits: dictionary_length_ * fixed_len can overflow 'int'
+  // for large dictionaries of wide fixed-length values.
+  int64_t total_size = static_cast<int64_t>(dictionary_length_) * fixed_len;
+  // The copy offset below is 32-bit; reject data that would wrap it.
+  if (total_size > std::numeric_limits<int32_t>::max()) {
+    throw ParquetException("Dictionary data too large for 32-bit offsets");
+  }
+
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
+                                                /*shrink_to_fit=*/false));
+  uint8_t* bytes_data = byte_array_data_->mutable_data();
+  for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) {
+    memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len);
+    dict_values[i].ptr = bytes_data + offset;
+  }
+}
+
+// Int96 has no Arrow equivalent; BYTE_ARRAY Arrow paths are implemented in
+// DictByteArrayDecoderImpl instead. All four specializations always throw.
+template <>
+inline int DictDecoderImpl<Int96Type>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<Int96Type>::Accumulator* builder) {
+  ParquetException::NYI("DecodeArrow to Int96Type");
+}
+
+template <>
+inline int DictDecoderImpl<Int96Type>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<Int96Type>::DictAccumulator* builder) {
+  ParquetException::NYI("DecodeArrow to Int96Type");
+}
+
+template <>
+inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<ByteArrayType>::Accumulator* builder) {
+  ParquetException::NYI("DecodeArrow implemented elsewhere");
+}
+
+template <>
+inline int DictDecoderImpl<ByteArrayType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) {
+  ParquetException::NYI("DecodeArrow implemented elsewhere");
+}
+
+// Decodes dictionary indices and appends the looked-up values into an Arrow
+// dictionary builder, validating every index against the dictionary bounds.
+template <typename DType>
+int DictDecoderImpl<DType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<DType>::DictAccumulator* builder) {
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  auto dict_values = reinterpret_cast<const typename DType::c_type*>(dictionary_->data());
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        int32_t index;
+        // Get() failing means the RLE index stream ran out early.
+        if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
+          throw ParquetException("");
+        }
+        PARQUET_THROW_NOT_OK(IndexInBounds(index));
+        PARQUET_THROW_NOT_OK(builder->Append(dict_values[index]));
+      },
+      [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
+
+  return num_values - null_count;
+}
+
+// Parquet has no dictionary encoding for booleans.
+template <>
+int DictDecoderImpl<BooleanType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<BooleanType>::DictAccumulator* builder) {
+  ParquetException::NYI("No dictionary encoding for BooleanType");
+}
+
+// FLBA dense path: verifies the builder's byte width matches the column's
+// declared width before doing any unchecked appends.
+template <>
+inline int DictDecoderImpl<FLBAType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<FLBAType>::Accumulator* builder) {
+  if (builder->byte_width() != descr_->type_length()) {
+    throw ParquetException("Byte width mismatch: builder was " +
+                           std::to_string(builder->byte_width()) + " but decoder was " +
+                           std::to_string(descr_->type_length()));
+  }
+
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        int32_t index;
+        if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
+          throw ParquetException("");
+        }
+        PARQUET_THROW_NOT_OK(IndexInBounds(index));
+        builder->UnsafeAppend(dict_values[index].ptr);
+      },
+      [&]() { builder->UnsafeAppendNull(); });
+
+  return num_values - null_count;
+}
+
+// FLBA dictionary-builder path: the byte width must be dug out of the
+// builder's DictionaryType before the same width check as the dense path.
+template <>
+int DictDecoderImpl<FLBAType>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<FLBAType>::DictAccumulator* builder) {
+  auto value_type =
+      checked_cast<const ::arrow::DictionaryType&>(*builder->type()).value_type();
+  auto byte_width =
+      checked_cast<const ::arrow::FixedSizeBinaryType&>(*value_type).byte_width();
+  if (byte_width != descr_->type_length()) {
+    throw ParquetException("Byte width mismatch: builder was " +
+                           std::to_string(byte_width) + " but decoder was " +
+                           std::to_string(descr_->type_length()));
+  }
+
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  auto dict_values = reinterpret_cast<const FLBA*>(dictionary_->data());
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        int32_t index;
+        if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
+          throw ParquetException("");
+        }
+        PARQUET_THROW_NOT_OK(IndexInBounds(index));
+        PARQUET_THROW_NOT_OK(builder->Append(dict_values[index].ptr));
+      },
+      [&]() { PARQUET_THROW_NOT_OK(builder->AppendNull()); });
+
+  return num_values - null_count;
+}
+
+// Generic fixed-width dense path: per-index lookup with bounds validation,
+// appending into a pre-reserved Arrow builder.
+template <typename Type>
+int DictDecoderImpl<Type>::DecodeArrow(
+    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    typename EncodingTraits<Type>::Accumulator* builder) {
+  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
+
+  using value_type = typename Type::c_type;
+  auto dict_values = reinterpret_cast<const value_type*>(dictionary_->data());
+
+  VisitNullBitmapInline(
+      valid_bits, valid_bits_offset, num_values, null_count,
+      [&]() {
+        int32_t index;
+        if (ARROW_PREDICT_FALSE(!idx_decoder_.Get(&index))) {
+          throw ParquetException("");
+        }
+        PARQUET_THROW_NOT_OK(IndexInBounds(index));
+        builder->UnsafeAppend(dict_values[index]);
+      },
+      [&]() { builder->UnsafeAppendNull(); });
+
+  return num_values - null_count;
+}
+
+template <typename Type>
+void DictDecoderImpl<Type>::InsertDictionary(::arrow::ArrayBuilder* builder) {
+  ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types");
+}
+
+// Pre-seeds the builder's memo table with the whole dictionary so subsequent
+// index appends resolve without re-hashing values.
+template <>
+void DictDecoderImpl<ByteArrayType>::InsertDictionary(::arrow::ArrayBuilder* builder) {
+  auto binary_builder = checked_cast<::arrow::BinaryDictionary32Builder*>(builder);
+
+  // Make a BinaryArray referencing the internal dictionary data
+  auto arr = std::make_shared<::arrow::BinaryArray>(
+      dictionary_length_, byte_array_offsets_, byte_array_data_);
+  PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr));
+}
+
// Dictionary decoder for BYTE_ARRAY data.  Extends the generic
// DictDecoderImpl with decode paths that materialize values into Arrow
// builders: either a dense (chunked) binary accumulator or a
// dictionary-encoded builder.
class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
                                 virtual public ByteArrayDecoder {
 public:
  using BASE = DictDecoderImpl<ByteArrayType>;
  using BASE::DictDecoderImpl;

  // Decode into an Arrow dictionary builder, dispatching to the no-null fast
  // path when possible.  Returns the number of non-null values decoded.
  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  ::arrow::BinaryDictionary32Builder* builder) override {
    int result = 0;
    if (null_count == 0) {
      PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
    } else {
      PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
                                       valid_bits_offset, builder, &result));
    }
    return result;
  }

  // Decode into a dense binary accumulator, dispatching to the no-null fast
  // path when possible.  Returns the number of non-null values decoded.
  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
    int result = 0;
    if (null_count == 0) {
      PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
    } else {
      PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
                                            valid_bits_offset, out, &result));
    }
    return result;
  }

 private:
  // Null-aware dense decode.  Walks the validity bitmap; for valid slots it
  // pulls dictionary indices in batches and appends the referenced entries,
  // for null slots it appends nulls.  null_count is decremented as nulls are
  // consumed so the batch-size computation stays exact.
  Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          typename EncodingTraits<ByteArrayType>::Accumulator* out,
                          int* out_num_values) {
    constexpr int32_t kBufferSize = 1024;
    int32_t indices[kBufferSize];

    ArrowBinaryHelper helper(out);

    ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
    int values_decoded = 0;
    int num_appended = 0;
    while (num_appended < num_values) {
      bool is_valid = bit_reader.IsSet();
      bit_reader.Next();

      if (is_valid) {
        // Request at most as many indices as there are remaining non-null
        // slots.
        int32_t batch_size =
            std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
        int num_indices = idx_decoder_.GetBatch(indices, batch_size);

        if (ARROW_PREDICT_FALSE(num_indices < 1)) {
          return Status::Invalid("Invalid number of indices '", num_indices, "'");
        }

        int i = 0;
        while (true) {
          // Consume all indices
          if (is_valid) {
            auto idx = indices[i];
            RETURN_NOT_OK(IndexInBounds(idx));
            const auto& val = dict_values[idx];
            // Output is chunked; start a new chunk if this value cannot fit.
            if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
              RETURN_NOT_OK(helper.PushChunk());
            }
            RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
            ++i;
            ++values_decoded;
          } else {
            RETURN_NOT_OK(helper.AppendNull());
            --null_count;
          }
          ++num_appended;
          if (i == num_indices) {
            // Do not advance the bit_reader if we have fulfilled the decode
            // request
            break;
          }
          is_valid = bit_reader.IsSet();
          bit_reader.Next();
        }
      } else {
        RETURN_NOT_OK(helper.AppendNull());
        --null_count;
        ++num_appended;
      }
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  // Dense decode fast path for pages with no nulls: drain the index decoder
  // in batches and append every referenced dictionary entry.
  Status DecodeArrowDenseNonNull(int num_values,
                                 typename EncodingTraits<ByteArrayType>::Accumulator* out,
                                 int* out_num_values) {
    constexpr int32_t kBufferSize = 2048;
    int32_t indices[kBufferSize];
    int values_decoded = 0;

    ArrowBinaryHelper helper(out);
    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    while (values_decoded < num_values) {
      int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
      int num_indices = idx_decoder_.GetBatch(indices, batch_size);
      // The index stream ran out before num_values were produced.
      if (num_indices == 0) ParquetException::EofException();
      for (int i = 0; i < num_indices; ++i) {
        auto idx = indices[i];
        RETURN_NOT_OK(IndexInBounds(idx));
        const auto& val = dict_values[idx];
        if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
          RETURN_NOT_OK(helper.PushChunk());
        }
        RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
      }
      values_decoded += num_indices;
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  // Null-aware decode into a builder (e.g. the dictionary builder).  Same
  // bitmap-walking scheme as DecodeArrowDense, but appends through the
  // builder's Append/AppendNull interface.
  template <typename BuilderType>
  Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                     int64_t valid_bits_offset, BuilderType* builder,
                     int* out_num_values) {
    constexpr int32_t kBufferSize = 1024;
    int32_t indices[kBufferSize];

    RETURN_NOT_OK(builder->Reserve(num_values));
    ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    int values_decoded = 0;
    int num_appended = 0;
    while (num_appended < num_values) {
      bool is_valid = bit_reader.IsSet();
      bit_reader.Next();

      if (is_valid) {
        int32_t batch_size =
            std::min<int32_t>(kBufferSize, num_values - num_appended - null_count);
        int num_indices = idx_decoder_.GetBatch(indices, batch_size);

        int i = 0;
        while (true) {
          // Consume all indices
          if (is_valid) {
            auto idx = indices[i];
            RETURN_NOT_OK(IndexInBounds(idx));
            const auto& val = dict_values[idx];
            RETURN_NOT_OK(builder->Append(val.ptr, val.len));
            ++i;
            ++values_decoded;
          } else {
            RETURN_NOT_OK(builder->AppendNull());
            --null_count;
          }
          ++num_appended;
          if (i == num_indices) {
            // Do not advance the bit_reader if we have fulfilled the decode
            // request
            break;
          }
          is_valid = bit_reader.IsSet();
          bit_reader.Next();
        }
      } else {
        RETURN_NOT_OK(builder->AppendNull());
        --null_count;
        ++num_appended;
      }
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }

  // No-null decode into a builder: drain indices in batches and append the
  // referenced dictionary entries.
  template <typename BuilderType>
  Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* out_num_values) {
    constexpr int32_t kBufferSize = 2048;
    int32_t indices[kBufferSize];

    RETURN_NOT_OK(builder->Reserve(num_values));

    auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

    int values_decoded = 0;
    while (values_decoded < num_values) {
      int32_t batch_size = std::min<int32_t>(kBufferSize, num_values - values_decoded);
      int num_indices = idx_decoder_.GetBatch(indices, batch_size);
      if (num_indices == 0) ParquetException::EofException();
      for (int i = 0; i < num_indices; ++i) {
        auto idx = indices[i];
        RETURN_NOT_OK(IndexInBounds(idx));
        const auto& val = dict_values[idx];
        RETURN_NOT_OK(builder->Append(val.ptr, val.len));
      }
      values_decoded += num_indices;
    }
    *out_num_values = values_decoded;
    return Status::OK();
  }
};
+
+// ----------------------------------------------------------------------
+// DeltaBitPackDecoder
+
// Decoder for the DELTA_BINARY_PACKED encoding.  The stream is a sequence of
// blocks: each block header carries (block size, mini-block count, total
// value count, first value), followed by a minimum delta and one bit width
// per mini block; values are reconstructed by accumulating deltas.
template <typename DType>
class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 public:
  typedef typename DType::c_type T;

  // Only INT32/INT64 physical types are meaningful for this encoding.
  explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
                               MemoryPool* pool = ::arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
    if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
      throw ParquetException("Delta bit pack encoding should only be for integer data.");
    }
  }

  // Attach a new data page.  Zeroing the counters pushes the next
  // GetInternal() call into the block-initialization path.
  // NOTE(review): mini_block_idx_ and delta_bit_widths_ are not reset here,
  // and GetInternal() dereferences delta_bit_widths_ before InitBlock() has
  // allocated it on the very first call -- confirm initialization order.
  void SetData(int num_values, const uint8_t* data, int len) override {
    this->num_values_ = num_values;
    decoder_ = ::arrow::BitUtil::BitReader(data, len);
    values_current_block_ = 0;
    values_current_mini_block_ = 0;
  }

  int Decode(T* buffer, int max_values) override {
    return GetInternal(buffer, max_values);
  }

  // Dense DecodeArrow; null slots are not supported for this encoding.
  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::Accumulator* out) override {
    if (null_count != 0) {
      ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
    }
    std::vector<T> values(num_values);
    GetInternal(values.data(), num_values);
    PARQUET_THROW_NOT_OK(out->AppendValues(values));
    return num_values;
  }

  // DecodeArrow into a dictionary builder; null slots are not supported.
  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::DictAccumulator* out) override {
    if (null_count != 0) {
      ParquetException::NYI("Delta bit pack DecodeArrow with null slots");
    }
    std::vector<T> values(num_values);
    GetInternal(values.data(), num_values);
    PARQUET_THROW_NOT_OK(out->Reserve(num_values));
    for (T value : values) {
      PARQUET_THROW_NOT_OK(out->Append(value));
    }
    return num_values;
  }

 private:
  // Read the next block header: sizes and first value, then the block's
  // minimum delta and per-mini-block bit widths.
  void InitBlock() {
    // The number of values per block.
    uint32_t block_size;
    if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
    if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
    if (!decoder_.GetVlqInt(&values_current_block_)) {
      ParquetException::EofException();
    }
    // First value of the block, zigzag varint encoded.
    if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();

    delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
    uint8_t* bit_width_data = delta_bit_widths_->mutable_data();

    if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
    for (uint32_t i = 0; i < num_mini_blocks_; ++i) {
      if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
        ParquetException::EofException();
      }
    }
    values_per_mini_block_ = block_size / num_mini_blocks_;
    mini_block_idx_ = 0;
    delta_bit_width_ = bit_width_data[0];
    values_current_mini_block_ = values_per_mini_block_;
  }

  // Decode up to max_values into buffer, advancing through mini blocks and
  // blocks as they are exhausted.  Returns the number of values produced.
  // NOTE(review): this template parameter T shadows the class-level typedef
  // of the same name.
  template <typename T>
  int GetInternal(T* buffer, int max_values) {
    max_values = std::min(max_values, this->num_values_);
    const uint8_t* bit_width_data = delta_bit_widths_->data();
    for (int i = 0; i < max_values; ++i) {
      if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
        ++mini_block_idx_;
        if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) {
          delta_bit_width_ = bit_width_data[mini_block_idx_];
          values_current_mini_block_ = values_per_mini_block_;
        } else {
          // Block exhausted: read the next header and emit its first value
          // directly (InitBlock stored it into last_value_).
          InitBlock();
          buffer[i] = last_value_;
          continue;
        }
      }

      // TODO: the key to this algorithm is to decode the entire miniblock at once.
      int64_t delta;
      if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
      delta += min_delta_;
      // NOTE(review): the accumulator and the cast are 32-bit; for INT64
      // columns, deltas wider than 32 bits are truncated here -- confirm
      // against the DELTA_BINARY_PACKED specification.
      last_value_ += static_cast<int32_t>(delta);
      buffer[i] = last_value_;
      --values_current_mini_block_;
    }
    this->num_values_ -= max_values;
    return max_values;
  }

  MemoryPool* pool_;
  ::arrow::BitUtil::BitReader decoder_;
  uint32_t values_current_block_;
  uint32_t num_mini_blocks_;
  uint64_t values_per_mini_block_;
  uint64_t values_current_mini_block_;

  // Minimum delta of the current block; stored deltas are relative to it.
  int32_t min_delta_;
  size_t mini_block_idx_;
  // One bit width byte per mini block of the current block.
  std::shared_ptr<ResizableBuffer> delta_bit_widths_;
  int delta_bit_width_;

  // Last decoded value; the next value is last_value_ + min_delta_ + delta.
  int32_t last_value_;
};
+
+// ----------------------------------------------------------------------
+// DELTA_LENGTH_BYTE_ARRAY
+
// Decoder for the DELTA_LENGTH_BYTE_ARRAY encoding: a DELTA_BINARY_PACKED
// block of value lengths followed by all value bytes concatenated.
class DeltaLengthByteArrayDecoder : public DecoderImpl,
                                    virtual public TypedDecoder<ByteArrayType> {
 public:
  explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
                                       MemoryPool* pool = ::arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
        len_decoder_(nullptr, pool),
        pool_(pool) {}

  // Page layout: <int32 LE: byte size of the lengths block>
  //              <delta-packed lengths> <concatenated value bytes>.
  // NOTE(review): only len == 0 is rejected; a page with 0 < len < 4 would be
  // read past its end by the prefix load -- confirm caller-side validation.
  void SetData(int num_values, const uint8_t* data, int len) override {
    num_values_ = num_values;
    if (len == 0) return;
    int total_lengths_len = ::arrow::util::SafeLoadAs<int32_t>(data);
    data += 4;
    this->len_decoder_.SetData(num_values, data, total_lengths_len);
    data_ = data + total_lengths_len;
    this->len_ = len - 4 - total_lengths_len;
  }

  // Decode up to max_values ByteArrays.  The outputs point directly into the
  // page buffer (zero copy), so they are only valid while the page data is.
  int Decode(ByteArray* buffer, int max_values) override {
    using VectorT = ArrowPoolVector<int>;
    max_values = std::min(max_values, num_values_);
    VectorT lengths(max_values, 0, ::arrow::stl::allocator<int>(pool_));
    len_decoder_.Decode(lengths.data(), max_values);
    for (int i = 0; i < max_values; ++i) {
      buffer[i].len = lengths[i];
      buffer[i].ptr = data_;
      this->data_ += lengths[i];
      this->len_ -= lengths[i];
    }
    this->num_values_ -= max_values;
    return max_values;
  }

  // Arrow decode paths are not implemented for this encoding.
  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<ByteArrayType>::Accumulator* out) override {
    ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
  }

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<ByteArrayType>::DictAccumulator* out) override {
    ParquetException::NYI("DecodeArrow for DeltaLengthByteArrayDecoder");
  }

 private:
  // Decodes the length stream; data_/len_ inherited from DecoderImpl track
  // the remaining value bytes.
  DeltaBitPackDecoder<Int32Type> len_decoder_;
  ::arrow::MemoryPool* pool_;
};
+
+// ----------------------------------------------------------------------
+// DELTA_BYTE_ARRAY
+
// Decoder for the DELTA_BYTE_ARRAY encoding: each value is a prefix length
// (bytes shared with the previous value) plus a suffix; the prefix lengths
// are delta-packed and the suffixes are DELTA_LENGTH_BYTE_ARRAY encoded.
class DeltaByteArrayDecoder : public DecoderImpl,
                              virtual public TypedDecoder<ByteArrayType> {
 public:
  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
                                 MemoryPool* pool = ::arrow::default_memory_pool())
      : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
        prefix_len_decoder_(nullptr, pool),
        suffix_decoder_(nullptr, pool),
        last_value_(0, nullptr) {}

  // Page layout: <int32 LE: byte size of the prefix-length block>
  //              <delta-packed prefix lengths> <suffix block>.
  virtual void SetData(int num_values, const uint8_t* data, int len) {
    num_values_ = num_values;
    if (len == 0) return;
    int prefix_len_length = ::arrow::util::SafeLoadAs<int32_t>(data);
    data += 4;
    len -= 4;
    prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
    data += prefix_len_length;
    len -= prefix_len_length;
    suffix_decoder_.SetData(num_values, data, len);
  }

  // TODO: this doesn't work and requires memory management. We need to allocate
  // new strings to store the results.
  // NOTE(review): each decoded value is malloc'd here and never freed --
  // ownership is not transferred anywhere, so this leaks.
  virtual int Decode(ByteArray* buffer, int max_values) {
    max_values = std::min(max_values, this->num_values_);
    for (int i = 0; i < max_values; ++i) {
      int prefix_len = 0;
      prefix_len_decoder_.Decode(&prefix_len, 1);
      ByteArray suffix = {0, nullptr};
      suffix_decoder_.Decode(&suffix, 1);
      buffer[i].len = prefix_len + suffix.len;

      // Value = prefix copied from the previous value + decoded suffix.
      // NOTE(review): last_value_.ptr starts as nullptr; this relies on the
      // first prefix_len being 0 -- confirm.
      uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
      memcpy(result, last_value_.ptr, prefix_len);
      memcpy(result + prefix_len, suffix.ptr, suffix.len);

      buffer[i].ptr = result;
      last_value_ = buffer[i];
    }
    this->num_values_ -= max_values;
    return max_values;
  }

 private:
  DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
  DeltaLengthByteArrayDecoder suffix_decoder_;
  // Previously decoded value; serves as the prefix source for the next one.
  ByteArray last_value_;
};
+
+// ----------------------------------------------------------------------
+// BYTE_STREAM_SPLIT
+
// Decoder for the BYTE_STREAM_SPLIT encoding: the page stores byte k of
// every value contiguously (one "stream" per byte of T), which tends to make
// floating-point data more compressible.
template <typename DType>
class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
 public:
  using T = typename DType::c_type;
  explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);

  int Decode(T* buffer, int max_values) override;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::Accumulator* builder) override;

  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                  int64_t valid_bits_offset,
                  typename EncodingTraits<DType>::DictAccumulator* builder) override;

  void SetData(int num_values, const uint8_t* data, int len) override;

  // Lazily (re)allocate the scratch buffer used by the SIMD DecodeArrow path
  // so it holds at least min_values values of T.
  T* EnsureDecodeBuffer(int64_t min_values) {
    const int64_t size = sizeof(T) * min_values;
    if (!decode_buffer_ || decode_buffer_->size() < size) {
      PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size));
    }
    return reinterpret_cast<T*>(decode_buffer_->mutable_data());
  }

 private:
  // Total number of values in the current page; determines each stream's
  // stride when decoding a suffix of the page.
  int num_values_in_buffer_{0};
  std::shared_ptr<Buffer> decode_buffer_;

  // One byte stream per byte of the physical type.
  static constexpr size_t kNumStreams = sizeof(T);
};
+
// Construct a decoder tagged with the BYTE_STREAM_SPLIT encoding.
template <typename DType>
ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* descr)
    : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}
+
+template <typename DType>
+void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* data,
+ int len) {
+ DecoderImpl::SetData(num_values, data, len);
+ if (num_values * static_cast<int64_t>(sizeof(T)) > len) {
+ throw ParquetException("Data size too small for number of values (corrupted file?)");
+ }
+ num_values_in_buffer_ = num_values;
+}
+
+template <typename DType>
+int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
+ const int values_to_decode = std::min(num_values_, max_values);
+ const int num_decoded_previously = num_values_in_buffer_ - num_values_;
+ const uint8_t* data = data_ + num_decoded_previously;
+
+ ::arrow::util::internal::ByteStreamSplitDecode<T>(data, values_to_decode,
+ num_values_in_buffer_, buffer);
+ num_values_ -= values_to_decode;
+ len_ -= sizeof(T) * values_to_decode;
+ return values_to_decode;
+}
+
// Decode into a dense accumulator, honoring the validity bitmap.  With SIMD
// support the whole batch is decoded into a scratch buffer first; otherwise
// each value's bytes are gathered from the streams one at a time.
template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<DType>::Accumulator* builder) {
  constexpr int value_size = static_cast<int>(kNumStreams);
  int values_decoded = num_values - null_count;
  if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) {
    ParquetException::EofException();
  }

  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));

  // Resume where previous Decode/DecodeArrow calls left off.
  const int num_decoded_previously = num_values_in_buffer_ - num_values_;
  const uint8_t* data = data_ + num_decoded_previously;
  int offset = 0;

#if defined(ARROW_HAVE_SIMD_SPLIT)
  // Use fast decoding into intermediate buffer. This will also decode
  // some null values, but it's fast enough that we don't care.
  T* decode_out = EnsureDecodeBuffer(values_decoded);
  ::arrow::util::internal::ByteStreamSplitDecode<T>(data, values_decoded,
                                                    num_values_in_buffer_, decode_out);

  // XXX If null_count is 0, we could even append in bulk or decode directly into
  // builder
  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        builder->UnsafeAppend(decode_out[offset]);
        ++offset;
      },
      [&]() { builder->UnsafeAppendNull(); });

#else
  VisitNullBitmapInline(
      valid_bits, valid_bits_offset, num_values, null_count,
      [&]() {
        // Gather byte b of the value from stream b; the streams are laid out
        // back to back, each num_values_in_buffer_ bytes long.
        uint8_t gathered_byte_data[kNumStreams];
        for (size_t b = 0; b < kNumStreams; ++b) {
          const size_t byte_index = b * num_values_in_buffer_ + offset;
          gathered_byte_data[b] = data[byte_index];
        }
        builder->UnsafeAppend(::arrow::util::SafeLoadAs<T>(&gathered_byte_data[0]));
        ++offset;
      },
      [&]() { builder->UnsafeAppendNull(); });
#endif

  num_values_ -= values_decoded;
  len_ -= sizeof(T) * values_decoded;
  return values_decoded;
}
+
// Decoding into a dictionary builder is not implemented for this encoding.
template <typename DType>
int ByteStreamSplitDecoder<DType>::DecodeArrow(
    int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
    typename EncodingTraits<DType>::DictAccumulator* builder) {
  ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
}
+
+} // namespace
+
+// ----------------------------------------------------------------------
+// Encoder and decoder factory functions
+
+std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding,
+ bool use_dictionary, const ColumnDescriptor* descr,
+ MemoryPool* pool) {
+ if (use_dictionary) {
+ switch (type_num) {
+ case Type::INT32:
+ return std::unique_ptr<Encoder>(new DictEncoderImpl<Int32Type>(descr, pool));
+ case Type::INT64:
+ return std::unique_ptr<Encoder>(new DictEncoderImpl<Int64Type>(descr, pool));
+ case Type::INT96:
+ return std::unique_ptr<Encoder>(new DictEncoderImpl<Int96Type>(descr, pool));
+ case Type::FLOAT:
+ return std::unique_ptr<Encoder>(new DictEncoderImpl<FloatType>(descr, pool));
+ case Type::DOUBLE:
+ return std::unique_ptr<Encoder>(new DictEncoderImpl<DoubleType>(descr, pool));
+ case Type::BYTE_ARRAY:
+ return std::unique_ptr<Encoder>(new DictEncoderImpl<ByteArrayType>(descr, pool));
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::unique_ptr<Encoder>(new DictEncoderImpl<FLBAType>(descr, pool));
+ default:
+ DCHECK(false) << "Encoder not implemented";
+ break;
+ }
+ } else if (encoding == Encoding::PLAIN) {
+ switch (type_num) {
+ case Type::BOOLEAN:
+ return std::unique_ptr<Encoder>(new PlainEncoder<BooleanType>(descr, pool));
+ case Type::INT32:
+ return std::unique_ptr<Encoder>(new PlainEncoder<Int32Type>(descr, pool));
+ case Type::INT64:
+ return std::unique_ptr<Encoder>(new PlainEncoder<Int64Type>(descr, pool));
+ case Type::INT96:
+ return std::unique_ptr<Encoder>(new PlainEncoder<Int96Type>(descr, pool));
+ case Type::FLOAT:
+ return std::unique_ptr<Encoder>(new PlainEncoder<FloatType>(descr, pool));
+ case Type::DOUBLE:
+ return std::unique_ptr<Encoder>(new PlainEncoder<DoubleType>(descr, pool));
+ case Type::BYTE_ARRAY:
+ return std::unique_ptr<Encoder>(new PlainEncoder<ByteArrayType>(descr, pool));
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::unique_ptr<Encoder>(new PlainEncoder<FLBAType>(descr, pool));
+ default:
+ DCHECK(false) << "Encoder not implemented";
+ break;
+ }
+ } else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
+ switch (type_num) {
+ case Type::FLOAT:
+ return std::unique_ptr<Encoder>(
+ new ByteStreamSplitEncoder<FloatType>(descr, pool));
+ case Type::DOUBLE:
+ return std::unique_ptr<Encoder>(
+ new ByteStreamSplitEncoder<DoubleType>(descr, pool));
+ default:
+ throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
+ break;
+ }
+ } else {
+ ParquetException::NYI("Selected encoding is not supported");
+ }
+ DCHECK(false) << "Should not be able to reach this code";
+ return nullptr;
+}
+
+std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
+ const ColumnDescriptor* descr) {
+ if (encoding == Encoding::PLAIN) {
+ switch (type_num) {
+ case Type::BOOLEAN:
+ return std::unique_ptr<Decoder>(new PlainBooleanDecoder(descr));
+ case Type::INT32:
+ return std::unique_ptr<Decoder>(new PlainDecoder<Int32Type>(descr));
+ case Type::INT64:
+ return std::unique_ptr<Decoder>(new PlainDecoder<Int64Type>(descr));
+ case Type::INT96:
+ return std::unique_ptr<Decoder>(new PlainDecoder<Int96Type>(descr));
+ case Type::FLOAT:
+ return std::unique_ptr<Decoder>(new PlainDecoder<FloatType>(descr));
+ case Type::DOUBLE:
+ return std::unique_ptr<Decoder>(new PlainDecoder<DoubleType>(descr));
+ case Type::BYTE_ARRAY:
+ return std::unique_ptr<Decoder>(new PlainByteArrayDecoder(descr));
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::unique_ptr<Decoder>(new PlainFLBADecoder(descr));
+ default:
+ break;
+ }
+ } else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
+ switch (type_num) {
+ case Type::FLOAT:
+ return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<FloatType>(descr));
+ case Type::DOUBLE:
+ return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<DoubleType>(descr));
+ default:
+ throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
+ break;
+ }
+ } else {
+ ParquetException::NYI("Selected encoding is not supported");
+ }
+ DCHECK(false) << "Should not be able to reach this code";
+ return nullptr;
+}
+
+namespace detail {
+std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
+ const ColumnDescriptor* descr,
+ MemoryPool* pool) {
+ switch (type_num) {
+ case Type::BOOLEAN:
+ ParquetException::NYI("Dictionary encoding not implemented for boolean type");
+ case Type::INT32:
+ return std::unique_ptr<Decoder>(new DictDecoderImpl<Int32Type>(descr, pool));
+ case Type::INT64:
+ return std::unique_ptr<Decoder>(new DictDecoderImpl<Int64Type>(descr, pool));
+ case Type::INT96:
+ return std::unique_ptr<Decoder>(new DictDecoderImpl<Int96Type>(descr, pool));
+ case Type::FLOAT:
+ return std::unique_ptr<Decoder>(new DictDecoderImpl<FloatType>(descr, pool));
+ case Type::DOUBLE:
+ return std::unique_ptr<Decoder>(new DictDecoderImpl<DoubleType>(descr, pool));
+ case Type::BYTE_ARRAY:
+ return std::unique_ptr<Decoder>(new DictByteArrayDecoderImpl(descr, pool));
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return std::unique_ptr<Decoder>(new DictDecoderImpl<FLBAType>(descr, pool));
+ default:
+ break;
+ }
+ DCHECK(false) << "Should not be able to reach this code";
+ return nullptr;
+}
+
+} // namespace detail
+} // namespace parquet