// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "parquet/column_reader.h" #include #include #include #include #include #include #include #include #include #include #include "arrow/array.h" #include "arrow/array/builder_binary.h" #include "arrow/array/builder_dict.h" #include "arrow/array/builder_primitive.h" #include "arrow/chunked_array.h" #include "arrow/type.h" #include "arrow/util/bit_stream_utils.h" #include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" #include "arrow/util/int_util_internal.h" #include "arrow/util/logging.h" #include "arrow/util/rle_encoding.h" #include "parquet/column_page.h" #include "parquet/encoding.h" #include "parquet/encryption/encryption_internal.h" #include "parquet/encryption/internal_file_decryptor.h" #include "parquet/level_comparison.h" #include "parquet/level_conversion.h" #include "parquet/properties.h" #include "parquet/statistics.h" #include "parquet/thrift_internal.h" // IWYU pragma: keep // Required after "arrow/util/int_util_internal.h" (for OPTIONAL) #include "parquet/windows_compatibility.h" using arrow::MemoryPool; using arrow::internal::AddWithOverflow; using arrow::internal::checked_cast; using arrow::internal::MultiplyWithOverflow; namespace BitUtil = arrow::BitUtil; namespace parquet { namespace { inline bool HasSpacedValues(const ColumnDescriptor* descr) { if (descr->max_repetition_level() > 0) { // repeated+flat case return !descr->schema_node()->is_required(); } else { // non-repeated+nested case // Find if a node forces nulls in the lowest level along the hierarchy const schema::Node* node = descr->schema_node().get(); while (node) { if (node->is_optional()) { return true; } node = node->parent(); } return false; } } } // namespace LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} LevelDecoder::~LevelDecoder() {} int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values, const uint8_t* data, int32_t data_size) { max_level_ = max_level; int32_t num_bytes = 0; encoding_ = encoding; num_values_remaining_ = num_buffered_values; bit_width_ = BitUtil::Log2(max_level + 1); switch (encoding) { case Encoding::RLE: { if (data_size < 4) { throw ParquetException("Received invalid levels (corrupt data page?)"); } num_bytes = ::arrow::util::SafeLoadAs(data); if (num_bytes < 0 || num_bytes > data_size - 4) { throw ParquetException("Received invalid number of bytes (corrupt data page?)"); } const uint8_t* decoder_data = data + 4; if (!rle_decoder_) { rle_decoder_.reset( new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_)); } else { rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); } return 4 + num_bytes; } case Encoding::BIT_PACKED: { int num_bits = 0; if (MultiplyWithOverflow(num_buffered_values, bit_width_, &num_bits)) { throw ParquetException( "Number of buffered values too large (corrupt data page?)"); } num_bytes = static_cast(BitUtil::BytesForBits(num_bits)); if (num_bytes < 0 || num_bytes > data_size - 4) { throw ParquetException("Received invalid number of bytes (corrupt data page?)"); } if (!bit_packed_decoder_) { bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes)); } else { bit_packed_decoder_->Reset(data, num_bytes); } return num_bytes; } default: throw ParquetException("Unknown encoding type for levels."); } return -1; } void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level, int num_buffered_values, const uint8_t* data) { max_level_ = max_level; // Repetition and definition levels always uses RLE encoding // in the DataPageV2 format. if (num_bytes < 0) { throw ParquetException("Invalid page header (corrupt data page?)"); } encoding_ = Encoding::RLE; num_values_remaining_ = num_buffered_values; bit_width_ = BitUtil::Log2(max_level + 1); if (!rle_decoder_) { rle_decoder_.reset(new ::arrow::util::RleDecoder(data, num_bytes, bit_width_)); } else { rle_decoder_->Reset(data, num_bytes, bit_width_); } } int LevelDecoder::Decode(int batch_size, int16_t* levels) { int num_decoded = 0; int num_values = std::min(num_values_remaining_, batch_size); if (encoding_ == Encoding::RLE) { num_decoded = rle_decoder_->GetBatch(levels, num_values); } else { num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); } if (num_decoded > 0) { internal::MinMax min_max = internal::FindMinMax(levels, num_decoded); if (ARROW_PREDICT_FALSE(min_max.min < 0 || min_max.max > max_level_)) { std::stringstream ss; ss << "Malformed levels. min: " << min_max.min << " max: " << min_max.max << " out of range. Max Level: " << max_level_; throw ParquetException(ss.str()); } } num_values_remaining_ -= num_decoded; return num_decoded; } ReaderProperties default_reader_properties() { static ReaderProperties default_reader_properties; return default_reader_properties; } namespace { // Extracts encoded statistics from V1 and V2 data page headers template EncodedStatistics ExtractStatsFromHeader(const H& header) { EncodedStatistics page_statistics; if (!header.__isset.statistics) { return page_statistics; } const format::Statistics& stats = header.statistics; if (stats.__isset.max) { page_statistics.set_max(stats.max); } if (stats.__isset.min) { page_statistics.set_min(stats.min); } if (stats.__isset.null_count) { page_statistics.set_null_count(stats.null_count); } if (stats.__isset.distinct_count) { page_statistics.set_distinct_count(stats.distinct_count); } return page_statistics; } // ---------------------------------------------------------------------- // SerializedPageReader deserializes Thrift metadata and pages that have been // assembled in a serialized stream for storing in a Parquet files // This subclass delimits pages appearing in a serialized stream, each preceded // by a serialized Thrift format::PageHeader indicating the type of each page // and the page metadata. class SerializedPageReader : public PageReader { public: SerializedPageReader(std::shared_ptr stream, int64_t total_num_rows, Compression::type codec, ::arrow::MemoryPool* pool, const CryptoContext* crypto_ctx) : stream_(std::move(stream)), decompression_buffer_(AllocateBuffer(pool, 0)), page_ordinal_(0), seen_num_rows_(0), total_num_rows_(total_num_rows), decryption_buffer_(AllocateBuffer(pool, 0)) { if (crypto_ctx != nullptr) { crypto_ctx_ = *crypto_ctx; InitDecryption(); } max_page_header_size_ = kDefaultMaxPageHeaderSize; decompressor_ = GetCodec(codec); } // Implement the PageReader interface std::shared_ptr NextPage() override; void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; } private: void UpdateDecryption(const std::shared_ptr& decryptor, int8_t module_type, const std::string& page_aad); void InitDecryption(); std::shared_ptr DecompressIfNeeded(std::shared_ptr page_buffer, int compressed_len, int uncompressed_len, int levels_byte_len = 0); std::shared_ptr stream_; format::PageHeader current_page_header_; std::shared_ptr current_page_; // Compression codec to use. std::unique_ptr<::arrow::util::Codec> decompressor_; std::shared_ptr decompression_buffer_; // The fields below are used for calculation of AAD (additional authenticated data) // suffix which is part of the Parquet Modular Encryption. // The AAD suffix for a parquet module is built internally by // concatenating different parts some of which include // the row group ordinal, column ordinal and page ordinal. // Please refer to the encryption specification for more details: // https://github.com/apache/parquet-format/blob/encryption/Encryption.md#44-additional-authenticated-data // The ordinal fields in the context below are used for AAD suffix calculation. CryptoContext crypto_ctx_; int16_t page_ordinal_; // page ordinal does not count the dictionary page // Maximum allowed page size uint32_t max_page_header_size_; // Number of rows read in data pages so far int64_t seen_num_rows_; // Number of rows in all the data pages int64_t total_num_rows_; // data_page_aad_ and data_page_header_aad_ contain the AAD for data page and data page // header in a single column respectively. // While calculating AAD for different pages in a single column the pages AAD is // updated by only the page ordinal. std::string data_page_aad_; std::string data_page_header_aad_; // Encryption std::shared_ptr decryption_buffer_; }; void SerializedPageReader::InitDecryption() { // Prepare the AAD for quick update later. if (crypto_ctx_.data_decryptor != nullptr) { DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty()); data_page_aad_ = encryption::CreateModuleAad( crypto_ctx_.data_decryptor->file_aad(), encryption::kDataPage, crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal); } if (crypto_ctx_.meta_decryptor != nullptr) { DCHECK(!crypto_ctx_.meta_decryptor->file_aad().empty()); data_page_header_aad_ = encryption::CreateModuleAad( crypto_ctx_.meta_decryptor->file_aad(), encryption::kDataPageHeader, crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal); } } void SerializedPageReader::UpdateDecryption(const std::shared_ptr& decryptor, int8_t module_type, const std::string& page_aad) { DCHECK(decryptor != nullptr); if (crypto_ctx_.start_decrypt_with_dictionary_page) { std::string aad = encryption::CreateModuleAad( decryptor->file_aad(), module_type, crypto_ctx_.row_group_ordinal, crypto_ctx_.column_ordinal, kNonPageOrdinal); decryptor->UpdateAad(aad); } else { encryption::QuickUpdatePageAad(page_aad, page_ordinal_); decryptor->UpdateAad(page_aad); } } std::shared_ptr SerializedPageReader::NextPage() { // Loop here because there may be unhandled page types that we skip until // finding a page that we do know what to do with while (seen_num_rows_ < total_num_rows_) { uint32_t header_size = 0; uint32_t allowed_page_size = kDefaultPageHeaderSize; // Page headers can be very large because of page statistics // We try to deserialize a larger buffer progressively // until a maximum allowed header limit while (true) { PARQUET_ASSIGN_OR_THROW(auto view, stream_->Peek(allowed_page_size)); if (view.size() == 0) { return std::shared_ptr(nullptr); } // This gets used, then set by DeserializeThriftMsg header_size = static_cast(view.size()); try { if (crypto_ctx_.meta_decryptor != nullptr) { UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader, data_page_header_aad_); } DeserializeThriftMsg(reinterpret_cast(view.data()), &header_size, ¤t_page_header_, crypto_ctx_.meta_decryptor); break; } catch (std::exception& e) { // Failed to deserialize. Double the allowed page header size and try again std::stringstream ss; ss << e.what(); allowed_page_size *= 2; if (allowed_page_size > max_page_header_size_) { ss << "Deserializing page header failed.\n"; throw ParquetException(ss.str()); } } } // Advance the stream offset PARQUET_THROW_NOT_OK(stream_->Advance(header_size)); int compressed_len = current_page_header_.compressed_page_size; int uncompressed_len = current_page_header_.uncompressed_page_size; if (compressed_len < 0 || uncompressed_len < 0) { throw ParquetException("Invalid page header"); } if (crypto_ctx_.data_decryptor != nullptr) { UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage, data_page_aad_); } // Read the compressed data page. PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len)); if (page_buffer->size() != compressed_len) { std::stringstream ss; ss << "Page was smaller (" << page_buffer->size() << ") than expected (" << compressed_len << ")"; ParquetException::EofException(ss.str()); } // Decrypt it if we need to if (crypto_ctx_.data_decryptor != nullptr) { PARQUET_THROW_NOT_OK(decryption_buffer_->Resize( compressed_len - crypto_ctx_.data_decryptor->CiphertextSizeDelta(), false)); compressed_len = crypto_ctx_.data_decryptor->Decrypt( page_buffer->data(), compressed_len, decryption_buffer_->mutable_data()); page_buffer = decryption_buffer_; } const PageType::type page_type = LoadEnumSafe(¤t_page_header_.type); if (page_type == PageType::DICTIONARY_PAGE) { crypto_ctx_.start_decrypt_with_dictionary_page = false; const format::DictionaryPageHeader& dict_header = current_page_header_.dictionary_page_header; bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false; if (dict_header.num_values < 0) { throw ParquetException("Invalid page header (negative number of values)"); } // Uncompress if needed page_buffer = DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len); return std::make_shared(page_buffer, dict_header.num_values, LoadEnumSafe(&dict_header.encoding), is_sorted); } else if (page_type == PageType::DATA_PAGE) { ++page_ordinal_; const format::DataPageHeader& header = current_page_header_.data_page_header; if (header.num_values < 0) { throw ParquetException("Invalid page header (negative number of values)"); } EncodedStatistics page_statistics = ExtractStatsFromHeader(header); seen_num_rows_ += header.num_values; // Uncompress if needed page_buffer = DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len); return std::make_shared(page_buffer, header.num_values, LoadEnumSafe(&header.encoding), LoadEnumSafe(&header.definition_level_encoding), LoadEnumSafe(&header.repetition_level_encoding), uncompressed_len, page_statistics); } else if (page_type == PageType::DATA_PAGE_V2) { ++page_ordinal_; const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; if (header.num_values < 0) { throw ParquetException("Invalid page header (negative number of values)"); } if (header.definition_levels_byte_length < 0 || header.repetition_levels_byte_length < 0) { throw ParquetException("Invalid page header (negative levels byte length)"); } bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false; EncodedStatistics page_statistics = ExtractStatsFromHeader(header); seen_num_rows_ += header.num_values; // Uncompress if needed int levels_byte_len; if (AddWithOverflow(header.definition_levels_byte_length, header.repetition_levels_byte_length, &levels_byte_len)) { throw ParquetException("Levels size too large (corrupt file?)"); } // DecompressIfNeeded doesn't take `is_compressed` into account as // it's page type-agnostic. if (is_compressed) { page_buffer = DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len, levels_byte_len); } return std::make_shared( page_buffer, header.num_values, header.num_nulls, header.num_rows, LoadEnumSafe(&header.encoding), header.definition_levels_byte_length, header.repetition_levels_byte_length, uncompressed_len, is_compressed, page_statistics); } else { // We don't know what this page type is. We're allowed to skip non-data // pages. continue; } } return std::shared_ptr(nullptr); } std::shared_ptr SerializedPageReader::DecompressIfNeeded( std::shared_ptr page_buffer, int compressed_len, int uncompressed_len, int levels_byte_len) { if (decompressor_ == nullptr) { return page_buffer; } if (compressed_len < levels_byte_len || uncompressed_len < levels_byte_len) { throw ParquetException("Invalid page header"); } // Grow the uncompressed buffer if we need to. if (uncompressed_len > static_cast(decompression_buffer_->size())) { PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); } if (levels_byte_len > 0) { // First copy the levels as-is uint8_t* decompressed = decompression_buffer_->mutable_data(); memcpy(decompressed, page_buffer->data(), levels_byte_len); } // Decompress the values PARQUET_THROW_NOT_OK(decompressor_->Decompress( compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len, uncompressed_len - levels_byte_len, decompression_buffer_->mutable_data() + levels_byte_len)); return decompression_buffer_; } } // namespace std::unique_ptr PageReader::Open(std::shared_ptr stream, int64_t total_num_rows, Compression::type codec, ::arrow::MemoryPool* pool, const CryptoContext* ctx) { return std::unique_ptr( new SerializedPageReader(std::move(stream), total_num_rows, codec, pool, ctx)); } namespace { // ---------------------------------------------------------------------- // Impl base class for TypedColumnReader and RecordReader // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index // encoding. static bool IsDictionaryIndexEncoding(const Encoding::type& e) { return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; } template class ColumnReaderImplBase { public: using T = typename DType::c_type; ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) : descr_(descr), max_def_level_(descr->max_definition_level()), max_rep_level_(descr->max_repetition_level()), num_buffered_values_(0), num_decoded_values_(0), pool_(pool), current_decoder_(nullptr), current_encoding_(Encoding::UNKNOWN) {} virtual ~ColumnReaderImplBase() = default; protected: // Read up to batch_size values from the current data page into the // pre-allocated memory T* // // @returns: the number of values read into the out buffer int64_t ReadValues(int64_t batch_size, T* out) { int64_t num_decoded = current_decoder_->Decode(out, static_cast(batch_size)); return num_decoded; } // Read up to batch_size values from the current data page into the // pre-allocated memory T*, leaving spaces for null entries according // to the def_levels. // // @returns: the number of values read into the out buffer int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count, uint8_t* valid_bits, int64_t valid_bits_offset) { return current_decoder_->DecodeSpaced(out, static_cast(batch_size), static_cast(null_count), valid_bits, valid_bits_offset); } // Read multiple definition levels into preallocated memory // // Returns the number of decoded definition levels int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { if (max_def_level_ == 0) { return 0; } return definition_level_decoder_.Decode(static_cast(batch_size), levels); } bool HasNextInternal() { // Either there is no data page available yet, or the data page has been // exhausted if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { if (!ReadNewPage() || num_buffered_values_ == 0) { return false; } } return true; } // Read multiple repetition levels into preallocated memory // Returns the number of decoded repetition levels int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { if (max_rep_level_ == 0) { return 0; } return repetition_level_decoder_.Decode(static_cast(batch_size), levels); } // Advance to the next data page bool ReadNewPage() { // Loop until we find the next data page. while (true) { current_page_ = pager_->NextPage(); if (!current_page_) { // EOS return false; } if (current_page_->type() == PageType::DICTIONARY_PAGE) { ConfigureDictionary(static_cast(current_page_.get())); continue; } else if (current_page_->type() == PageType::DATA_PAGE) { const auto page = std::static_pointer_cast(current_page_); const int64_t levels_byte_size = InitializeLevelDecoders( *page, page->repetition_level_encoding(), page->definition_level_encoding()); InitializeDataDecoder(*page, levels_byte_size); return true; } else if (current_page_->type() == PageType::DATA_PAGE_V2) { const auto page = std::static_pointer_cast(current_page_); int64_t levels_byte_size = InitializeLevelDecodersV2(*page); InitializeDataDecoder(*page, levels_byte_size); return true; } else { // We don't know what this page type is. We're allowed to skip non-data // pages. continue; } } return true; } void ConfigureDictionary(const DictionaryPage* page) { int encoding = static_cast(page->encoding()); if (page->encoding() == Encoding::PLAIN_DICTIONARY || page->encoding() == Encoding::PLAIN) { encoding = static_cast(Encoding::RLE_DICTIONARY); } auto it = decoders_.find(encoding); if (it != decoders_.end()) { throw ParquetException("Column cannot have more than one dictionary."); } if (page->encoding() == Encoding::PLAIN_DICTIONARY || page->encoding() == Encoding::PLAIN) { auto dictionary = MakeTypedDecoder(Encoding::PLAIN, descr_); dictionary->SetData(page->num_values(), page->data(), page->size()); // The dictionary is fully decoded during DictionaryDecoder::Init, so the // DictionaryPage buffer is no longer required after this step // // TODO(wesm): investigate whether this all-or-nothing decoding of the // dictionary makes sense and whether performance can be improved std::unique_ptr> decoder = MakeDictDecoder(descr_, pool_); decoder->SetDict(dictionary.get()); decoders_[encoding] = std::unique_ptr(dynamic_cast(decoder.release())); } else { ParquetException::NYI("only plain dictionary encoding has been implemented"); } new_dictionary_ = true; current_decoder_ = decoders_[encoding].get(); DCHECK(current_decoder_); } // Initialize repetition and definition level decoders on the next data page. // If the data page includes repetition and definition levels, we // initialize the level decoders and return the number of encoded level bytes. // The return value helps determine the number of bytes in the encoded data. int64_t InitializeLevelDecoders(const DataPage& page, Encoding::type repetition_level_encoding, Encoding::type definition_level_encoding) { // Read a data page. num_buffered_values_ = page.num_values(); // Have not decoded any values from the data page yet num_decoded_values_ = 0; const uint8_t* buffer = page.data(); int32_t levels_byte_size = 0; int32_t max_size = page.size(); // Data page Layout: Repetition Levels - Definition Levels - encoded values. // Levels are encoded as rle or bit-packed. // Init repetition levels if (max_rep_level_ > 0) { int32_t rep_levels_bytes = repetition_level_decoder_.SetData( repetition_level_encoding, max_rep_level_, static_cast(num_buffered_values_), buffer, max_size); buffer += rep_levels_bytes; levels_byte_size += rep_levels_bytes; max_size -= rep_levels_bytes; } // TODO figure a way to set max_def_level_ to 0 // if the initial value is invalid // Init definition levels if (max_def_level_ > 0) { int32_t def_levels_bytes = definition_level_decoder_.SetData( definition_level_encoding, max_def_level_, static_cast(num_buffered_values_), buffer, max_size); levels_byte_size += def_levels_bytes; max_size -= def_levels_bytes; } return levels_byte_size; } int64_t InitializeLevelDecodersV2(const DataPageV2& page) { // Read a data page. num_buffered_values_ = page.num_values(); // Have not decoded any values from the data page yet num_decoded_values_ = 0; const uint8_t* buffer = page.data(); const int64_t total_levels_length = static_cast(page.repetition_levels_byte_length()) + page.definition_levels_byte_length(); if (total_levels_length > page.size()) { throw ParquetException("Data page too small for levels (corrupt header?)"); } if (max_rep_level_ > 0) { repetition_level_decoder_.SetDataV2(page.repetition_levels_byte_length(), max_rep_level_, static_cast(num_buffered_values_), buffer); buffer += page.repetition_levels_byte_length(); } if (max_def_level_ > 0) { definition_level_decoder_.SetDataV2(page.definition_levels_byte_length(), max_def_level_, static_cast(num_buffered_values_), buffer); } return total_levels_length; } // Get a decoder object for this page or create a new decoder if this is the // first page with this encoding. void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) { const uint8_t* buffer = page.data() + levels_byte_size; const int64_t data_size = page.size() - levels_byte_size; if (data_size < 0) { throw ParquetException("Page smaller than size of encoded levels"); } Encoding::type encoding = page.encoding(); if (IsDictionaryIndexEncoding(encoding)) { encoding = Encoding::RLE_DICTIONARY; } auto it = decoders_.find(static_cast(encoding)); if (it != decoders_.end()) { DCHECK(it->second.get() != nullptr); if (encoding == Encoding::RLE_DICTIONARY) { DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); } current_decoder_ = it->second.get(); } else { switch (encoding) { case Encoding::PLAIN: { auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); current_decoder_ = decoder.get(); decoders_[static_cast(encoding)] = std::move(decoder); break; } case Encoding::BYTE_STREAM_SPLIT: { auto decoder = MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT, descr_); current_decoder_ = decoder.get(); decoders_[static_cast(encoding)] = std::move(decoder); break; } case Encoding::RLE_DICTIONARY: throw ParquetException("Dictionary page must be before data page."); case Encoding::DELTA_BINARY_PACKED: case Encoding::DELTA_LENGTH_BYTE_ARRAY: case Encoding::DELTA_BYTE_ARRAY: ParquetException::NYI("Unsupported encoding"); default: throw ParquetException("Unknown encoding type."); } } current_encoding_ = encoding; current_decoder_->SetData(static_cast(num_buffered_values_), buffer, static_cast(data_size)); } const ColumnDescriptor* descr_; const int16_t max_def_level_; const int16_t max_rep_level_; std::unique_ptr pager_; std::shared_ptr current_page_; // Not set if full schema for this field has no optional or repeated elements LevelDecoder definition_level_decoder_; // Not set for flat schemas. LevelDecoder repetition_level_decoder_; // The total number of values stored in the data page. This is the maximum of // the number of encoded definition levels or encoded values. For // non-repeated, required columns, this is equal to the number of encoded // values. For repeated or optional values, there may be fewer data values // than levels, and this tells you how many encoded levels there are in that // case. int64_t num_buffered_values_; // The number of values from the current data page that have been decoded // into memory int64_t num_decoded_values_; ::arrow::MemoryPool* pool_; using DecoderType = TypedDecoder; DecoderType* current_decoder_; Encoding::type current_encoding_; /// Flag to signal when a new dictionary has been set, for the benefit of /// DictionaryRecordReader bool new_dictionary_; // The exposed encoding ExposedEncoding exposed_encoding_ = ExposedEncoding::NO_ENCODING; // Map of encoding type to the respective decoder object. For example, a // column chunk's data pages may include both dictionary-encoded and // plain-encoded data. std::unordered_map> decoders_; void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } }; // ---------------------------------------------------------------------- // TypedColumnReader implementations template class TypedColumnReaderImpl : public TypedColumnReader, public ColumnReaderImplBase { public: using T = typename DType::c_type; TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr pager, ::arrow::MemoryPool* pool) : ColumnReaderImplBase(descr, pool) { this->pager_ = std::move(pager); } bool HasNext() override { return this->HasNextInternal(); } int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) override; int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read, int64_t* null_count) override; int64_t Skip(int64_t num_rows_to_skip) override; Type::type type() const override { return this->descr_->physical_type(); } const ColumnDescriptor* descr() const override { return this->descr_; } ExposedEncoding GetExposedEncoding() override { return this->exposed_encoding_; }; int64_t ReadBatchWithDictionary(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, int32_t* indices, int64_t* indices_read, const T** dict, int32_t* dict_len) override; protected: void SetExposedEncoding(ExposedEncoding encoding) override { this->exposed_encoding_ = encoding; } private: // Read dictionary indices. Similar to ReadValues but decode data to dictionary indices. // This function is called only by ReadBatchWithDictionary(). int64_t ReadDictionaryIndices(int64_t indices_to_read, int32_t* indices) { auto decoder = dynamic_cast*>(this->current_decoder_); return decoder->DecodeIndices(static_cast(indices_to_read), indices); } // Get dictionary. The dictionary should have been set by SetDict(). The dictionary is // owned by the internal decoder and is destroyed when the reader is destroyed. This // function is called only by ReadBatchWithDictionary() after dictionary is configured. void GetDictionary(const T** dictionary, int32_t* dictionary_length) { auto decoder = dynamic_cast*>(this->current_decoder_); decoder->GetDictionary(dictionary, dictionary_length); } // Read definition and repetition levels. Also return the number of definition levels // and number of values to read. This function is called before reading values. void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, int64_t* num_def_levels, int64_t* values_to_read) { batch_size = std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); // If the field is required and non-repeated, there are no definition levels if (this->max_def_level_ > 0 && def_levels != nullptr) { *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); // TODO(wesm): this tallying of values-to-decode can be performed with better // cache-efficiency if fused with the level decoding. for (int64_t i = 0; i < *num_def_levels; ++i) { if (def_levels[i] == this->max_def_level_) { ++(*values_to_read); } } } else { // Required field, read all values *values_to_read = batch_size; } // Not present for non-repeated fields if (this->max_rep_level_ > 0 && rep_levels != nullptr) { int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); if (def_levels != nullptr && *num_def_levels != num_rep_levels) { throw ParquetException("Number of decoded rep / def levels did not match"); } } } }; template int64_t TypedColumnReaderImpl::ReadBatchWithDictionary( int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, int32_t* indices, int64_t* indices_read, const T** dict, int32_t* dict_len) { bool has_dict_output = dict != nullptr && dict_len != nullptr; // Similar logic as ReadValues to get pages. if (!HasNext()) { *indices_read = 0; if (has_dict_output) { *dict = nullptr; *dict_len = 0; } return 0; } // Verify the current data page is dictionary encoded. if (this->current_encoding_ != Encoding::RLE_DICTIONARY) { std::stringstream ss; ss << "Data page is not dictionary encoded. Encoding: " << EncodingToString(this->current_encoding_); throw ParquetException(ss.str()); } // Get dictionary pointer and length. if (has_dict_output) { GetDictionary(dict, dict_len); } // Similar logic as ReadValues to get def levels and rep levels. int64_t num_def_levels = 0; int64_t indices_to_read = 0; ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &indices_to_read); // Read dictionary indices. *indices_read = ReadDictionaryIndices(indices_to_read, indices); int64_t total_indices = std::max(num_def_levels, *indices_read); this->ConsumeBufferedValues(total_indices); return total_indices; } template int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) { // HasNext invokes ReadNewPage if (!HasNext()) { *values_read = 0; return 0; } // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished int64_t num_def_levels = 0; int64_t values_to_read = 0; ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read); *values_read = this->ReadValues(values_to_read, values); int64_t total_values = std::max(num_def_levels, *values_read); this->ConsumeBufferedValues(total_values); return total_values; } template int64_t TypedColumnReaderImpl::ReadBatchSpaced( int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read, int64_t* null_count_out) { // HasNext invokes ReadNewPage if (!HasNext()) { *levels_read = 0; *values_read = 0; *null_count_out = 0; return 0; } int64_t total_values; // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished batch_size = std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); // If the field is required and non-repeated, there are no definition levels if (this->max_def_level_ > 0) { int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); // Not present for non-repeated fields if (this->max_rep_level_ > 0) { int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); if (num_def_levels != num_rep_levels) { throw ParquetException("Number of decoded rep / def levels did not match"); } } const bool has_spaced_values = HasSpacedValues(this->descr_); int64_t null_count = 0; if (!has_spaced_values) { int values_to_read = 0; for (int64_t i = 0; i < num_def_levels; ++i) { if (def_levels[i] == this->max_def_level_) { ++values_to_read; } } total_values = this->ReadValues(values_to_read, values); ::arrow::BitUtil::SetBitsTo(valid_bits, valid_bits_offset, /*length=*/total_values, /*bits_are_set=*/true); *values_read = total_values; } else { internal::LevelInfo info; info.repeated_ancestor_def_level = this->max_def_level_ - 1; info.def_level = this->max_def_level_; info.rep_level = this->max_rep_level_; internal::ValidityBitmapInputOutput validity_io; validity_io.values_read_upper_bound = num_def_levels; validity_io.valid_bits = valid_bits; validity_io.valid_bits_offset = valid_bits_offset; validity_io.null_count = null_count; validity_io.values_read = *values_read; internal::DefLevelsToBitmap(def_levels, num_def_levels, info, &validity_io); null_count = validity_io.null_count; *values_read = validity_io.values_read; total_values = this->ReadValuesSpaced(*values_read, values, static_cast(null_count), valid_bits, valid_bits_offset); } *levels_read = num_def_levels; *null_count_out = null_count; } else { // Required field, read all values total_values = this->ReadValues(batch_size, values); ::arrow::BitUtil::SetBitsTo(valid_bits, valid_bits_offset, /*length=*/total_values, /*bits_are_set=*/true); *null_count_out = 0; *values_read = total_values; *levels_read = total_values; } this->ConsumeBufferedValues(*levels_read); return total_values; } template int64_t TypedColumnReaderImpl::Skip(int64_t num_rows_to_skip) { int64_t rows_to_skip = num_rows_to_skip; while (HasNext() && rows_to_skip > 0) { // If the number of rows to skip is more than the number of undecoded values, skip the // Page. if (rows_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) { rows_to_skip -= this->num_buffered_values_ - this->num_decoded_values_; this->num_decoded_values_ = this->num_buffered_values_; } else { // We need to read this Page // Jump to the right offset in the Page int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint int64_t values_read = 0; // This will be enough scratch space to accommodate 16-bit levels or any // value type std::shared_ptr scratch = AllocateBuffer( this->pool_, batch_size * type_traits::value_byte_size); do { batch_size = std::min(batch_size, rows_to_skip); values_read = ReadBatch(static_cast(batch_size), reinterpret_cast(scratch->mutable_data()), reinterpret_cast(scratch->mutable_data()), reinterpret_cast(scratch->mutable_data()), &values_read); rows_to_skip -= values_read; } while (values_read > 0 && rows_to_skip > 0); } } return num_rows_to_skip - rows_to_skip; } } // namespace // ---------------------------------------------------------------------- // Dynamic column reader constructor std::shared_ptr ColumnReader::Make(const ColumnDescriptor* descr, std::unique_ptr pager, MemoryPool* pool) { switch (descr->physical_type()) { case Type::BOOLEAN: return std::make_shared>(descr, std::move(pager), pool); case Type::INT32: return std::make_shared>(descr, std::move(pager), pool); case Type::INT64: return std::make_shared>(descr, std::move(pager), pool); case Type::INT96: return std::make_shared>(descr, std::move(pager), pool); case Type::FLOAT: return std::make_shared>(descr, std::move(pager), pool); case Type::DOUBLE: return std::make_shared>(descr, std::move(pager), pool); case Type::BYTE_ARRAY: return std::make_shared>( descr, std::move(pager), pool); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_shared>(descr, std::move(pager), pool); default: ParquetException::NYI("type reader not implemented"); } // Unreachable code, but suppress compiler warning return std::shared_ptr(nullptr); } // ---------------------------------------------------------------------- // RecordReader namespace internal { namespace { // The minimum number of repetition/definition levels to decode at a time, for // better vectorized performance when doing many smaller record reads constexpr int64_t kMinLevelBatchSize = 1024; template class TypedRecordReader : public ColumnReaderImplBase, virtual public RecordReader { public: using T = typename DType::c_type; using BASE = ColumnReaderImplBase; TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool) : BASE(descr, pool) { leaf_info_ = leaf_info; nullable_values_ = leaf_info.HasNullableValues(); at_record_start_ = true; records_read_ = 0; values_written_ = 0; values_capacity_ = 0; null_count_ = 0; levels_written_ = 0; levels_position_ = 0; levels_capacity_ = 0; uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY); if (uses_values_) { values_ = AllocateBuffer(pool); } valid_bits_ = AllocateBuffer(pool); def_levels_ = AllocateBuffer(pool); rep_levels_ = AllocateBuffer(pool); Reset(); } int64_t available_values_current_page() const { return this->num_buffered_values_ - this->num_decoded_values_; } // Compute the values capacity in bytes for the given number of elements int64_t bytes_for_values(int64_t nitems) const { int64_t type_size = GetTypeByteSize(this->descr_->physical_type()); int64_t bytes_for_values = -1; if (MultiplyWithOverflow(nitems, type_size, &bytes_for_values)) { throw ParquetException("Total size of items too large"); } return bytes_for_values; } int64_t ReadRecords(int64_t num_records) override { // Delimit records, then read values at the end int64_t records_read = 0; if (levels_position_ < levels_written_) { records_read += ReadRecordData(num_records); } int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); // If we are in the middle of a record, we continue until reaching the // desired number of records or the end of the current record if we've found // enough records while (!at_record_start_ || records_read < num_records) { // Is there more data to read in this row group? if (!this->HasNextInternal()) { if (!at_record_start_) { // We ended the row group while inside a record that we haven't seen // the end of yet. So increment the record count for the last record in // the row group ++records_read; at_record_start_ = true; } break; } /// We perform multiple batch reads until we either exhaust the row group /// or observe the desired number of records int64_t batch_size = std::min(level_batch_size, available_values_current_page()); // No more data in column if (batch_size == 0) { break; } if (this->max_def_level_ > 0) { ReserveLevels(batch_size); int16_t* def_levels = this->def_levels() + levels_written_; int16_t* rep_levels = this->rep_levels() + levels_written_; // Not present for non-repeated fields int64_t levels_read = 0; if (this->max_rep_level_ > 0) { levels_read = this->ReadDefinitionLevels(batch_size, def_levels); if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { throw ParquetException("Number of decoded rep / def levels did not match"); } } else if (this->max_def_level_ > 0) { levels_read = this->ReadDefinitionLevels(batch_size, def_levels); } // Exhausted column chunk if (levels_read == 0) { break; } levels_written_ += levels_read; records_read += ReadRecordData(num_records - records_read); } else { // No repetition or definition levels batch_size = std::min(num_records - records_read, batch_size); records_read += ReadRecordData(batch_size); } } return records_read; } // We may outwardly have the appearance of having exhausted a column chunk // when in fact we are in the middle of processing the last batch bool has_values_to_process() const { return levels_position_ < levels_written_; } std::shared_ptr ReleaseValues() override { if (uses_values_) { auto result = values_; PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true)); values_ = AllocateBuffer(this->pool_); values_capacity_ = 0; return result; } else { return nullptr; } } std::shared_ptr ReleaseIsValid() override { if (leaf_info_.HasNullableValues()) { auto result = valid_bits_; PARQUET_THROW_NOT_OK(result->Resize(BitUtil::BytesForBits(values_written_), true)); valid_bits_ = AllocateBuffer(this->pool_); return result; } else { return nullptr; } } // Process written repetition/definition levels to reach the end of // records. Process no more levels than necessary to delimit the indicated // number of logical records. Updates internal state of RecordReader // // \return Number of records delimited int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { int64_t values_to_read = 0; int64_t records_read = 0; const int16_t* def_levels = this->def_levels() + levels_position_; const int16_t* rep_levels = this->rep_levels() + levels_position_; DCHECK_GT(this->max_rep_level_, 0); // Count logical records and number of values to read while (levels_position_ < levels_written_) { const int16_t rep_level = *rep_levels++; if (rep_level == 0) { // If at_record_start_ is true, we are seeing the start of a record // for the second time, such as after repeated calls to // DelimitRecords. In this case we must continue until we find // another record start or exhausting the ColumnChunk if (!at_record_start_) { // We've reached the end of a record; increment the record count. ++records_read; if (records_read == num_records) { // We've found the number of records we were looking for. Set // at_record_start_ to true and break at_record_start_ = true; break; } } } // We have decided to consume the level at this position; therefore we // must advance until we find another record boundary at_record_start_ = false; const int16_t def_level = *def_levels++; if (def_level == this->max_def_level_) { ++values_to_read; } ++levels_position_; } *values_seen = values_to_read; return records_read; } void Reserve(int64_t capacity) override { ReserveLevels(capacity); ReserveValues(capacity); } int64_t UpdateCapacity(int64_t capacity, int64_t size, int64_t extra_size) { if (extra_size < 0) { throw ParquetException("Negative size (corrupt file?)"); } int64_t target_size = -1; if (AddWithOverflow(size, extra_size, &target_size)) { throw ParquetException("Allocation size too large (corrupt file?)"); } if (target_size >= (1LL << 62)) { throw ParquetException("Allocation size too large (corrupt file?)"); } if (capacity >= target_size) { return capacity; } return BitUtil::NextPower2(target_size); } void ReserveLevels(int64_t extra_levels) { if (this->max_def_level_ > 0) { const int64_t new_levels_capacity = UpdateCapacity(levels_capacity_, levels_written_, extra_levels); if (new_levels_capacity > levels_capacity_) { constexpr auto kItemSize = static_cast(sizeof(int16_t)); int64_t capacity_in_bytes = -1; if (MultiplyWithOverflow(new_levels_capacity, kItemSize, &capacity_in_bytes)) { throw ParquetException("Allocation size too large (corrupt file?)"); } PARQUET_THROW_NOT_OK(def_levels_->Resize(capacity_in_bytes, false)); if (this->max_rep_level_ > 0) { PARQUET_THROW_NOT_OK(rep_levels_->Resize(capacity_in_bytes, false)); } levels_capacity_ = new_levels_capacity; } } } void ReserveValues(int64_t extra_values) { const int64_t new_values_capacity = UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { // XXX(wesm): A hack to avoid memory allocation when reading directly // into builder classes if (uses_values_) { PARQUET_THROW_NOT_OK( values_->Resize(bytes_for_values(new_values_capacity), false)); } values_capacity_ = new_values_capacity; } if (leaf_info_.HasNullableValues()) { int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); if (valid_bits_->size() < valid_bytes_new) { int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); // Avoid valgrind warnings memset(valid_bits_->mutable_data() + valid_bytes_old, 0, valid_bytes_new - valid_bytes_old); } } } void Reset() override { ResetValues(); if (levels_written_ > 0) { const int64_t levels_remaining = levels_written_ - levels_position_; // Shift remaining levels to beginning of buffer and trim to only the number // of decoded levels remaining int16_t* def_data = def_levels(); int16_t* rep_data = rep_levels(); std::copy(def_data + levels_position_, def_data + levels_written_, def_data); PARQUET_THROW_NOT_OK( def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); if (this->max_rep_level_ > 0) { std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); PARQUET_THROW_NOT_OK( rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); } levels_written_ -= levels_position_; levels_position_ = 0; levels_capacity_ = levels_remaining; } records_read_ = 0; // Call Finish on the binary builders to reset them } void SetPageReader(std::unique_ptr reader) override { at_record_start_ = true; this->pager_ = std::move(reader); ResetDecoders(); } bool HasMoreData() const override { return this->pager_ != nullptr; } // Dictionary decoders must be reset when advancing row groups void ResetDecoders() { this->decoders_.clear(); } virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { uint8_t* valid_bits = valid_bits_->mutable_data(); const int64_t valid_bits_offset = values_written_; int64_t num_decoded = this->current_decoder_->DecodeSpaced( ValuesHead(), static_cast(values_with_nulls), static_cast(null_count), valid_bits, valid_bits_offset); DCHECK_EQ(num_decoded, values_with_nulls); } virtual void ReadValuesDense(int64_t values_to_read) { int64_t num_decoded = this->current_decoder_->Decode(ValuesHead(), static_cast(values_to_read)); DCHECK_EQ(num_decoded, values_to_read); } // Return number of logical records read int64_t ReadRecordData(int64_t num_records) { // Conservative upper bound const int64_t possible_num_values = std::max(num_records, levels_written_ - levels_position_); ReserveValues(possible_num_values); const int64_t start_levels_position = levels_position_; int64_t values_to_read = 0; int64_t records_read = 0; if (this->max_rep_level_ > 0) { records_read = DelimitRecords(num_records, &values_to_read); } else if (this->max_def_level_ > 0) { // No repetition levels, skip delimiting logic. Each level represents a // null or not null entry records_read = std::min(levels_written_ - levels_position_, num_records); // This is advanced by DelimitRecords, which we skipped levels_position_ += records_read; } else { records_read = values_to_read = num_records; } int64_t null_count = 0; if (leaf_info_.HasNullableValues()) { ValidityBitmapInputOutput validity_io; validity_io.values_read_upper_bound = levels_position_ - start_levels_position; validity_io.valid_bits = valid_bits_->mutable_data(); validity_io.valid_bits_offset = values_written_; DefLevelsToBitmap(def_levels() + start_levels_position, levels_position_ - start_levels_position, leaf_info_, &validity_io); values_to_read = validity_io.values_read - validity_io.null_count; null_count = validity_io.null_count; DCHECK_GE(values_to_read, 0); ReadValuesSpaced(validity_io.values_read, null_count); } else { DCHECK_GE(values_to_read, 0); ReadValuesDense(values_to_read); } if (this->leaf_info_.def_level > 0) { // Optional, repeated, or some mix thereof this->ConsumeBufferedValues(levels_position_ - start_levels_position); } else { // Flat, non-repeated this->ConsumeBufferedValues(values_to_read); } // Total values, including null spaces, if any values_written_ += values_to_read + null_count; null_count_ += null_count; return records_read; } void DebugPrintState() override { const int16_t* def_levels = this->def_levels(); const int16_t* rep_levels = this->rep_levels(); const int64_t total_levels_read = levels_position_; const T* vals = reinterpret_cast(this->values()); std::cout << "def levels: "; for (int64_t i = 0; i < total_levels_read; ++i) { std::cout << def_levels[i] << " "; } std::cout << std::endl; std::cout << "rep levels: "; for (int64_t i = 0; i < total_levels_read; ++i) { std::cout << rep_levels[i] << " "; } std::cout << std::endl; std::cout << "values: "; for (int64_t i = 0; i < this->values_written(); ++i) { std::cout << vals[i] << " "; } std::cout << std::endl; } void ResetValues() { if (values_written_ > 0) { // Resize to 0, but do not shrink to fit if (uses_values_) { PARQUET_THROW_NOT_OK(values_->Resize(0, false)); } PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); values_written_ = 0; values_capacity_ = 0; null_count_ = 0; } } protected: template T* ValuesHead() { return reinterpret_cast(values_->mutable_data()) + values_written_; } LevelInfo leaf_info_; }; class FLBARecordReader : public TypedRecordReader, virtual public BinaryRecordReader { public: FLBARecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool) : TypedRecordReader(descr, leaf_info, pool), builder_(nullptr) { DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); int byte_width = descr_->type_length(); std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, this->pool_)); } ::arrow::ArrayVector GetBuilderChunks() override { std::shared_ptr<::arrow::Array> chunk; PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); return ::arrow::ArrayVector({chunk}); } void ReadValuesDense(int64_t values_to_read) override { auto values = ValuesHead(); int64_t num_decoded = this->current_decoder_->Decode(values, static_cast(values_to_read)); DCHECK_EQ(num_decoded, values_to_read); for (int64_t i = 0; i < num_decoded; i++) { PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); } ResetValues(); } void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { uint8_t* valid_bits = valid_bits_->mutable_data(); const int64_t valid_bits_offset = values_written_; auto values = ValuesHead(); int64_t num_decoded = this->current_decoder_->DecodeSpaced( values, static_cast(values_to_read), static_cast(null_count), valid_bits, valid_bits_offset); DCHECK_EQ(num_decoded, values_to_read); for (int64_t i = 0; i < num_decoded; i++) { if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); } else { PARQUET_THROW_NOT_OK(builder_->AppendNull()); } } ResetValues(); } private: std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_; }; class ByteArrayChunkedRecordReader : public TypedRecordReader, virtual public BinaryRecordReader { public: ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool) : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); } ::arrow::ArrayVector GetBuilderChunks() override { ::arrow::ArrayVector result = accumulator_.chunks; if (result.size() == 0 || accumulator_.builder->length() > 0) { std::shared_ptr<::arrow::Array> last_chunk; PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); result.push_back(std::move(last_chunk)); } accumulator_.chunks = {}; return result; } void ReadValuesDense(int64_t values_to_read) override { int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( static_cast(values_to_read), &accumulator_); DCHECK_EQ(num_decoded, values_to_read); ResetValues(); } void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { int64_t num_decoded = this->current_decoder_->DecodeArrow( static_cast(values_to_read), static_cast(null_count), valid_bits_->mutable_data(), values_written_, &accumulator_); DCHECK_EQ(num_decoded, values_to_read - null_count); ResetValues(); } private: // Helper data structure for accumulating builder chunks typename EncodingTraits::Accumulator accumulator_; }; class ByteArrayDictionaryRecordReader : public TypedRecordReader, virtual public DictionaryRecordReader { public: ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool) : TypedRecordReader(descr, leaf_info, pool), builder_(pool) { this->read_dictionary_ = true; } std::shared_ptr<::arrow::ChunkedArray> GetResult() override { FlushBuilder(); std::vector> result; std::swap(result, result_chunks_); return std::make_shared<::arrow::ChunkedArray>(std::move(result), builder_.type()); } void FlushBuilder() { if (builder_.length() > 0) { std::shared_ptr<::arrow::Array> chunk; PARQUET_THROW_NOT_OK(builder_.Finish(&chunk)); result_chunks_.emplace_back(std::move(chunk)); // Also clears the dictionary memo table builder_.Reset(); } } void MaybeWriteNewDictionary() { if (this->new_dictionary_) { /// If there is a new dictionary, we may need to flush the builder, then /// insert the new dictionary values FlushBuilder(); builder_.ResetFull(); auto decoder = dynamic_cast(this->current_decoder_); decoder->InsertDictionary(&builder_); this->new_dictionary_ = false; } } void ReadValuesDense(int64_t values_to_read) override { int64_t num_decoded = 0; if (current_encoding_ == Encoding::RLE_DICTIONARY) { MaybeWriteNewDictionary(); auto decoder = dynamic_cast(this->current_decoder_); num_decoded = decoder->DecodeIndices(static_cast(values_to_read), &builder_); } else { num_decoded = this->current_decoder_->DecodeArrowNonNull( static_cast(values_to_read), &builder_); /// Flush values since they have been copied into the builder ResetValues(); } DCHECK_EQ(num_decoded, values_to_read); } void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { int64_t num_decoded = 0; if (current_encoding_ == Encoding::RLE_DICTIONARY) { MaybeWriteNewDictionary(); auto decoder = dynamic_cast(this->current_decoder_); num_decoded = decoder->DecodeIndicesSpaced( static_cast(values_to_read), static_cast(null_count), valid_bits_->mutable_data(), values_written_, &builder_); } else { num_decoded = this->current_decoder_->DecodeArrow( static_cast(values_to_read), static_cast(null_count), valid_bits_->mutable_data(), values_written_, &builder_); /// Flush values since they have been copied into the builder ResetValues(); } DCHECK_EQ(num_decoded, values_to_read - null_count); } private: using BinaryDictDecoder = DictDecoder; ::arrow::BinaryDictionary32Builder builder_; std::vector> result_chunks_; }; // TODO(wesm): Implement these to some satisfaction template <> void TypedRecordReader::DebugPrintState() {} template <> void TypedRecordReader::DebugPrintState() {} template <> void TypedRecordReader::DebugPrintState() {} std::shared_ptr MakeByteArrayRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, ::arrow::MemoryPool* pool, bool read_dictionary) { if (read_dictionary) { return std::make_shared(descr, leaf_info, pool); } else { return std::make_shared(descr, leaf_info, pool); } } } // namespace std::shared_ptr RecordReader::Make(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool, const bool read_dictionary) { switch (descr->physical_type()) { case Type::BOOLEAN: return std::make_shared>(descr, leaf_info, pool); case Type::INT32: return std::make_shared>(descr, leaf_info, pool); case Type::INT64: return std::make_shared>(descr, leaf_info, pool); case Type::INT96: return std::make_shared>(descr, leaf_info, pool); case Type::FLOAT: return std::make_shared>(descr, leaf_info, pool); case Type::DOUBLE: return std::make_shared>(descr, leaf_info, pool); case Type::BYTE_ARRAY: return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary); case Type::FIXED_LEN_BYTE_ARRAY: return std::make_shared(descr, leaf_info, pool); default: { // PARQUET-1481: This can occur if the file is corrupt std::stringstream ss; ss << "Invalid physical column type: " << static_cast(descr->physical_type()); throw ParquetException(ss.str()); } } // Unreachable code, but suppress compiler warning return nullptr; } } // namespace internal } // namespace parquet