// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "contrib/libs/apache/arrow_next/cpp/src/parquet/file_reader.h" #include #include #include #include #include #include #include #include #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/caching.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/file.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/memory.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/util_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/bit_util.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/checked_cast.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/future.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/int_util_overflow.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/ubsan.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/bloom_filter.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/bloom_filter_reader.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/column_reader.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/column_scanner.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/encryption/encryption_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/encryption/internal_file_decryptor.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/exception.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/file_writer.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/metadata.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/page_index.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/platform.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/properties.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/schema.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/types.h" using arrow20::internal::AddWithOverflow; namespace parquet20 { namespace { bool IsColumnChunkFullyDictionaryEncoded(const ColumnChunkMetaData& col) { // Check the encoding_stats to see if all data pages are dictionary encoded. const std::vector& encoding_stats = col.encoding_stats(); if (encoding_stats.empty()) { // Some parquet files may have empty encoding_stats. In this case we are // not sure whether all data pages are dictionary encoded. return false; } // The 1st page should be the dictionary page. if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE || (encoding_stats[0].encoding != Encoding::PLAIN && encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) { return false; } // The following pages should be dictionary encoded data pages. for (size_t idx = 1; idx < encoding_stats.size(); ++idx) { if (!IsDictionaryIndexEncoding(encoding_stats[idx].encoding) || (encoding_stats[idx].page_type != PageType::DATA_PAGE && encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) { // Return false if any following page is not a dictionary encoded data // page. return false; } } return true; } } // namespace static constexpr uint32_t kFooterSize = 8; // For PARQUET-816 static constexpr int64_t kMaxDictHeaderSize = 100; // ---------------------------------------------------------------------- // RowGroupReader public API RowGroupReader::RowGroupReader(std::unique_ptr contents) : contents_(std::move(contents)) {} std::shared_ptr RowGroupReader::Column(int i) { if (i >= metadata()->num_columns()) { std::stringstream ss; ss << "Trying to read column index " << i << " but row group metadata has only " << metadata()->num_columns() << " columns"; throw ParquetException(ss.str()); } const ColumnDescriptor* descr = metadata()->schema()->Column(i); std::unique_ptr page_reader = contents_->GetColumnPageReader(i); return ColumnReader::Make( descr, std::move(page_reader), const_cast(contents_->properties())->memory_pool()); } std::shared_ptr RowGroupReader::RecordReader( int i, bool read_dictionary) { if (i >= metadata()->num_columns()) { std::stringstream ss; ss << "Trying to read column index " << i << " but row group metadata has only " << metadata()->num_columns() << " columns"; throw ParquetException(ss.str()); } const ColumnDescriptor* descr = metadata()->schema()->Column(i); std::unique_ptr page_reader = contents_->GetColumnPageReader(i); internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr); auto reader = internal::RecordReader::Make( descr, level_info, contents_->properties()->memory_pool(), read_dictionary, contents_->properties()->read_dense_for_nullable()); reader->SetPageReader(std::move(page_reader)); return reader; } std::shared_ptr RowGroupReader::ColumnWithExposeEncoding( int i, ExposedEncoding encoding_to_expose) { std::shared_ptr reader = Column(i); if (encoding_to_expose == ExposedEncoding::DICTIONARY && IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))) { // Set exposed encoding. reader->SetExposedEncoding(encoding_to_expose); } return reader; } std::shared_ptr RowGroupReader::RecordReaderWithExposeEncoding( int i, ExposedEncoding encoding_to_expose) { return RecordReader( i, /*read_dictionary=*/encoding_to_expose == ExposedEncoding::DICTIONARY && IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))); } std::unique_ptr RowGroupReader::GetColumnPageReader(int i) { if (i >= metadata()->num_columns()) { std::stringstream ss; ss << "Trying to read column index " << i << " but row group metadata has only " << metadata()->num_columns() << " columns"; throw ParquetException(ss.str()); } return contents_->GetColumnPageReader(i); } // Returns the rowgroup metadata const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); } /// Compute the section of the file that should be read for the given /// row group and column chunk. ::arrow20::io::ReadRange ComputeColumnChunkRange(FileMetaData* file_metadata, int64_t source_size, int row_group_index, int column_index) { std::unique_ptr row_group_metadata = file_metadata->RowGroup(row_group_index); std::unique_ptr column_metadata = row_group_metadata->ColumnChunk(column_index); int64_t col_start = column_metadata->data_page_offset(); if (column_metadata->has_dictionary_page() && column_metadata->dictionary_page_offset() > 0 && col_start > column_metadata->dictionary_page_offset()) { col_start = column_metadata->dictionary_page_offset(); } int64_t col_length = column_metadata->total_compressed_size(); int64_t col_end; if (col_start < 0 || col_length < 0) { throw ParquetException("Invalid column metadata (corrupt file?)"); } if (AddWithOverflow(col_start, col_length, &col_end) || col_end > source_size) { throw ParquetException("Invalid column metadata (corrupt file?)"); } // PARQUET-816 workaround for old files created by older parquet-mr const ApplicationVersion& version = file_metadata->writer_version(); if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) { // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the // dictionary page header size in total_compressed_size and total_uncompressed_size // (see IMPALA-694). We add padding to compensate. int64_t bytes_remaining = source_size - col_end; int64_t padding = std::min(kMaxDictHeaderSize, bytes_remaining); col_length += padding; } return {col_start, col_length}; } // RowGroupReader::Contents implementation for the Parquet file specification class SerializedRowGroup : public RowGroupReader::Contents { public: SerializedRowGroup(std::shared_ptr source, std::shared_ptr<::arrow20::io::internal::ReadRangeCache> cached_source, int64_t source_size, FileMetaData* file_metadata, int row_group_number, ReaderProperties props, std::shared_ptr prebuffered_column_chunks_bitmap) : source_(std::move(source)), cached_source_(std::move(cached_source)), source_size_(source_size), file_metadata_(file_metadata), properties_(std::move(props)), row_group_ordinal_(row_group_number), prebuffered_column_chunks_bitmap_(std::move(prebuffered_column_chunks_bitmap)) { row_group_metadata_ = file_metadata->RowGroup(row_group_number); } const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); } const ReaderProperties* properties() const override { return &properties_; } std::unique_ptr GetColumnPageReader(int i) override { // Read column chunk from the file auto col = row_group_metadata_->ColumnChunk(i); ::arrow20::io::ReadRange col_range = ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i); std::shared_ptr stream; if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr && ::arrow20::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) { // PARQUET-1698: if read coalescing is enabled, read from pre-buffered // segments. PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); stream = std::make_shared<::arrow20::io::BufferReader>(buffer); } else { stream = properties_.GetStream(source_, col_range.offset, col_range.length); } std::unique_ptr crypto_metadata = col->crypto_metadata(); // Prior to Arrow 3.0.0, is_compressed was always set to false in column headers, // even if compression was used. See ARROW-17100. bool always_compressed = file_metadata_->writer_version().VersionLt( ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION()); // Column is encrypted only if crypto_metadata exists. if (!crypto_metadata) { return PageReader::Open(stream, col->num_values(), col->compression(), properties_, always_compressed); } // The column is encrypted auto* file_decryptor = file_metadata_->file_decryptor().get(); auto meta_decryptor_factory = InternalFileDecryptor::GetColumnMetaDecryptorFactory( file_decryptor, crypto_metadata.get()); auto data_decryptor_factory = InternalFileDecryptor::GetColumnDataDecryptorFactory( file_decryptor, crypto_metadata.get()); constexpr auto kEncryptedOrdinalLimit = 32767; if (ARROW_PREDICT_FALSE(row_group_ordinal_ > kEncryptedOrdinalLimit)) { throw ParquetException("Encrypted files cannot contain more than 32767 row groups"); } if (ARROW_PREDICT_FALSE(i > kEncryptedOrdinalLimit)) { throw ParquetException("Encrypted files cannot contain more than 32767 columns"); } CryptoContext ctx{col->has_dictionary_page(), static_cast(row_group_ordinal_), static_cast(i), std::move(meta_decryptor_factory), std::move(data_decryptor_factory)}; return PageReader::Open(stream, col->num_values(), col->compression(), properties_, always_compressed, &ctx); } private: std::shared_ptr source_; // Will be nullptr if PreBuffer() is not called. std::shared_ptr<::arrow20::io::internal::ReadRangeCache> cached_source_; int64_t source_size_; FileMetaData* file_metadata_; std::unique_ptr row_group_metadata_; ReaderProperties properties_; int row_group_ordinal_; const std::shared_ptr prebuffered_column_chunks_bitmap_; }; // ---------------------------------------------------------------------- // SerializedFile: An implementation of ParquetFileReader::Contents that deals // with the Parquet file structure, Thrift deserialization, and other internal // matters // This class takes ownership of the provided data source class SerializedFile : public ParquetFileReader::Contents { public: SerializedFile(std::shared_ptr source, const ReaderProperties& props = default_reader_properties()) : source_(std::move(source)), properties_(props) { PARQUET_ASSIGN_OR_THROW(source_size_, source_->GetSize()); } ~SerializedFile() override { try { Close(); } catch (...) { } } void Close() override {} std::shared_ptr GetRowGroup(int i) override { std::shared_ptr prebuffered_column_chunks_bitmap; // Avoid updating the bitmap as this function can be called concurrently. The bitmap // can only be updated within Prebuffer(). auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i); if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) { prebuffered_column_chunks_bitmap = prebuffered_column_chunks_iter->second; } std::unique_ptr contents = std::make_unique( source_, cached_source_, source_size_, file_metadata_.get(), i, properties_, std::move(prebuffered_column_chunks_bitmap)); return std::make_shared(std::move(contents)); } std::shared_ptr metadata() const override { return file_metadata_; } std::shared_ptr GetPageIndexReader() override { if (!file_metadata_) { // Usually this won't happen if user calls one of the static Open() functions // to create a ParquetFileReader instance. But if user calls the constructor // directly and calls GetPageIndexReader() before Open() then this could happen. throw ParquetException( "Cannot call GetPageIndexReader() due to missing file metadata. Did you " "forget to call ParquetFileReader::Open() first?"); } if (!page_index_reader_) { page_index_reader_ = PageIndexReader::Make(source_.get(), file_metadata_, properties_, file_metadata_->file_decryptor().get()); } return page_index_reader_; } BloomFilterReader& GetBloomFilterReader() override { if (!file_metadata_) { // Usually this won't happen if user calls one of the static Open() functions // to create a ParquetFileReader instance. But if user calls the constructor // directly and calls GetBloomFilterReader() before Open() then this could happen. throw ParquetException( "Cannot call GetBloomFilterReader() due to missing file metadata. Did you " "forget to call ParquetFileReader::Open() first?"); } if (!bloom_filter_reader_) { bloom_filter_reader_ = BloomFilterReader::Make(source_, file_metadata_, properties_, file_metadata_->file_decryptor()); if (bloom_filter_reader_ == nullptr) { throw ParquetException("Cannot create BloomFilterReader"); } } return *bloom_filter_reader_; } void set_metadata(std::shared_ptr metadata) { file_metadata_ = std::move(metadata); } void PreBuffer(const std::vector& row_groups, const std::vector& column_indices, const ::arrow20::io::IOContext& ctx, const ::arrow20::io::CacheOptions& options) { cached_source_ = std::make_shared<::arrow20::io::internal::ReadRangeCache>(source_, ctx, options); std::vector<::arrow20::io::ReadRange> ranges; prebuffered_column_chunks_.clear(); int num_cols = file_metadata_->num_columns(); // a bitmap for buffered columns. std::shared_ptr buffer_columns; if (!row_groups.empty()) { PARQUET_THROW_NOT_OK(AllocateEmptyBitmap(num_cols, properties_.memory_pool()) .Value(&buffer_columns)); for (int col : column_indices) { ::arrow20::bit_util::SetBit(buffer_columns->mutable_data(), col); } } for (int row : row_groups) { prebuffered_column_chunks_[row] = buffer_columns; for (int col : column_indices) { ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); } } PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges)); } ::arrow20::Result> GetReadRanges( const std::vector& row_groups, const std::vector& column_indices, int64_t hole_size_limit, int64_t range_size_limit) { std::vector<::arrow20::io::ReadRange> ranges; for (int row_group : row_groups) { for (int col : column_indices) { ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row_group, col)); } } return ::arrow20::io::internal::CoalesceReadRanges(std::move(ranges), hole_size_limit, range_size_limit); } ::arrow20::Future<> WhenBuffered(const std::vector& row_groups, const std::vector& column_indices) const { if (!cached_source_) { return ::arrow20::Status::Invalid("Must call PreBuffer before WhenBuffered"); } std::vector<::arrow20::io::ReadRange> ranges; for (int row : row_groups) { for (int col : column_indices) { ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); } } return cached_source_->WaitFor(ranges); } // Metadata/footer parsing. Divided up to separate sync/async paths, and to use // exceptions for error handling (with the async path converting to Future/Status). void ParseMetaData() { int64_t footer_read_size = GetFooterReadSize(); PARQUET_ASSIGN_OR_THROW( auto footer_buffer, source_->ReadAt(source_size_ - footer_read_size, footer_read_size)); uint32_t metadata_len = ParseFooterLength(footer_buffer, footer_read_size); int64_t metadata_start = source_size_ - kFooterSize - metadata_len; std::shared_ptr<::arrow20::Buffer> metadata_buffer; if (footer_read_size >= (metadata_len + kFooterSize)) { metadata_buffer = SliceBuffer( footer_buffer, footer_read_size - metadata_len - kFooterSize, metadata_len); } else { PARQUET_ASSIGN_OR_THROW(metadata_buffer, source_->ReadAt(metadata_start, metadata_len)); } // Parse the footer depending on encryption type const bool is_encrypted_footer = memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0; std::shared_ptr file_decryptor; if (is_encrypted_footer) { // Encrypted file with Encrypted footer. const std::pair read_size = ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len, &file_decryptor); // Read the actual footer metadata_start = read_size.first; metadata_len = read_size.second; PARQUET_ASSIGN_OR_THROW(metadata_buffer, source_->ReadAt(metadata_start, metadata_len)); // Fall through } const uint32_t read_metadata_len = ParseUnencryptedFileMetadata( metadata_buffer, metadata_len, std::move(file_decryptor)); auto file_decryption_properties = properties_.file_decryption_properties(); if (is_encrypted_footer) { // Nothing else to do here. return; } else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file. if (file_decryption_properties != nullptr) { if (!file_decryption_properties->plaintext_files_allowed()) { throw ParquetException("Applying decryption properties on plaintext file"); } } } else { // Encrypted file with plaintext footer mode. ParseMetaDataOfEncryptedFileWithPlaintextFooter( file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len); } } // Validate the source size and get the initial read size. int64_t GetFooterReadSize() { if (source_size_ == 0) { throw ParquetInvalidOrCorruptedFileException("Parquet file size is 0 bytes"); } else if (source_size_ < kFooterSize) { throw ParquetInvalidOrCorruptedFileException( "Parquet file size is ", source_size_, " bytes, smaller than the minimum file footer (", kFooterSize, " bytes)"); } return std::min(static_cast(source_size_), properties_.footer_read_size()); } // Validate the magic bytes and get the length of the full footer. uint32_t ParseFooterLength(const std::shared_ptr<::arrow20::Buffer>& footer_buffer, const int64_t footer_read_size) { // Check if all bytes are read. Check if last 4 bytes read have the magic bits if (footer_buffer->size() != footer_read_size || (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0 && memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) != 0)) { throw ParquetInvalidOrCorruptedFileException( "Parquet magic bytes not found in footer. Either the file is corrupted or this " "is not a parquet file."); } // Both encrypted/unencrypted footers have the same footer length check. uint32_t metadata_len = ::arrow20::bit_util::FromLittleEndian(::arrow20::util::SafeLoadAs( reinterpret_cast(footer_buffer->data()) + footer_read_size - kFooterSize)); if (metadata_len > source_size_ - kFooterSize) { throw ParquetInvalidOrCorruptedFileException( "Parquet file size is ", source_size_, " bytes, smaller than the size reported by footer's (", metadata_len, "bytes)"); } return metadata_len; } // Does not throw. ::arrow20::Future<> ParseMetaDataAsync() { int64_t footer_read_size; BEGIN_PARQUET_CATCH_EXCEPTIONS footer_read_size = GetFooterReadSize(); END_PARQUET_CATCH_EXCEPTIONS // Assumes this is kept alive externally return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size) .Then([this, footer_read_size](const std::shared_ptr<::arrow20::Buffer>& footer_buffer) -> ::arrow20::Future<> { uint32_t metadata_len; BEGIN_PARQUET_CATCH_EXCEPTIONS metadata_len = ParseFooterLength(footer_buffer, footer_read_size); END_PARQUET_CATCH_EXCEPTIONS int64_t metadata_start = source_size_ - kFooterSize - metadata_len; std::shared_ptr<::arrow20::Buffer> metadata_buffer; if (footer_read_size >= (metadata_len + kFooterSize)) { metadata_buffer = SliceBuffer(footer_buffer, footer_read_size - metadata_len - kFooterSize, metadata_len); return ParseMaybeEncryptedMetaDataAsync(footer_buffer, std::move(metadata_buffer), footer_read_size, metadata_len); } return source_->ReadAsync(metadata_start, metadata_len) .Then([this, footer_buffer, footer_read_size, metadata_len]( const std::shared_ptr<::arrow20::Buffer>& metadata_buffer) { return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer, footer_read_size, metadata_len); }); }); } // Continuation ::arrow20::Future<> ParseMaybeEncryptedMetaDataAsync( std::shared_ptr<::arrow20::Buffer> footer_buffer, std::shared_ptr<::arrow20::Buffer> metadata_buffer, int64_t footer_read_size, uint32_t metadata_len) { // Parse the footer depending on encryption type const bool is_encrypted_footer = memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0; std::shared_ptr file_decryptor; if (is_encrypted_footer) { // Encrypted file with Encrypted footer. std::pair read_size; BEGIN_PARQUET_CATCH_EXCEPTIONS read_size = ParseMetaDataOfEncryptedFileWithEncryptedFooter( metadata_buffer, metadata_len, &file_decryptor); END_PARQUET_CATCH_EXCEPTIONS // Read the actual footer int64_t metadata_start = read_size.first; metadata_len = read_size.second; return source_->ReadAsync(metadata_start, metadata_len) .Then([this, metadata_len, is_encrypted_footer, file_decryptor = std::move(file_decryptor)]( const std::shared_ptr<::arrow20::Buffer>& metadata_buffer) { // Continue and read the file footer return ParseMetaDataFinal(metadata_buffer, metadata_len, is_encrypted_footer, file_decryptor); }); } return ParseMetaDataFinal(std::move(metadata_buffer), metadata_len, is_encrypted_footer, std::move(file_decryptor)); } // Continuation ::arrow20::Status ParseMetaDataFinal( std::shared_ptr<::arrow20::Buffer> metadata_buffer, uint32_t metadata_len, const bool is_encrypted_footer, std::shared_ptr file_decryptor) { BEGIN_PARQUET_CATCH_EXCEPTIONS const uint32_t read_metadata_len = ParseUnencryptedFileMetadata( metadata_buffer, metadata_len, std::move(file_decryptor)); auto file_decryption_properties = properties_.file_decryption_properties(); if (is_encrypted_footer) { // Nothing else to do here. return ::arrow20::Status::OK(); } else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file. if (file_decryption_properties != nullptr) { if (!file_decryption_properties->plaintext_files_allowed()) { throw ParquetException("Applying decryption properties on plaintext file"); } } } else { // Encrypted file with plaintext footer mode. ParseMetaDataOfEncryptedFileWithPlaintextFooter( file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len); } END_PARQUET_CATCH_EXCEPTIONS return ::arrow20::Status::OK(); } private: std::shared_ptr source_; std::shared_ptr<::arrow20::io::internal::ReadRangeCache> cached_source_; int64_t source_size_; std::shared_ptr file_metadata_; ReaderProperties properties_; std::shared_ptr page_index_reader_; std::unique_ptr bloom_filter_reader_; // Maps row group ordinal and prebuffer status of its column chunks in the form of a // bitmap buffer. std::unordered_map> prebuffered_column_chunks_; // \return The true length of the metadata in bytes uint32_t ParseUnencryptedFileMetadata( const std::shared_ptr& footer_buffer, const uint32_t metadata_len, std::shared_ptr file_decryptor); std::string HandleAadPrefix( const std::shared_ptr& file_decryption_properties, const EncryptionAlgorithm& algo); void ParseMetaDataOfEncryptedFileWithPlaintextFooter( const std::shared_ptr& file_decryption_properties, const std::shared_ptr& metadata_buffer, uint32_t metadata_len, uint32_t read_metadata_len); // \return The position and size of the actual footer std::pair ParseMetaDataOfEncryptedFileWithEncryptedFooter( const std::shared_ptr& crypto_metadata_buffer, uint32_t footer_len, std::shared_ptr* file_decryptor); }; uint32_t SerializedFile::ParseUnencryptedFileMetadata( const std::shared_ptr& metadata_buffer, const uint32_t metadata_len, std::shared_ptr file_decryptor) { if (metadata_buffer->size() != metadata_len) { throw ParquetException("Failed reading metadata buffer (requested " + std::to_string(metadata_len) + " bytes but got " + std::to_string(metadata_buffer->size()) + " bytes)"); } uint32_t read_metadata_len = metadata_len; // The encrypted read path falls through to here, so pass in the decryptor file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &read_metadata_len, properties_, std::move(file_decryptor)); return read_metadata_len; } std::pair SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter( const std::shared_ptr<::arrow20::Buffer>& crypto_metadata_buffer, // both metadata & crypto metadata length const uint32_t footer_len, std::shared_ptr* file_decryptor) { // encryption with encrypted footer // Check if the footer_buffer contains the entire metadata if (crypto_metadata_buffer->size() != footer_len) { throw ParquetException("Failed reading encrypted metadata buffer (requested " + std::to_string(footer_len) + " bytes but got " + std::to_string(crypto_metadata_buffer->size()) + " bytes)"); } auto file_decryption_properties = properties_.file_decryption_properties(); if (file_decryption_properties == nullptr) { throw ParquetException( "Could not read encrypted metadata, no decryption found in reader's properties"); } uint32_t crypto_metadata_len = footer_len; std::shared_ptr file_crypto_metadata = FileCryptoMetaData::Make(crypto_metadata_buffer->data(), &crypto_metadata_len); // Handle AAD prefix EncryptionAlgorithm algo = file_crypto_metadata->encryption_algorithm(); std::string file_aad = HandleAadPrefix(file_decryption_properties, algo); *file_decryptor = std::make_shared( file_decryption_properties, file_aad, algo.algorithm, file_crypto_metadata->key_metadata(), properties_.memory_pool()); int64_t metadata_offset = source_size_ - kFooterSize - footer_len + crypto_metadata_len; uint32_t metadata_len = footer_len - crypto_metadata_len; return std::make_pair(metadata_offset, metadata_len); } void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter( const std::shared_ptr& file_decryption_properties, const std::shared_ptr& metadata_buffer, uint32_t metadata_len, uint32_t read_metadata_len) { // Providing decryption properties in plaintext footer mode is not mandatory, for // example when reading by legacy reader. if (file_decryption_properties != nullptr) { EncryptionAlgorithm algo = file_metadata_->encryption_algorithm(); // Handle AAD prefix std::string file_aad = HandleAadPrefix(file_decryption_properties, algo); auto file_decryptor = std::make_shared( file_decryption_properties, file_aad, algo.algorithm, file_metadata_->footer_signing_key_metadata(), properties_.memory_pool()); // set the InternalFileDecryptor in the metadata as well, as it's used // for signature verification and for ColumnChunkMetaData creation. file_metadata_->set_file_decryptor(std::move(file_decryptor)); if (file_decryption_properties->check_plaintext_footer_integrity()) { if (metadata_len - read_metadata_len != (parquet20::encryption::kGcmTagLength + parquet20::encryption::kNonceLength)) { throw ParquetInvalidOrCorruptedFileException( "Failed reading metadata for encryption signature (requested ", parquet20::encryption::kGcmTagLength + parquet20::encryption::kNonceLength, " bytes but have ", metadata_len - read_metadata_len, " bytes)"); } if (!file_metadata_->VerifySignature(metadata_buffer->data() + read_metadata_len)) { throw ParquetInvalidOrCorruptedFileException( "Parquet crypto signature verification failed"); } } } } std::string SerializedFile::HandleAadPrefix( const std::shared_ptr& file_decryption_properties, const EncryptionAlgorithm& algo) { std::string aad_prefix_in_properties = file_decryption_properties->aad_prefix(); std::string aad_prefix = aad_prefix_in_properties; bool file_has_aad_prefix = algo.aad.aad_prefix.empty() ? false : true; std::string aad_prefix_in_file = algo.aad.aad_prefix; if (algo.aad.supply_aad_prefix && aad_prefix_in_properties.empty()) { throw ParquetException( "AAD prefix used for file encryption, " "but not stored in file and not supplied " "in decryption properties"); } if (file_has_aad_prefix) { if (!aad_prefix_in_properties.empty()) { if (aad_prefix_in_properties.compare(aad_prefix_in_file) != 0) { throw ParquetException( "AAD Prefix in file and in properties " "is not the same"); } } aad_prefix = aad_prefix_in_file; std::shared_ptr aad_prefix_verifier = file_decryption_properties->aad_prefix_verifier(); if (aad_prefix_verifier != nullptr) aad_prefix_verifier->Verify(aad_prefix); } else { if (!algo.aad.supply_aad_prefix && !aad_prefix_in_properties.empty()) { throw ParquetException( "AAD Prefix set in decryption properties, but was not used " "for file encryption"); } std::shared_ptr aad_prefix_verifier = file_decryption_properties->aad_prefix_verifier(); if (aad_prefix_verifier != nullptr) { throw ParquetException( "AAD Prefix Verifier is set, but AAD Prefix not found in file"); } } return aad_prefix + algo.aad.aad_file_unique; } // ---------------------------------------------------------------------- // ParquetFileReader public API ParquetFileReader::ParquetFileReader() {} ParquetFileReader::~ParquetFileReader() { try { Close(); } catch (...) { } } // Open the file. If no metadata is passed, it is parsed from the footer of // the file std::unique_ptr ParquetFileReader::Contents::Open( std::shared_ptr source, const ReaderProperties& props, std::shared_ptr metadata) { std::unique_ptr result( new SerializedFile(std::move(source), props)); // Access private methods here, but otherwise unavailable SerializedFile* file = static_cast(result.get()); if (metadata == nullptr) { // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor file->ParseMetaData(); } else { file->set_metadata(std::move(metadata)); } return result; } ::arrow20::Future> ParquetFileReader::Contents::OpenAsync(std::shared_ptr source, const ReaderProperties& props, std::shared_ptr metadata) { BEGIN_PARQUET_CATCH_EXCEPTIONS std::unique_ptr result( new SerializedFile(std::move(source), props)); SerializedFile* file = static_cast(result.get()); if (metadata == nullptr) { // TODO(ARROW-12259): workaround since we have Future<(move-only type)> struct { ::arrow20::Result> operator()() { return std::move(result); } std::unique_ptr result; } Continuation; Continuation.result = std::move(result); return file->ParseMetaDataAsync().Then(std::move(Continuation)); } else { file->set_metadata(std::move(metadata)); return ::arrow20::Future>::MakeFinished( std::move(result)); } END_PARQUET_CATCH_EXCEPTIONS } std::unique_ptr ParquetFileReader::Open( std::shared_ptr<::arrow20::io::RandomAccessFile> source, const ReaderProperties& props, std::shared_ptr metadata) { auto contents = SerializedFile::Open(std::move(source), props, std::move(metadata)); std::unique_ptr result = std::make_unique(); result->Open(std::move(contents)); return result; } std::unique_ptr ParquetFileReader::OpenFile( const std::string& path, bool memory_map, const ReaderProperties& props, std::shared_ptr metadata) { std::shared_ptr<::arrow20::io::RandomAccessFile> source; if (memory_map) { PARQUET_ASSIGN_OR_THROW( source, ::arrow20::io::MemoryMappedFile::Open(path, ::arrow20::io::FileMode::READ)); } else { PARQUET_ASSIGN_OR_THROW(source, ::arrow20::io::ReadableFile::Open(path, props.memory_pool())); } return Open(std::move(source), props, std::move(metadata)); } ::arrow20::Future> ParquetFileReader::OpenAsync( std::shared_ptr<::arrow20::io::RandomAccessFile> source, const ReaderProperties& props, std::shared_ptr metadata) { BEGIN_PARQUET_CATCH_EXCEPTIONS auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata)); // TODO(ARROW-12259): workaround since we have Future<(move-only type)> auto completed = ::arrow20::Future>::Make(); fut.AddCallback([fut, completed]( const ::arrow20::Result>& contents) mutable { if (!contents.ok()) { completed.MarkFinished(contents.status()); return; } std::unique_ptr result = std::make_unique(); result->Open(fut.MoveResult().MoveValueUnsafe()); completed.MarkFinished(std::move(result)); }); return completed; END_PARQUET_CATCH_EXCEPTIONS } void ParquetFileReader::Open(std::unique_ptr contents) { contents_ = std::move(contents); } void ParquetFileReader::Close() { if (contents_) { contents_->Close(); } } std::shared_ptr ParquetFileReader::metadata() const { return contents_->metadata(); } std::shared_ptr ParquetFileReader::GetPageIndexReader() { return contents_->GetPageIndexReader(); } BloomFilterReader& ParquetFileReader::GetBloomFilterReader() { return contents_->GetBloomFilterReader(); } std::shared_ptr ParquetFileReader::RowGroup(int i) { if (i >= metadata()->num_row_groups()) { std::stringstream ss; ss << "Trying to read row group " << i << " but file only has " << metadata()->num_row_groups() << " row groups"; throw ParquetException(ss.str()); } return contents_->GetRowGroup(i); } void ParquetFileReader::PreBuffer(const std::vector& row_groups, const std::vector& column_indices, const ::arrow20::io::IOContext& ctx, const ::arrow20::io::CacheOptions& options) { // Access private methods here SerializedFile* file = ::arrow20::internal::checked_cast(contents_.get()); file->PreBuffer(row_groups, column_indices, ctx, options); } ::arrow20::Result> ParquetFileReader::GetReadRanges( const std::vector& row_groups, const std::vector& column_indices, int64_t hole_size_limit, int64_t range_size_limit) { // Access private methods here SerializedFile* file = ::arrow20::internal::checked_cast(contents_.get()); return file->GetReadRanges(row_groups, column_indices, hole_size_limit, range_size_limit); } ::arrow20::Future<> ParquetFileReader::WhenBuffered( const std::vector& row_groups, const std::vector& column_indices) const { // Access private methods here SerializedFile* file = ::arrow20::internal::checked_cast(contents_.get()); return file->WhenBuffered(row_groups, column_indices); } // ---------------------------------------------------------------------- // File metadata helpers std::shared_ptr ReadMetaData( const std::shared_ptr<::arrow20::io::RandomAccessFile>& source) { return ParquetFileReader::Open(source)->metadata(); } // ---------------------------------------------------------------------- // File scanner for performance testing int64_t ScanFileContents(std::vector columns, const int32_t column_batch_size, ParquetFileReader* reader) { std::vector rep_levels(column_batch_size); std::vector def_levels(column_batch_size); int num_columns = static_cast(columns.size()); // columns are not specified explicitly. Add all columns if (columns.size() == 0) { num_columns = reader->metadata()->num_columns(); columns.resize(num_columns); for (int i = 0; i < num_columns; i++) { columns[i] = i; } } if (num_columns == 0) { // If we still have no columns(none in file), return early. The remainder of function // expects there to be at least one column. return 0; } std::vector total_rows(num_columns, 0); for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) { auto group_reader = reader->RowGroup(r); int col = 0; for (auto i : columns) { std::shared_ptr col_reader = group_reader->Column(i); size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type()); std::vector values(column_batch_size * value_byte_size); int64_t values_read = 0; while (col_reader->HasNext()) { int64_t levels_read = ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(), values.data(), &values_read, col_reader.get()); if (col_reader->descr()->max_repetition_level() > 0) { for (size_t i = 0; i < static_cast(levels_read); i++) { if (rep_levels[i] == 0) { total_rows[col]++; } } } else { total_rows[col] += levels_read; } } col++; } } for (int i = 1; i < num_columns; ++i) { if (total_rows[0] != total_rows[i]) { throw ParquetException("Parquet error: Total rows among columns do not match"); } } return total_rows[0]; } } // namespace parquet20