// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// NOTE(review): in this view the contents of angle-bracket lists (system header
// names, template parameter/argument lists, cast target types) are not visible.
// The code tokens below are reproduced exactly as seen; restore the missing
// pieces from the original file before building.

#include "contrib/libs/apache/arrow_next/cpp/src/parquet/metadata.h"

// NOTE(review): the targets of the next ten #include directives are not visible
// here (presumably angle-bracket standard headers) -- TODO confirm.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/memory.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/key_value_metadata.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/pcg_random.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/encryption/encryption_internal.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/encryption/internal_file_decryptor.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/exception.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/schema.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/schema_internal.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/size_statistics.h"
#include "contrib/libs/apache/arrow_next/cpp/src/parquet/thrift_internal.h"

namespace parquet20 {

// Each accessor below returns a reference to a function-local static
// ApplicationVersion built from an application name and three version numbers.

// Reference version: ("parquet-mr", 1, 8, 0).
const ApplicationVersion& ApplicationVersion::PARQUET_251_FIXED_VERSION() {
  static ApplicationVersion version("parquet-mr", 1, 8, 0);
  return version;
}

// Reference version: ("parquet-mr", 1, 2, 9).
const ApplicationVersion& ApplicationVersion::PARQUET_816_FIXED_VERSION() {
  static ApplicationVersion version("parquet-mr", 1, 2, 9);
  return version;
}

// Reference version: ("parquet-cpp", 1, 3, 0).
const ApplicationVersion& ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION() {
  static ApplicationVersion version("parquet-cpp", 1, 3, 0);
  return version;
}

// Reference version: ("parquet-mr", 1, 10, 0).
const ApplicationVersion& ApplicationVersion::PARQUET_MR_FIXED_STATS_VERSION() {
  static ApplicationVersion version("parquet-mr", 1, 10, 0);
  return version;
}

// Reference version: ("parquet-cpp", 2, 0, 0). See the comment inside for why
// this particular version number does not correspond to a real release.
const ApplicationVersion& ApplicationVersion::PARQUET_CPP_10353_FIXED_VERSION() {
  // parquet-cpp versions released prior to Arrow 3.0 would write DataPageV2 pages
  // with is_compressed==0 but still write compressed data. (See: ARROW-10353).
  // Parquet 1.5.1 had this problem, and after that we switched to the
  // application name "parquet-cpp-arrow", so this version is fake.
  static ApplicationVersion version("parquet-cpp", 2, 0, 0);
  return version;
}

// Maps a ParquetVersion enumerator to its textual form ("1.0", "2.4", "2.6").
// Any value not handled by the switch yields "UNKNOWN".
std::string ParquetVersionToString(ParquetVersion::type ver) {
  switch (ver) {
    case ParquetVersion::PARQUET_1_0:
      return "1.0";
      // NOTE(review): no matching "unsuppress" macro and no deprecated case label
      // are visible after this macro in this view -- confirm against the original
      // file that the suppression is intentionally left open or was lost.
      ARROW_SUPPRESS_DEPRECATION_WARNING
    case ParquetVersion::PARQUET_2_4:
      return "2.4";
    case ParquetVersion::PARQUET_2_6:
      return "2.6";
  }

  // This should be unreachable
  return "UNKNOWN";
}

// Builds a statistics object for one column chunk from its Thrift metadata.
//
// When the column's order is TYPE_DEFINED_ORDER the `min_value` / `max_value`
// fields are used; otherwise the `min` / `max` fields are used. In both cases
// the value count passed on is `num_values - null_count`, and the three boolean
// arguments are taken from the corresponding `__isset` flags.
//
// NOTE(review): the template parameter list and the template arguments of
// MakeStatistics / std::shared_ptr are not visible in this view.
template
static std::shared_ptr MakeTypedColumnStats(
    const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
  // If ColumnOrder is defined, return max_value and min_value
  if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) {
    return MakeStatistics(
        descr, metadata.statistics.min_value, metadata.statistics.max_value,
        metadata.num_values - metadata.statistics.null_count,
        metadata.statistics.null_count, metadata.statistics.distinct_count,
        // min/max are only reported as present when BOTH fields are set.
        metadata.statistics.__isset.max_value && metadata.statistics.__isset.min_value,
        metadata.statistics.__isset.null_count,
        metadata.statistics.__isset.distinct_count);
  }
  // Default behavior
  return MakeStatistics(
      descr, metadata.statistics.min, metadata.statistics.max,
      metadata.num_values - metadata.statistics.null_count,
      metadata.statistics.null_count, metadata.statistics.distinct_count,
      // min/max are only reported as present when BOTH fields are set.
      metadata.statistics.__isset.max && metadata.statistics.__isset.min,
      metadata.statistics.__isset.null_count,
      metadata.statistics.__isset.distinct_count);
}

// Dispatches on the physical type stored in `meta_data.type` and forwards to
// MakeTypedColumnStats for every concrete type.
//
// Throws ParquetException when the type is Type::UNDEFINED or when the value
// matches none of the case labels.
//
// NOTE(review): the per-case template arguments of MakeTypedColumnStats and the
// static_cast target are not visible in this view.
std::shared_ptr MakeColumnStats(const format::ColumnMetaData& meta_data,
                                const ColumnDescriptor* descr) {
  switch (static_cast(meta_data.type)) {
    case Type::BOOLEAN:
      return MakeTypedColumnStats(meta_data, descr);
    case Type::INT32:
      return MakeTypedColumnStats(meta_data, descr);
    case Type::INT64:
      return MakeTypedColumnStats(meta_data, descr);
    case Type::INT96:
      return MakeTypedColumnStats(meta_data, descr);
    case Type::DOUBLE:
      return MakeTypedColumnStats(meta_data, descr);
    case Type::FLOAT:
      return MakeTypedColumnStats(meta_data, descr);
    case Type::BYTE_ARRAY:
      return MakeTypedColumnStats(meta_data, descr);
    case Type::FIXED_LEN_BYTE_ARRAY:
      return MakeTypedColumnStats(meta_data, descr);
    case Type::UNDEFINED:
      break;
  }
  throw ParquetException("Can't decode page statistics for selected column type");
}

// Get KeyValueMetadata from parquet Thrift RowGroup or ColumnChunk metadata.
//
// Returns nullptr if the metadata is not set.
// NOTE(review): in this view the contents of angle-bracket lists (template
// parameter/argument lists, cast target types) are not visible. The code tokens
// below are reproduced exactly as seen; restore the missing pieces from the
// original file before building.

// Copies the key/value pairs of a Thrift metadata object into a freshly
// allocated metadata object.
//
// `source` must expose `__isset.key_value_metadata` and an iterable
// `key_value_metadata` whose elements have `key` and `value` members.
// The result stays nullptr when `__isset.key_value_metadata` is false.
template
std::shared_ptr FromThriftKeyValueMetadata(const Metadata& source) {
  std::shared_ptr metadata = nullptr;
  if (source.__isset.key_value_metadata) {
    std::vector keys;
    std::vector values;
    // Both vectors receive exactly one entry per source pair.
    keys.reserve(source.key_value_metadata.size());
    values.reserve(source.key_value_metadata.size());
    for (const auto& it : source.key_value_metadata) {
      keys.push_back(it.key);
      values.push_back(it.value);
    }
    metadata = std::make_shared(std::move(keys), std::move(values));
  }
  return metadata;
}

// Inverse direction: converts every (key, value) entry of `source` into a
// format::KeyValue and stores the resulting list on `*metadata` through
// `__set_key_value_metadata`. The setter is called even when `source` is empty.
template
void ToThriftKeyValueMetadata(const KeyValueMetadata& source, Metadata* metadata) {
  std::vector key_value_metadata;
  key_value_metadata.reserve(static_cast(source.size()));
  for (int64_t i = 0; i < source.size(); ++i) {
    format::KeyValue kv_pair;
    kv_pair.__set_key(source.key(i));
    kv_pair.__set_value(source.value(i));
    key_value_metadata.emplace_back(std::move(kv_pair));
  }
  metadata->__set_key_value_metadata(std::move(key_value_metadata));
}

// MetaData Accessor

// ColumnCryptoMetaData
//
// Read-only view over a Thrift format::ColumnCryptoMetaData. The pointer is
// stored, not copied, so the pointee must outlive this object.
class ColumnCryptoMetaData::ColumnCryptoMetaDataImpl {
 public:
  explicit ColumnCryptoMetaDataImpl(const format::ColumnCryptoMetaData* crypto_metadata)
      : crypto_metadata_(crypto_metadata) {}

  // True when the ENCRYPTION_WITH_FOOTER_KEY member is marked as set.
  bool encrypted_with_footer_key() const {
    return crypto_metadata_->__isset.ENCRYPTION_WITH_FOOTER_KEY;
  }
  // True when the ENCRYPTION_WITH_COLUMN_KEY member is marked as set.
  bool encrypted_with_column_key() const {
    return crypto_metadata_->__isset.ENCRYPTION_WITH_COLUMN_KEY;
  }
  // Builds a new object from ENCRYPTION_WITH_COLUMN_KEY.path_in_schema.
  // The field is read without checking the corresponding __isset flag.
  std::shared_ptr path_in_schema() const {
    return std::make_shared(
        crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.path_in_schema);
  }
  // Returns a reference into the wrapped Thrift struct; it is only valid while
  // that struct is alive. The field is read without checking __isset.
  const std::string& key_metadata() const {
    return crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.key_metadata;
  }

 private:
  // Non-owning.
  const format::ColumnCryptoMetaData* crypto_metadata_;
};

// Factory for ColumnCryptoMetaData. Uses `new` directly rather than a
// make_unique-style helper (presumably because the constructor is not publicly
// accessible -- TODO confirm in the header).
std::unique_ptr ColumnCryptoMetaData::Make(
    const uint8_t* metadata) {
  return std::unique_ptr(new ColumnCryptoMetaData(metadata));
}

// `metadata` is reinterpreted as a pointer to the Thrift struct and handed to
// the impl, which keeps the pointer without copying the pointee.
// NOTE(review): the reinterpret_cast target is not visible here; the impl
// constructor takes `const format::ColumnCryptoMetaData*`.
ColumnCryptoMetaData::ColumnCryptoMetaData(const uint8_t* metadata)
    : impl_(std::make_unique(
          reinterpret_cast(metadata))) {}

ColumnCryptoMetaData::~ColumnCryptoMetaData() = default;
// NOTE(review): in this view the contents of angle-bracket lists (template
// parameter/argument lists, cast target types) are not visible. The code tokens
// below are reproduced exactly as seen; restore the missing pieces from the
// original file before building.

// Public ColumnCryptoMetaData accessors: thin forwarders to the impl object.
std::shared_ptr ColumnCryptoMetaData::path_in_schema() const {
  return impl_->path_in_schema();
}
bool ColumnCryptoMetaData::encrypted_with_footer_key() const {
  return impl_->encrypted_with_footer_key();
}
const std::string& ColumnCryptoMetaData::key_metadata() const {
  return impl_->key_metadata();
}

// ColumnChunk metadata
//
// Read-only view over one Thrift format::ColumnChunk. The chunk and the column
// descriptor are held by raw pointer (not copied), so both must outlive this
// object. If the chunk carries column-key-encrypted metadata, the constructor
// decrypts it into `decrypted_metadata_` and all accessors read from that copy.
class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
 public:
  // Throws ParquetException when the chunk's metadata is encrypted with a
  // column key but `file_decryptor` is null or has no properties.
  explicit ColumnChunkMetaDataImpl(
      const format::ColumnChunk* column, const ColumnDescriptor* descr,
      int16_t row_group_ordinal, int16_t column_ordinal,
      const ReaderProperties& properties, const ApplicationVersion* writer_version,
      const std::shared_ptr& file_decryptor)
      : column_(column),
        descr_(descr),
        properties_(properties),
        writer_version_(writer_version) {
    // Default: read the plaintext metadata embedded in the chunk.
    column_metadata_ = &column->meta_data;
    if (column->__isset.crypto_metadata) {  // column metadata is encrypted
      const format::ColumnCryptoMetaData& ccmd = column->crypto_metadata;

      // Only the column-key case needs work here; with any other crypto
      // metadata variant the plaintext `meta_data` pointer set above is kept.
      if (ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY) {
        if (file_decryptor != nullptr && file_decryptor->properties() != nullptr) {
          // should decrypt metadata
          std::shared_ptr path = std::make_shared(
              ccmd.ENCRYPTION_WITH_COLUMN_KEY.path_in_schema);
          const std::string& key_metadata = ccmd.ENCRYPTION_WITH_COLUMN_KEY.key_metadata;

          // The AAD is derived from the file AAD plus this module's position
          // (row group / column ordinals); -1 is passed as the page ordinal.
          std::string aad_column_metadata = encryption::CreateModuleAad(
              file_decryptor->file_aad(), encryption::kColumnMetaData,
              row_group_ordinal, column_ordinal, /*page_ordinal=*/static_cast(-1));
          auto decryptor = file_decryptor->GetColumnMetaDecryptor(
              path->ToDotString(), key_metadata, aad_column_metadata);
          auto len = static_cast(column->encrypted_column_metadata.size());
          ThriftDeserializer deserializer(properties_);
          deserializer.DeserializeMessage(
              reinterpret_cast(column->encrypted_column_metadata.c_str()), &len,
              &decrypted_metadata_, decryptor.get());
          // From here on every accessor reads the decrypted copy.
          column_metadata_ = &decrypted_metadata_;
        } else {
          throw ParquetException(
              "Cannot decrypt ColumnMetadata."
              " FileDecryption is not setup correctly");
        }
      }
    }
    // Cache converted encodings / encoding stats so the accessors can return
    // references to member vectors.
    for (const auto& encoding : column_metadata_->encodings) {
      encodings_.push_back(LoadEnumSafe(&encoding));
    }
    for (const auto& encoding_stats : column_metadata_->encoding_stats) {
      encoding_stats_.push_back({LoadEnumSafe(&encoding_stats.page_type),
                                 LoadEnumSafe(&encoding_stats.encoding),
                                 encoding_stats.count});
    }
    if (column_metadata_->__isset.size_statistics) {
      size_statistics_ =
          std::make_shared(FromThrift(column_metadata_->size_statistics));
      size_statistics_->Validate(descr_);
    }
    // Statistics are built lazily on first use (see is_stats_set()).
    possible_stats_ = nullptr;
    InitKeyValueMetadata();
  }

  // Compares the (possibly decrypted) column metadata structs by value.
  bool Equals(const ColumnChunkMetaDataImpl& other) const {
    return *column_metadata_ == *other.column_metadata_;
  }

  // column chunk
  inline int64_t file_offset() const { return column_->file_offset; }
  inline const std::string& file_path() const { return column_->file_path; }

  inline Type::type type() const { return LoadEnumSafe(&column_metadata_->type); }

  inline int64_t num_values() const { return column_metadata_->num_values; }

  // Builds a new object from the metadata's path_in_schema on every call.
  // NOTE(review): unlike the other accessors this one is not const-qualified.
  std::shared_ptr path_in_schema() {
    return std::make_shared(column_metadata_->path_in_schema);
  }

  // Check if statistics are set and are valid
  // 1) Must be set in the metadata
  // 2) Statistics must not be corrupted
  //
  // Side effect: on first successful pass of the guard below this populates the
  // `mutable` cache `possible_stats_`. NOTE(review): no synchronization is
  // visible around that write -- confirm callers do not race on it.
  inline bool is_stats_set() const {
    DCHECK(writer_version_ != nullptr);
    // If the column statistics don't exist or column sort order is unknown
    // we cannot use the column stats
    if (!column_metadata_->__isset.statistics ||
        descr_->sort_order() == SortOrder::UNKNOWN) {
      return false;
    }
    if (possible_stats_ == nullptr) {
      possible_stats_ = MakeColumnStats(*column_metadata_, descr_);
    }
    // The final verdict is delegated to the writer version, which is given the
    // re-encoded statistics.
    EncodedStatistics encodedStatistics = possible_stats_->Encode();
    return writer_version_->HasCorrectStatistics(type(), encodedStatistics,
                                                 descr_->sort_order());
  }

  // Returns the cached statistics, or nullptr when is_stats_set() is false.
  inline std::shared_ptr statistics() const {
    return is_stats_set() ? possible_stats_ : nullptr;
  }

  // May be nullptr: only assigned in the constructor when the metadata has
  // size_statistics set.
  inline std::shared_ptr size_statistics() const { return size_statistics_; }

  inline Compression::type compression() const {
    return LoadEnumSafe(&column_metadata_->codec);
  }

  const std::vector& encodings() const { return encodings_; }

  const std::vector& encoding_stats() const { return encoding_stats_; }

  // std::nullopt when the field is not set in the metadata.
  inline std::optional bloom_filter_offset() const {
    if (column_metadata_->__isset.bloom_filter_offset) {
      return column_metadata_->bloom_filter_offset;
    }
    return std::nullopt;
  }

  // std::nullopt when the field is not set in the metadata.
  inline std::optional bloom_filter_length() const {
    if (column_metadata_->__isset.bloom_filter_length) {
      return column_metadata_->bloom_filter_length;
    }
    return std::nullopt;
  }

  inline bool has_dictionary_page() const {
    return column_metadata_->__isset.dictionary_page_offset;
  }

  // Returns the raw field; check has_dictionary_page() to know if it was set.
  inline int64_t dictionary_page_offset() const {
    return column_metadata_->dictionary_page_offset;
  }

  inline int64_t data_page_offset() const { return column_metadata_->data_page_offset; }

  inline bool has_index_page() const {
    return column_metadata_->__isset.index_page_offset;
  }

  // Returns the raw field; check has_index_page() to know if it was set.
  inline int64_t index_page_offset() const { return column_metadata_->index_page_offset; }

  inline int64_t total_compressed_size() const {
    return column_metadata_->total_compressed_size;
  }

  inline int64_t total_uncompressed_size() const {
    return column_metadata_->total_uncompressed_size;
  }

  // Wraps the chunk's crypto metadata, or returns nullptr when none is set.
  // The wrapper points into `*column_`; it does not copy the Thrift struct.
  inline std::unique_ptr crypto_metadata() const {
    if (column_->__isset.crypto_metadata) {
      return ColumnCryptoMetaData::Make(
          reinterpret_cast(&column_->crypto_metadata));
    } else {
      return nullptr;
    }
  }

  // Present only when BOTH the offset and the length fields are set.
  std::optional GetColumnIndexLocation() const {
    if (column_->__isset.column_index_offset && column_->__isset.column_index_length) {
      return IndexLocation{column_->column_index_offset, column_->column_index_length};
    }
    return std::nullopt;
  }

  // Present only when BOTH the offset and the length fields are set.
  std::optional GetOffsetIndexLocation() const {
    if (column_->__isset.offset_index_offset && column_->__isset.offset_index_length) {
      return IndexLocation{column_->offset_index_offset, column_->offset_index_length};
    }
    return std::nullopt;
  }

  // nullptr when the column metadata has no key_value_metadata set.
  const std::shared_ptr& key_value_metadata() const { return key_value_metadata_; }

 private:
  void InitKeyValueMetadata() {
    key_value_metadata_ = FromThriftKeyValueMetadata(*column_metadata_);
  }

  // Lazily filled by is_stats_set(); hence `mutable`.
  mutable std::shared_ptr possible_stats_;
  std::vector encodings_;
  std::vector encoding_stats_;
  // Non-owning.
  const format::ColumnChunk* column_;
  // Points either at column_->meta_data or at decrypted_metadata_.
  const format::ColumnMetaData* column_metadata_;
  format::ColumnMetaData decrypted_metadata_;
  // Non-owning.
  const ColumnDescriptor* descr_;
  const ReaderProperties properties_;
  // Non-owning.
  const ApplicationVersion* writer_version_;
  std::shared_ptr key_value_metadata_;
  std::shared_ptr size_statistics_;
};

// Factory for ColumnChunkMetaData. Note that the parameter order here differs
// from the constructor's: the ordinals come last in Make() but directly after
// `descr` in the constructor.
std::unique_ptr ColumnChunkMetaData::Make(
    const void* metadata, const ColumnDescriptor* descr,
    const ReaderProperties& properties, const ApplicationVersion* writer_version,
    int16_t row_group_ordinal, int16_t column_ordinal,
    std::shared_ptr file_decryptor) {
  return std::unique_ptr(
      new ColumnChunkMetaData(metadata, descr, row_group_ordinal, column_ordinal,
                              properties, writer_version, std::move(file_decryptor)));
}

// `metadata` is reinterpreted and handed to the impl, whose constructor takes a
// `const format::ColumnChunk*` (the cast target itself is not visible here).
ColumnChunkMetaData::ColumnChunkMetaData(
    const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal,
    int16_t column_ordinal, const ReaderProperties& properties,
    const ApplicationVersion* writer_version,
    std::shared_ptr file_decryptor)
    : impl_{new ColumnChunkMetaDataImpl(
          reinterpret_cast(metadata), descr, row_group_ordinal,
          column_ordinal, properties, writer_version, std::move(file_decryptor))} {}

ColumnChunkMetaData::~ColumnChunkMetaData() = default;

// column chunk
//
// Everything below forwards to the impl, except can_decompress().
int64_t ColumnChunkMetaData::file_offset() const { return impl_->file_offset(); }

const std::string& ColumnChunkMetaData::file_path() const { return impl_->file_path(); }

Type::type ColumnChunkMetaData::type() const { return impl_->type(); }

int64_t ColumnChunkMetaData::num_values() const { return impl_->num_values(); }

std::shared_ptr ColumnChunkMetaData::path_in_schema() const {
  return impl_->path_in_schema();
}

std::shared_ptr ColumnChunkMetaData::statistics() const { return impl_->statistics(); }

bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); }

std::shared_ptr ColumnChunkMetaData::size_statistics() const {
  return impl_->size_statistics();
}

std::optional ColumnChunkMetaData::bloom_filter_offset() const {
  return impl_->bloom_filter_offset();
}

std::optional ColumnChunkMetaData::bloom_filter_length() const {
  return impl_->bloom_filter_length();
}

bool ColumnChunkMetaData::has_dictionary_page() const {
  return impl_->has_dictionary_page();
}

int64_t ColumnChunkMetaData::dictionary_page_offset() const {
  return impl_->dictionary_page_offset();
}

int64_t ColumnChunkMetaData::data_page_offset() const {
  return impl_->data_page_offset();
}

bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); }

int64_t ColumnChunkMetaData::index_page_offset() const {
  return impl_->index_page_offset();
}

Compression::type ColumnChunkMetaData::compression() const {
  return impl_->compression();
}

// Asks the codec registry whether this chunk's compression codec is available.
bool ColumnChunkMetaData::can_decompress() const {
  return ::arrow20::util::Codec::IsAvailable(compression());
}

const std::vector& ColumnChunkMetaData::encodings() const {
  return impl_->encodings();
}

const std::vector& ColumnChunkMetaData::encoding_stats() const {
  return impl_->encoding_stats();
}

int64_t ColumnChunkMetaData::total_uncompressed_size() const {
  return impl_->total_uncompressed_size();
}

int64_t ColumnChunkMetaData::total_compressed_size() const {
  return impl_->total_compressed_size();
}

std::unique_ptr ColumnChunkMetaData::crypto_metadata() const {
  return impl_->crypto_metadata();
}

std::optional ColumnChunkMetaData::GetColumnIndexLocation() const {
  return impl_->GetColumnIndexLocation();
}

std::optional ColumnChunkMetaData::GetOffsetIndexLocation() const {
  return impl_->GetOffsetIndexLocation();
}

bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const {
  return impl_->Equals(*other.impl_);
}

const std::shared_ptr& ColumnChunkMetaData::key_value_metadata() const {
  return impl_->key_value_metadata();
}

// row-group metadata
//
// Read-only view over one Thrift format::RowGroup. The row group and schema are
// held by raw pointer (not copied) and must outlive this object.
class RowGroupMetaData::RowGroupMetaDataImpl {
 public:
  // Throws ParquetException when the row group has more columns than the limit
  // checked below.
  // NOTE(review): the integer type used for that limit is not visible in this
  // view; ColumnChunk() later passes the column index where an int16_t is
  // expected, so presumably the limit is the int16_t maximum -- TODO confirm.
  explicit RowGroupMetaDataImpl(const format::RowGroup* row_group,
                                const SchemaDescriptor* schema,
                                const ReaderProperties& properties,
                                const ApplicationVersion* writer_version,
                                std::shared_ptr file_decryptor)
      : row_group_(row_group),
        schema_(schema),
        properties_(properties),
        writer_version_(writer_version),
        file_decryptor_(std::move(file_decryptor)) {
    if (ARROW_PREDICT_FALSE(row_group_->columns.size() >
                            static_cast(std::numeric_limits::max()))) {
      throw ParquetException("Row group had too many columns: ",
                             row_group_->columns.size());
    }
  }

  // Compares the underlying Thrift row groups by value.
  bool Equals(const RowGroupMetaDataImpl& other) const {
    return *row_group_ == *other.row_group_;
  }

  inline int num_columns() const { return static_cast(row_group_->columns.size()); }

  inline int64_t num_rows() const { return row_group_->num_rows; }

  inline int64_t total_byte_size() const { return row_group_->total_byte_size; }

  inline int64_t total_compressed_size() const {
    return row_group_->total_compressed_size;
  }

  inline int64_t file_offset() const { return row_group_->file_offset; }

  inline const SchemaDescriptor* schema() const { return schema_; }

  // Creates a metadata view for column `i`.
  // Throws ParquetException when `i` is outside [0, num_columns()).
  std::unique_ptr ColumnChunk(int i) {
    if (i >= 0 && i < num_columns()) {
      // -1 is passed as the row group ordinal when the field is not set.
      int16_t row_group_ordinal =
          row_group_->__isset.ordinal ? row_group_->ordinal : static_cast(-1);
      // `i` (an int) is passed as the column ordinal, which Make() declares as
      // int16_t; the constructor's column-count check bounds its value.
      return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
                                       properties_, writer_version_, row_group_ordinal,
                                       i, file_decryptor_);
    }
    throw ParquetException("The file only has ", num_columns(),
                           " columns, requested metadata for column: ", i);
  }

  // Converts the Thrift sorting columns; returns an empty vector when the field
  // is not set.
  std::vector sorting_columns() const {
    std::vector sorting_columns;
    if (!row_group_->__isset.sorting_columns) {
      return sorting_columns;
    }
    sorting_columns.resize(row_group_->sorting_columns.size());
    for (size_t i = 0; i < sorting_columns.size(); ++i) {
      sorting_columns[i] = FromThrift(row_group_->sorting_columns[i]);
    }
    return sorting_columns;
  }

 private:
  // Non-owning.
  const format::RowGroup* row_group_;
  // Non-owning.
  const SchemaDescriptor* schema_;
  const ReaderProperties properties_;
  // Non-owning.
  const ApplicationVersion* writer_version_;
  std::shared_ptr file_decryptor_;
};

// Factory for RowGroupMetaData; forwards all arguments to the constructor.
std::unique_ptr RowGroupMetaData::Make(
    const void* metadata, const SchemaDescriptor* schema,
    const ReaderProperties& properties, const ApplicationVersion* writer_version,
    std::shared_ptr file_decryptor) {
  return std::unique_ptr(new RowGroupMetaData(
      metadata, schema, properties, writer_version, std::move(file_decryptor)));
}

// `metadata` is reinterpreted and handed to the impl, whose constructor takes a
// `const format::RowGroup*` (the cast target itself is not visible here).
RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
                                   const ReaderProperties& properties,
                                   const ApplicationVersion* writer_version,
                                   std::shared_ptr file_decryptor)
    : impl_{new RowGroupMetaDataImpl(reinterpret_cast(metadata), schema, properties,
                                     writer_version, std::move(file_decryptor))} {}

RowGroupMetaData::~RowGroupMetaData() = default;

// Everything below forwards to the impl, except can_decompress().
bool RowGroupMetaData::Equals(const RowGroupMetaData& other) const {
  return impl_->Equals(*other.impl_);
}

int RowGroupMetaData::num_columns() const { return impl_->num_columns(); }

int64_t RowGroupMetaData::num_rows() const { return impl_->num_rows(); }

int64_t RowGroupMetaData::total_byte_size() const { return impl_->total_byte_size(); }

int64_t RowGroupMetaData::total_compressed_size() const {
  return impl_->total_compressed_size();
}

int64_t RowGroupMetaData::file_offset() const { return impl_->file_offset(); }

const SchemaDescriptor* RowGroupMetaData::schema() const { return impl_->schema(); }

std::unique_ptr RowGroupMetaData::ColumnChunk(int i) const {
  return impl_->ColumnChunk(i);
}

// True only if every column chunk reports can_decompress(); stops at the first
// chunk that does not. A fresh ColumnChunkMetaData is created per column.
bool RowGroupMetaData::can_decompress() const {
  int n_columns = num_columns();
  for (int i = 0; i < n_columns; i++) {
    if (!ColumnChunk(i)->can_decompress()) {
      return false;
    }
  }
  return true;
}

std::vector RowGroupMetaData::sorting_columns() const {
  return impl_->sorting_columns();
}

// Replace string data with random-generated uppercase characters
//
// The string length is preserved; each character is overwritten with a value
// drawn from [65, 90]. The generator is a function-local static shared by all
// calls (NOTE(review): no synchronization is visible around it).
static void Scrub(std::string* s) {
  static ::arrow20::random::pcg64 rng;
  std::uniform_int_distribution<> caps(65, 90);
  for (auto& c : *s) c = caps(rng);
}

// Replace potentially sensitive metadata with random data
//
// Scrubbed in place: schema element names; per column chunk the file path,
// path_in_schema, key/value metadata, the four statistics min/max strings,
// column-key crypto metadata and the encrypted column metadata blob; then the
// file-level key/value metadata and the footer signing key metadata.
static void Scrub(format::FileMetaData* md) {
  for (auto& s : md->schema) {
    Scrub(&s.name);
  }
  for (auto& r : md->row_groups) {
    for (auto& c : r.columns) {
      Scrub(&c.file_path);
      if (c.__isset.meta_data) {
        auto& m = c.meta_data;
        for (auto& p : m.path_in_schema) Scrub(&p);
        for (auto& kv : m.key_value_metadata) {
          Scrub(&kv.key);
          Scrub(&kv.value);
        }
        Scrub(&m.statistics.max_value);
        Scrub(&m.statistics.min_value);
        Scrub(&m.statistics.min);
        Scrub(&m.statistics.max);
      }
      if (c.crypto_metadata.__isset.ENCRYPTION_WITH_COLUMN_KEY) {
        auto& m = c.crypto_metadata.ENCRYPTION_WITH_COLUMN_KEY;
        for (auto& p : m.path_in_schema) Scrub(&p);
        Scrub(&m.key_metadata);
      }
      Scrub(&c.encrypted_column_metadata);
    }
  }
  for (auto& kv : md->key_value_metadata) {
    Scrub(&kv.key);
    Scrub(&kv.value);
  }
  Scrub(&md->footer_signing_key_metadata);
}

// file metadata
//
// Owns the deserialized Thrift FileMetaData together with the schema
// descriptor, writer version and key/value metadata derived from it.
class FileMetaData::FileMetaDataImpl {
 public:
  // Leaves `metadata_` null; used by Subset() (and, given the friend
  // declaration below, presumably by FileMetaDataBuilder).
  FileMetaDataImpl() = default;

  // Deserializes the footer from `metadata`, using the footer decryptor when a
  // file decryptor is supplied. `*metadata_len` is read back after
  // deserialization and stored as this object's size().
  // Throws ParquetException (via InitSchema) when the schema list is empty.
  explicit FileMetaDataImpl(
      const void* metadata, uint32_t* metadata_len, ReaderProperties properties,
      std::shared_ptr file_decryptor = nullptr)
      : properties_(std::move(properties)), file_decryptor_(std::move(file_decryptor)) {
    metadata_ = std::make_unique();

    auto footer_decryptor =
        file_decryptor_ != nullptr ? file_decryptor_->GetFooterDecryptor() : nullptr;

    ThriftDeserializer deserializer(properties_);
    deserializer.DeserializeMessage(reinterpret_cast(metadata), metadata_len,
                                    metadata_.get(), footer_decryptor.get());
    metadata_len_ = *metadata_len;

    // Fall back to a placeholder writer version when created_by is absent.
    if (metadata_->__isset.created_by) {
      writer_version_ = ApplicationVersion(metadata_->created_by);
    } else {
      writer_version_ = ApplicationVersion("unknown 0.0.0");
    }

    // Order matters: column orders are applied to the schema built first.
    InitSchema();
    InitColumnOrders();
    InitKeyValueMetadata();
  }

  // Re-serializes the footer, encrypts it with the footer key using the nonce
  // taken from `signature`, and compares the trailing GCM tag of the result
  // with the tag stored in `signature`.
  //
  // `signature` is read as kNonceLength bytes of nonce immediately followed by
  // kGcmTagLength bytes of tag.
  // Throws ParquetException when no file decryptor is configured.
  bool VerifySignature(const void* signature) {
    // verify decryption properties are set
    if (file_decryptor_ == nullptr) {
      throw ParquetException("Decryption not set properly. cannot verify signature");
    }
    // serialize the footer
    uint8_t* serialized_data;
    uint32_t serialized_len = metadata_len_;
    ThriftSerializer serializer;
    serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data);
    ::arrow20::util::span serialized_data_span(serialized_data, serialized_len);

    // encrypt with nonce
    ::arrow20::util::span nonce(reinterpret_cast(signature),
                                encryption::kNonceLength);
    auto tag = reinterpret_cast(signature) + encryption::kNonceLength;

    std::string key = file_decryptor_->GetFooterKey();
    std::string aad = encryption::CreateFooterAad(file_decryptor_->file_aad());

    auto aes_encryptor = encryption::AesEncryptor::Make(file_decryptor_->algorithm(),
                                                        static_cast(key.size()), true,
                                                        false /*write_length*/);

    std::shared_ptr encrypted_buffer = AllocateBuffer(
        file_decryptor_->pool(), aes_encryptor->CiphertextLength(serialized_len));
    int32_t encrypted_len = aes_encryptor->SignedFooterEncrypt(
        serialized_data_span, str2span(key), str2span(aad), nonce,
        encrypted_buffer->mutable_span_as());
    // The tag occupies the last kGcmTagLength bytes of the encrypted output.
    return 0 ==
           memcmp(encrypted_buffer->data() + encrypted_len - encryption::kGcmTagLength,
                  tag, encryption::kGcmTagLength);
  }

  // Length recorded from `*metadata_len` at construction (0 if default-built).
  inline uint32_t size() const { return metadata_len_; }
  inline int num_columns() const { return schema_.num_columns(); }
  inline int64_t num_rows() const { return metadata_->num_rows; }
  inline int num_row_groups() const {
    return static_cast(metadata_->row_groups.size());
  }
  inline int32_t version() const { return metadata_->version; }
  inline const std::string& created_by() const { return metadata_->created_by; }
  inline int num_schema_elements() const {
    return static_cast(metadata_->schema.size());
  }

  inline bool is_encryption_algorithm_set() const {
    return metadata_->__isset.encryption_algorithm;
  }
  inline EncryptionAlgorithm encryption_algorithm() {
    return FromThrift(metadata_->encryption_algorithm);
  }
  inline const std::string& footer_signing_key_metadata() {
    return metadata_->footer_signing_key_metadata;
  }

  const ApplicationVersion& writer_version() const { return writer_version_; }

  // Serializes the footer to `dst`.
  //
  // When the encryption algorithm is set in the footer, the footer is written
  // in plaintext followed by a signature (nonce, then GCM tag) obtained by
  // encrypting the serialized footer. Otherwise the footer is serialized
  // through `encryptor`, which may be null.
  //
  // NOTE(review): in the first branch `encryptor` is dereferenced without a
  // null check -- confirm every caller supplies one whenever
  // is_encryption_algorithm_set() is true.
  void WriteTo(::arrow20::io::OutputStream* dst,
               const std::shared_ptr& encryptor) const {
    ThriftSerializer serializer;
    // Only in encrypted files with plaintext footers the
    // encryption_algorithm is set in footer
    if (is_encryption_algorithm_set()) {
      uint8_t* serialized_data;
      uint32_t serialized_len;
      serializer.SerializeToBuffer(metadata_.get(), &serialized_len, &serialized_data);
      ::arrow20::util::span serialized_data_span(serialized_data, serialized_len);

      // encrypt the footer key
      std::vector encrypted_data(encryptor->CiphertextLength(serialized_len));
      int32_t encrypted_len = encryptor->Encrypt(serialized_data_span, encrypted_data);

      // write unencrypted footer
      PARQUET_THROW_NOT_OK(dst->Write(serialized_data, serialized_len));
      // Write signature (nonce and tag)
      // The nonce is read starting 4 bytes into the encrypted output
      // (presumably skipping a length prefix -- TODO confirm); the tag is the
      // last kGcmTagLength bytes.
      PARQUET_THROW_NOT_OK(
          dst->Write(encrypted_data.data() + 4, encryption::kNonceLength));
      PARQUET_THROW_NOT_OK(
          dst->Write(encrypted_data.data() + encrypted_len - encryption::kGcmTagLength,
                     encryption::kGcmTagLength));
    } else {  // either plaintext file (when encryptor is null)
      // or encrypted file with encrypted footer
      serializer.Serialize(metadata_.get(), dst, encryptor.get());
    }
  }

  // Creates a metadata view for row group `i`. The view points into this
  // object's Thrift data, schema and writer version, so it must not outlive it.
  // Throws ParquetException when `i` is outside [0, num_row_groups()).
  std::unique_ptr RowGroup(int i) {
    if (!(i >= 0 && i < num_row_groups())) {
      std::stringstream ss;
      ss << "The file only has " << num_row_groups()
         << " row groups, requested metadata for row group: " << i;
      throw ParquetException(ss.str());
    }
    return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_,
                                  &writer_version_, file_decryptor_);
  }

  // Compares the underlying Thrift FileMetaData structs by value.
  bool Equals(const FileMetaDataImpl& other) const {
    return *metadata_ == *other.metadata_;
  }

  const SchemaDescriptor* schema() const { return &schema_; }

  // nullptr when the footer has no key_value_metadata set.
  const std::shared_ptr& key_value_metadata() const { return key_value_metadata_; }

  // Sets `path` as the file_path of every column chunk in every row group.
  void set_file_path(const std::string& path) {
    for (format::RowGroup& row_group : metadata_->row_groups) {
      for (format::ColumnChunk& chunk : row_group.columns) {
        chunk.__set_file_path(path);
      }
    }
  }

  // Mutable access to the Thrift row group at index `i`.
  // Throws ParquetException when `i` is outside [0, num_row_groups()).
  format::RowGroup& row_group(int i) {
    if (!(i >= 0 && i < num_row_groups())) {
      std::stringstream ss;
      ss << "The file only has " << num_row_groups()
         << " row groups, requested metadata for row group: " << i;
      throw ParquetException(ss.str());
    }
    return metadata_->row_groups[i];
  }

  // Appends copies of all row groups of `other` to this metadata and adds their
  // row counts to num_rows.
  // Throws ParquetException when the two schemas are not equal.
  void AppendRowGroups(FileMetaDataImpl* other) {
    std::ostringstream diff_output;
    if (!schema()->Equals(*other->schema(), &diff_output)) {
      auto msg = "AppendRowGroups requires equal schemas.\n" + diff_output.str();
      throw ParquetException(msg);
    }

    // ARROW-13654: `other` may point to self, be careful not to enter an infinite loop
    const int n = other->num_row_groups();
    // ARROW-16613: do not use reserve() as that may suppress overallocation
    // and incur O(n²) behavior on repeated calls to AppendRowGroups().
    // (see https://en.cppreference.com/w/cpp/container/vector/reserve
    // about inappropriate uses of reserve()).
    const auto start = metadata_->row_groups.size();
    // Resize happens before the loop, so references obtained from
    // other->row_group(i) inside it are taken after any reallocation.
    metadata_->row_groups.resize(start + n);
    for (int i = 0; i < n; i++) {
      metadata_->row_groups[start + i] = other->row_group(i);
      metadata_->num_rows += metadata_->row_groups[start + i].num_rows;
    }
  }

  // Builds a new FileMetaData containing copies of the selected row groups (in
  // the order given) plus this file's version, schema, key/value metadata,
  // created_by, column orders, encryption fields and __isset flags. num_rows is
  // accumulated from the selected row groups.
  //
  // Throws ParquetException for an out-of-range index.
  // NOTE(review): the up-front loop only rejects indices that are too large;
  // a negative index passes it and is rejected later by row_group(), with that
  // function's message.
  std::shared_ptr Subset(const std::vector& row_groups) {
    for (int i : row_groups) {
      if (i < num_row_groups()) continue;

      throw ParquetException(
          "The file only has ", num_row_groups(),
          " row groups, but requested a subset including row group: ", i);
    }

    std::shared_ptr out(new FileMetaData());
    out->impl_ = std::make_unique();
    out->impl_->metadata_ = std::make_unique();

    auto metadata = out->impl_->metadata_.get();
    metadata->version = metadata_->version;
    metadata->schema = metadata_->schema;

    metadata->row_groups.resize(row_groups.size());
    int i = 0;
    for (int selected_index : row_groups) {
      metadata->num_rows += row_group(selected_index).num_rows;
      metadata->row_groups[i++] = row_group(selected_index);
    }

    metadata->key_value_metadata = metadata_->key_value_metadata;
    metadata->created_by = metadata_->created_by;
    metadata->column_orders = metadata_->column_orders;
    metadata->encryption_algorithm = metadata_->encryption_algorithm;
    metadata->footer_signing_key_metadata = metadata_->footer_signing_key_metadata;
    metadata->__isset = metadata_->__isset;

    // Derived state is copied rather than recomputed.
    out->impl_->schema_ = schema_;
    out->impl_->writer_version_ = writer_version_;
    out->impl_->key_value_metadata_ = key_value_metadata_;
    out->impl_->file_decryptor_ = file_decryptor_;

    return out;
  }

  // Serializes a copy of the footer without encryption.
  //   scrub: overwrite potentially sensitive strings in the copy first.
  //   debug: return the struct's printTo() text instead of the Thrift
  //          serialization.
  // The stored metadata itself is never modified.
  std::string SerializeUnencrypted(bool scrub, bool debug) const {
    auto md = *metadata_;
    if (scrub) Scrub(&md);
    if (debug) {
      std::ostringstream ss;
      md.printTo(ss);
      return ss.str();
    } else {
      ThriftSerializer serializer;
      std::string out;
      serializer.SerializeToString(&md, &out);
      return out;
    }
  }

  void set_file_decryptor(std::shared_ptr file_decryptor) {
    file_decryptor_ = std::move(file_decryptor);
  }

  const std::shared_ptr& file_decryptor() const { return file_decryptor_; }

 private:
  friend FileMetaDataBuilder;
  uint32_t metadata_len_ = 0;
  std::unique_ptr metadata_;
  SchemaDescriptor schema_;
  ApplicationVersion writer_version_;
  std::shared_ptr key_value_metadata_;
  const ReaderProperties properties_;
  std::shared_ptr file_decryptor_;

  // Builds `schema_` from the flat Thrift schema element list.
  // Throws ParquetException when the list is empty.
  void InitSchema() {
    if (metadata_->schema.empty()) {
      throw ParquetException("Empty file schema (no root)");
    }
    schema_.Init(schema::Unflatten(&metadata_->schema[0],
                                   static_cast(metadata_->schema.size())));
  }

  // Translates the Thrift column orders and applies them to `schema_`.
  // An entry with TYPE_ORDER set maps to type_defined_, anything else to
  // undefined_. When the footer has no column orders at all, every schema
  // column gets undefined_.
  void InitColumnOrders() {
    // update ColumnOrder
    std::vector column_orders;
    if (metadata_->__isset.column_orders) {
      column_orders.reserve(metadata_->column_orders.size());
      for (auto& column_order : metadata_->column_orders) {
        if (column_order.__isset.TYPE_ORDER) {
          column_orders.push_back(ColumnOrder::type_defined_);
        } else {
          column_orders.push_back(ColumnOrder::undefined_);
        }
      }
    } else {
      column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_);
    }

    schema_.updateColumnOrders(column_orders);
  }

  void InitKeyValueMetadata() {
    key_value_metadata_ = FromThriftKeyValueMetadata(*metadata_);
  }
};

// Factory for FileMetaData built from a serialized footer.
std::shared_ptr FileMetaData::Make(
    const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties,
    std::shared_ptr file_decryptor) {
  // This FileMetaData ctor is private, not compatible with std::make_shared
  return std::shared_ptr(
      new FileMetaData(metadata, metadata_len, properties, std::move(file_decryptor)));
}

FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len,
                           const ReaderProperties& properties,
                           std::shared_ptr file_decryptor)
    : impl_(new FileMetaDataImpl(metadata, metadata_len, properties,
                                 std::move(file_decryptor))) {}

// Creates an impl whose Thrift metadata pointer is still null.
FileMetaData::FileMetaData() : impl_(new FileMetaDataImpl()) {}

FileMetaData::~FileMetaData() = default;

// The remaining members in this part of the file forward to the impl.
bool FileMetaData::Equals(const FileMetaData& other) const {
  return impl_->Equals(*other.impl_);
}

std::unique_ptr FileMetaData::RowGroup(int i) const { return impl_->RowGroup(i); }

bool FileMetaData::VerifySignature(const void* signature) {
  return impl_->VerifySignature(signature);
}
uint32_t FileMetaData::size() const { return impl_->size(); } int FileMetaData::num_columns() const { return impl_->num_columns(); } int64_t FileMetaData::num_rows() const { return impl_->num_rows(); } int FileMetaData::num_row_groups() const { return impl_->num_row_groups(); } bool FileMetaData::can_decompress() const { int n_row_groups = num_row_groups(); for (int i = 0; i < n_row_groups; i++) { if (!RowGroup(i)->can_decompress()) { return false; } } return true; } bool FileMetaData::is_encryption_algorithm_set() const { return impl_->is_encryption_algorithm_set(); } EncryptionAlgorithm FileMetaData::encryption_algorithm() const { return impl_->encryption_algorithm(); } const std::string& FileMetaData::footer_signing_key_metadata() const { return impl_->footer_signing_key_metadata(); } void FileMetaData::set_file_decryptor( std::shared_ptr file_decryptor) { impl_->set_file_decryptor(std::move(file_decryptor)); } const std::shared_ptr& FileMetaData::file_decryptor() const { return impl_->file_decryptor(); } ParquetVersion::type FileMetaData::version() const { switch (impl_->version()) { case 1: return ParquetVersion::PARQUET_1_0; case 2: return ParquetVersion::PARQUET_2_LATEST; default: // Improperly set version, assuming Parquet 1.0 break; } return ParquetVersion::PARQUET_1_0; } const ApplicationVersion& FileMetaData::writer_version() const { return impl_->writer_version(); } const std::string& FileMetaData::created_by() const { return impl_->created_by(); } int FileMetaData::num_schema_elements() const { return impl_->num_schema_elements(); } const SchemaDescriptor* FileMetaData::schema() const { return impl_->schema(); } const std::shared_ptr& FileMetaData::key_value_metadata() const { return impl_->key_value_metadata(); } void FileMetaData::set_file_path(const std::string& path) { impl_->set_file_path(path); } void FileMetaData::AppendRowGroups(const FileMetaData& other) { impl_->AppendRowGroups(other.impl_.get()); } std::shared_ptr FileMetaData::Subset( const 
std::vector& row_groups) const { return impl_->Subset(row_groups); } std::string FileMetaData::SerializeUnencrypted(bool scrub, bool debug) const { return impl_->SerializeUnencrypted(scrub, debug); } void FileMetaData::WriteTo(::arrow20::io::OutputStream* dst, const std::shared_ptr& encryptor) const { return impl_->WriteTo(dst, encryptor); } class FileCryptoMetaData::FileCryptoMetaDataImpl { public: FileCryptoMetaDataImpl() = default; explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len, const ReaderProperties& properties) { ThriftDeserializer deserializer(properties); deserializer.DeserializeMessage(metadata, metadata_len, &metadata_); metadata_len_ = *metadata_len; } EncryptionAlgorithm encryption_algorithm() const { return FromThrift(metadata_.encryption_algorithm); } const std::string& key_metadata() const { return metadata_.key_metadata; } void WriteTo(::arrow20::io::OutputStream* dst) const { ThriftSerializer serializer; serializer.Serialize(&metadata_, dst); } private: friend FileMetaDataBuilder; format::FileCryptoMetaData metadata_; uint32_t metadata_len_; }; EncryptionAlgorithm FileCryptoMetaData::encryption_algorithm() const { return impl_->encryption_algorithm(); } const std::string& FileCryptoMetaData::key_metadata() const { return impl_->key_metadata(); } std::shared_ptr FileCryptoMetaData::Make( const uint8_t* serialized_metadata, uint32_t* metadata_len, const ReaderProperties& properties) { return std::shared_ptr( new FileCryptoMetaData(serialized_metadata, metadata_len, properties)); } FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len, const ReaderProperties& properties) : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len, properties)) {} FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {} FileCryptoMetaData::~FileCryptoMetaData() = default; void FileCryptoMetaData::WriteTo(::arrow20::io::OutputStream* dst) const { 
impl_->WriteTo(dst); } std::string FileMetaData::SerializeToString() const { // We need to pass in an initial size. Since it will automatically // increase the buffer size to hold the metadata, we just leave it 0. PARQUET_ASSIGN_OR_THROW(auto serializer, ::arrow20::io::BufferOutputStream::Create(0)); WriteTo(serializer.get()); PARQUET_ASSIGN_OR_THROW(auto metadata_buffer, serializer->Finish()); return metadata_buffer->ToString(); } ApplicationVersion::ApplicationVersion(std::string application, int major, int minor, int patch) : application_(std::move(application)), version{major, minor, patch, "", "", ""} {} namespace { // Parse the application version format and set parsed values to // ApplicationVersion. // // The application version format must be compatible parquet-mr's // one. See also: // * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/VersionParser.java // * https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java // // The application version format: // "${APPLICATION_NAME}" // "${APPLICATION_NAME} version ${VERSION}" // "${APPLICATION_NAME} version ${VERSION} (build ${BUILD_NAME})" // // Eg: // parquet-cpp // parquet-cpp version 1.5.0ab-xyz5.5.0+cd // parquet-cpp version 1.5.0ab-xyz5.5.0+cd (build abcd) // // The VERSION format: // "${MAJOR}" // "${MAJOR}.${MINOR}" // "${MAJOR}.${MINOR}.${PATCH}" // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}" // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}" // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}-${PRE_RELEASE}+${BUILD_INFO}" // "${MAJOR}.${MINOR}.${PATCH}${UNKNOWN}+${BUILD_INFO}" // "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}" // "${MAJOR}.${MINOR}.${PATCH}-${PRE_RELEASE}+${BUILD_INFO}" // "${MAJOR}.${MINOR}.${PATCH}+${BUILD_INFO}" // // Eg: // 1 // 1.5 // 1.5.0 // 1.5.0ab // 1.5.0ab-cdh5.5.0 // 1.5.0ab-cdh5.5.0+cd // 1.5.0ab+cd // 1.5.0-cdh5.5.0 // 1.5.0-cdh5.5.0+cd // 1.5.0+cd class 
ApplicationVersionParser {
 public:
  /// Both arguments are held by reference; `application_version` is filled in
  /// by Parse().
  ApplicationVersionParser(const std::string& created_by,
                           ApplicationVersion& application_version)
      : created_by_(created_by),
        application_version_(application_version),
        spaces_(" \t\v\r\n\f"),
        digits_("0123456789") {}

  /// Reset the target to "unknown" / 0.0.0 and then parse the application
  /// name, version and build name in that order, stopping at the first stage
  /// that reports failure.
  void Parse() {
    application_version_.application_ = "unknown";
    application_version_.version = {0, 0, 0, "", "", ""};

    if (!ParseApplicationName() || !ParseVersion()) {
      return;
    }
    ParseBuildName();
  }

 private:
  /// True if the character of `string` at `offset` is one of `spaces_`.
  bool IsSpace(const std::string& string, const size_t& offset) {
    const ::std::string_view single_char = ::std::string_view(string).substr(offset, 1);
    const bool found = single_char.find_first_of(spaces_) != ::std::string_view::npos;
    return found;
  }

  /// Advance `start` past leading spaces, never beyond `end`.
  void RemovePrecedingSpaces(const std::string& string, size_t& start,
                             const size_t& end) {
    for (; start < end && IsSpace(string, start); ++start) {
    }
  }

  /// Move `end` back over trailing spaces while more than one character
  /// remains in [start, end).
  void RemoveTrailingSpaces(const std::string& string, const size_t& start,
                            size_t& end) {
    for (;;) {
      const size_t last = end - 1;
      if (!(start < last && last < string.size() && IsSpace(string, last))) {
        break;
      }
      --end;
    }
  }

  /// Extract the application name (everything before " version ", or the whole
  /// string when that marker is absent) and record where the version starts.
  /// Always returns true.
  bool ParseApplicationName() {
    const std::string version_mark(" version ");
    const auto mark_position = created_by_.find(version_mark);
    const bool has_version = mark_position != std::string::npos;

    // No VERSION and BUILD_NAME when the marker is missing.
    version_start_ =
        has_version ? mark_position + version_mark.size() : std::string::npos;
    size_t name_end = has_version ? mark_position : created_by_.size();
    size_t name_begin = 0;

    RemovePrecedingSpaces(created_by_, name_begin, name_end);
    RemoveTrailingSpaces(created_by_, name_begin, name_end);
    application_version_.application_ =
        created_by_.substr(name_begin, name_end - name_begin);

    return true;
  }

  bool ParseVersion() {
    // No VERSION.
if (version_start_ == std::string::npos) { return false; } RemovePrecedingSpaces(created_by_, version_start_, created_by_.size()); version_end_ = created_by_.find(" (", version_start_); // No BUILD_NAME. if (version_end_ == std::string::npos) { version_end_ = created_by_.size(); } RemoveTrailingSpaces(created_by_, version_start_, version_end_); // No VERSION. if (version_start_ == version_end_) { return false; } version_string_ = created_by_.substr(version_start_, version_end_ - version_start_); if (!ParseVersionMajor()) { return false; } if (!ParseVersionMinor()) { return false; } if (!ParseVersionPatch()) { return false; } if (!ParseVersionUnknown()) { return false; } if (!ParseVersionPreRelease()) { return false; } if (!ParseVersionBuildInfo()) { return false; } return true; } bool ParseVersionMajor() { size_t version_major_start = 0; auto version_major_end = version_string_.find_first_not_of(digits_); // MAJOR only. if (version_major_end == std::string::npos) { version_major_end = version_string_.size(); version_parsing_position_ = version_major_end; } else { // No ".". if (version_string_[version_major_end] != '.') { return false; } // No MAJOR. if (version_major_end == version_major_start) { return false; } version_parsing_position_ = version_major_end + 1; // +1 is for '.'. } auto version_major_string = version_string_.substr( version_major_start, version_major_end - version_major_start); application_version_.version.major = atoi(version_major_string.c_str()); return true; } bool ParseVersionMinor() { auto version_minor_start = version_parsing_position_; auto version_minor_end = version_string_.find_first_not_of(digits_, version_minor_start); // MAJOR.MINOR only. if (version_minor_end == std::string::npos) { version_minor_end = version_string_.size(); version_parsing_position_ = version_minor_end; } else { // No ".". if (version_string_[version_minor_end] != '.') { return false; } // No MINOR. 
if (version_minor_end == version_minor_start) { return false; } version_parsing_position_ = version_minor_end + 1; // +1 is for '.'. } auto version_minor_string = version_string_.substr( version_minor_start, version_minor_end - version_minor_start); application_version_.version.minor = atoi(version_minor_string.c_str()); return true; } bool ParseVersionPatch() { auto version_patch_start = version_parsing_position_; auto version_patch_end = version_string_.find_first_not_of(digits_, version_patch_start); // No UNKNOWN, PRE_RELEASE and BUILD_INFO. if (version_patch_end == std::string::npos) { version_patch_end = version_string_.size(); } // No PATCH. if (version_patch_end == version_patch_start) { return false; } auto version_patch_string = version_string_.substr( version_patch_start, version_patch_end - version_patch_start); application_version_.version.patch = atoi(version_patch_string.c_str()); version_parsing_position_ = version_patch_end; return true; } bool ParseVersionUnknown() { // No UNKNOWN. if (version_parsing_position_ == version_string_.size()) { return true; } auto version_unknown_start = version_parsing_position_; auto version_unknown_end = version_string_.find_first_of("-+", version_unknown_start); // No PRE_RELEASE and BUILD_INFO if (version_unknown_end == std::string::npos) { version_unknown_end = version_string_.size(); } application_version_.version.unknown = version_string_.substr( version_unknown_start, version_unknown_end - version_unknown_start); version_parsing_position_ = version_unknown_end; return true; } bool ParseVersionPreRelease() { // No PRE_RELEASE. if (version_parsing_position_ == version_string_.size() || version_string_[version_parsing_position_] != '-') { return true; } auto version_pre_release_start = version_parsing_position_ + 1; // +1 is for '-'. 
auto version_pre_release_end = version_string_.find_first_of("+", version_pre_release_start); // No BUILD_INFO if (version_pre_release_end == std::string::npos) { version_pre_release_end = version_string_.size(); } application_version_.version.pre_release = version_string_.substr( version_pre_release_start, version_pre_release_end - version_pre_release_start); version_parsing_position_ = version_pre_release_end; return true; } bool ParseVersionBuildInfo() { // No BUILD_INFO. if (version_parsing_position_ == version_string_.size() || version_string_[version_parsing_position_] != '+') { return true; } auto version_build_info_start = version_parsing_position_ + 1; // +1 is for '+'. application_version_.version.build_info = version_string_.substr(version_build_info_start); return true; } bool ParseBuildName() { std::string build_mark(" (build "); auto build_mark_position = created_by_.find(build_mark, version_end_); // No BUILD_NAME. if (build_mark_position == std::string::npos) { return false; } auto build_name_start = build_mark_position + build_mark.size(); RemovePrecedingSpaces(created_by_, build_name_start, created_by_.size()); auto build_name_end = created_by_.find_first_of(")", build_name_start); // No end ")". if (build_name_end == std::string::npos) { return false; } RemoveTrailingSpaces(created_by_, build_name_start, build_name_end); application_version_.build_ = created_by_.substr(build_name_start, build_name_end - build_name_start); return true; } const std::string& created_by_; ApplicationVersion& application_version_; // For parsing. 
std::string spaces_; std::string digits_; size_t version_parsing_position_{0}; size_t version_start_{0}; size_t version_end_{0}; std::string version_string_; }; } // namespace ApplicationVersion::ApplicationVersion(const std::string& created_by) { ApplicationVersionParser parser(created_by, *this); parser.Parse(); } bool ApplicationVersion::VersionLt(const ApplicationVersion& other_version) const { if (application_ != other_version.application_) return false; if (version.major < other_version.version.major) return true; if (version.major > other_version.version.major) return false; DCHECK_EQ(version.major, other_version.version.major); if (version.minor < other_version.version.minor) return true; if (version.minor > other_version.version.minor) return false; DCHECK_EQ(version.minor, other_version.version.minor); return version.patch < other_version.version.patch; } bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) const { return application_ == other_version.application_ && version.major == other_version.version.major && version.minor == other_version.version.minor && version.patch == other_version.version.patch; } // Reference: // parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java // PARQUET-686 has more discussion on statistics bool ApplicationVersion::HasCorrectStatistics(Type::type col_type, EncodedStatistics& statistics, SortOrder::type sort_order) const { // parquet-cpp version 1.3.0 and parquet-mr 1.10.0 onwards stats are computed // correctly for all types if ((application_ == "parquet-cpp" && VersionLt(PARQUET_CPP_FIXED_STATS_VERSION())) || (application_ == "parquet-mr" && VersionLt(PARQUET_MR_FIXED_STATS_VERSION()))) { // Only SIGNED are valid unless max and min are the same // (in which case the sort order does not matter) bool max_equals_min = statistics.has_min && statistics.has_max ? 
statistics.min() == statistics.max() : false; if (SortOrder::SIGNED != sort_order && !max_equals_min) { return false; } // Statistics of other types are OK if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) { return true; } } // created_by is not populated, which could have been caused by // parquet-mr during the same time as PARQUET-251, see PARQUET-297 if (application_ == "unknown") { return true; } // Unknown sort order has incorrect stats if (SortOrder::UNKNOWN == sort_order) { return false; } // PARQUET-251 if (VersionLt(PARQUET_251_FIXED_VERSION())) { return false; } return true; } // MetaData Builders // row-group metadata class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { public: explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr props, const ColumnDescriptor* column) : owned_column_chunk_(new format::ColumnChunk), properties_(std::move(props)), column_(column) { Init(owned_column_chunk_.get()); } explicit ColumnChunkMetaDataBuilderImpl(std::shared_ptr props, const ColumnDescriptor* column, format::ColumnChunk* column_chunk) : properties_(std::move(props)), column_(column) { Init(column_chunk); } const void* contents() const { return column_chunk_; } // column chunk void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); } // column metadata void SetStatistics(const EncodedStatistics& val) { column_chunk_->meta_data.__set_statistics(ToThrift(val)); } void SetSizeStatistics(const SizeStatistics& size_stats) { column_chunk_->meta_data.__set_size_statistics(ToThrift(size_stats)); } void Finish(int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback, const std::map& dict_encoding_stats, const std::map& data_encoding_stats, const std::shared_ptr& encryptor) { if (dictionary_page_offset > 0) { 
column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset); } // The `file_offset` field is deprecated and should be set to 0. // See https://github.com/apache/parquet-format/pull/440 for detail. column_chunk_->__set_file_offset(0); column_chunk_->__isset.meta_data = true; column_chunk_->meta_data.__set_num_values(num_values); if (index_page_offset >= 0) { column_chunk_->meta_data.__set_index_page_offset(index_page_offset); } column_chunk_->meta_data.__set_data_page_offset(data_page_offset); column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size); column_chunk_->meta_data.__set_total_compressed_size(compressed_size); std::vector thrift_encodings; std::vector thrift_encoding_stats; auto add_encoding = [&thrift_encodings](format::Encoding::type value) { auto it = std::find(thrift_encodings.begin(), thrift_encodings.end(), value); if (it == thrift_encodings.end()) { thrift_encodings.push_back(value); } }; // Add dictionary page encoding stats if (has_dictionary) { for (const auto& entry : dict_encoding_stats) { format::PageEncodingStats dict_enc_stat; dict_enc_stat.__set_page_type(format::PageType::DICTIONARY_PAGE); // Dictionary Encoding would be PLAIN_DICTIONARY in v1 and // PLAIN in v2. format::Encoding::type dict_encoding = ToThrift(entry.first); dict_enc_stat.__set_encoding(dict_encoding); dict_enc_stat.__set_count(entry.second); thrift_encoding_stats.push_back(dict_enc_stat); add_encoding(dict_encoding); } } // Always add encoding for RL/DL. // BIT_PACKED is supported in `LevelEncoder`, but would only be used // in benchmark and testing. // And for now, we always add RLE even if there are no levels at all, // while parquet-mr is more fine-grained. 
add_encoding(format::Encoding::RLE); // Add data page encoding stats for (const auto& entry : data_encoding_stats) { format::PageEncodingStats data_enc_stat; data_enc_stat.__set_page_type(format::PageType::DATA_PAGE); format::Encoding::type data_encoding = ToThrift(entry.first); data_enc_stat.__set_encoding(data_encoding); data_enc_stat.__set_count(entry.second); thrift_encoding_stats.push_back(data_enc_stat); add_encoding(data_encoding); } column_chunk_->meta_data.__set_encodings(std::move(thrift_encodings)); column_chunk_->meta_data.__set_encoding_stats(std::move(thrift_encoding_stats)); if (key_value_metadata_) { ToThriftKeyValueMetadata(*key_value_metadata_, &column_chunk_->meta_data); } const auto& encrypt_md = properties_->column_encryption_properties(column_->path()->ToDotString()); // column is encrypted if (encrypt_md != nullptr && encrypt_md->is_encrypted()) { column_chunk_->__isset.crypto_metadata = true; format::ColumnCryptoMetaData ccmd; if (encrypt_md->is_encrypted_with_footer_key()) { // encrypted with footer key ccmd.__isset.ENCRYPTION_WITH_FOOTER_KEY = true; ccmd.__set_ENCRYPTION_WITH_FOOTER_KEY(format::EncryptionWithFooterKey()); } else { // encrypted with column key format::EncryptionWithColumnKey eck; eck.__set_key_metadata(encrypt_md->key_metadata()); eck.__set_path_in_schema(column_->path()->ToDotVector()); ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY = true; ccmd.__set_ENCRYPTION_WITH_COLUMN_KEY(eck); } column_chunk_->__set_crypto_metadata(std::move(ccmd)); bool encrypted_footer = properties_->file_encryption_properties()->encrypted_footer(); bool encrypt_metadata = !encrypted_footer || !encrypt_md->is_encrypted_with_footer_key(); if (encrypt_metadata) { ThriftSerializer serializer; // Serialize and encrypt ColumnMetadata separately // Thrift-serialize the ColumnMetaData structure, // encrypt it with the column key, and write to encrypted_column_metadata uint8_t* serialized_data; uint32_t serialized_len; 
serializer.SerializeToBuffer(&column_chunk_->meta_data, &serialized_len, &serialized_data); ::arrow20::util::span serialized_data_span(serialized_data, serialized_len); std::vector encrypted_data(encryptor->CiphertextLength(serialized_len)); int32_t encrypted_len = encryptor->Encrypt(serialized_data_span, encrypted_data); const char* temp = const_cast(reinterpret_cast(encrypted_data.data())); std::string encrypted_column_metadata(temp, encrypted_len); column_chunk_->__set_encrypted_column_metadata(encrypted_column_metadata); if (encrypted_footer) { column_chunk_->__isset.meta_data = false; } else { // Keep redacted metadata version for old readers column_chunk_->__isset.meta_data = true; column_chunk_->meta_data.__isset.statistics = false; column_chunk_->meta_data.__isset.encoding_stats = false; } } } } void WriteTo(::arrow20::io::OutputStream* sink) { ThriftSerializer serializer; serializer.Serialize(column_chunk_, sink); } const ColumnDescriptor* descr() const { return column_; } int64_t total_compressed_size() const { return column_chunk_->meta_data.total_compressed_size; } void SetKeyValueMetadata(std::shared_ptr key_value_metadata) { key_value_metadata_ = std::move(key_value_metadata); } private: void Init(format::ColumnChunk* column_chunk) { column_chunk_ = column_chunk; column_chunk_->meta_data.__set_type(ToThrift(column_->physical_type())); column_chunk_->meta_data.__set_path_in_schema(column_->path()->ToDotVector()); column_chunk_->meta_data.__set_codec( ToThrift(properties_->compression(column_->path()))); } format::ColumnChunk* column_chunk_; std::unique_ptr owned_column_chunk_; const std::shared_ptr properties_; const ColumnDescriptor* column_; std::shared_ptr key_value_metadata_; }; std::unique_ptr ColumnChunkMetaDataBuilder::Make( std::shared_ptr props, const ColumnDescriptor* column, void* contents) { return std::unique_ptr( new ColumnChunkMetaDataBuilder(std::move(props), column, contents)); } std::unique_ptr ColumnChunkMetaDataBuilder::Make( 
std::shared_ptr props, const ColumnDescriptor* column) { return std::unique_ptr( new ColumnChunkMetaDataBuilder(std::move(props), column)); } ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder( std::shared_ptr props, const ColumnDescriptor* column) : impl_{std::make_unique(std::move(props), column)} {} ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilder( std::shared_ptr props, const ColumnDescriptor* column, void* contents) : impl_{std::make_unique( std::move(props), column, reinterpret_cast(contents))} {} ColumnChunkMetaDataBuilder::~ColumnChunkMetaDataBuilder() = default; const void* ColumnChunkMetaDataBuilder::contents() const { return impl_->contents(); } void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) { impl_->set_file_path(path); } void ColumnChunkMetaDataBuilder::Finish( int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback, const std::map& dict_encoding_stats, const std::map& data_encoding_stats, const std::shared_ptr& encryptor) { impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset, compressed_size, uncompressed_size, has_dictionary, dictionary_fallback, dict_encoding_stats, data_encoding_stats, encryptor); } void ColumnChunkMetaDataBuilder::WriteTo(::arrow20::io::OutputStream* sink) { impl_->WriteTo(sink); } const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const { return impl_->descr(); } void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) { impl_->SetStatistics(result); } void ColumnChunkMetaDataBuilder::SetSizeStatistics(const SizeStatistics& size_stats) { impl_->SetSizeStatistics(size_stats); } void ColumnChunkMetaDataBuilder::SetKeyValueMetadata( std::shared_ptr key_value_metadata) { impl_->SetKeyValueMetadata(std::move(key_value_metadata)); } int64_t ColumnChunkMetaDataBuilder::total_compressed_size() 
const { return impl_->total_compressed_size(); } class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { public: explicit RowGroupMetaDataBuilderImpl(std::shared_ptr props, const SchemaDescriptor* schema, void* contents) : properties_(std::move(props)), schema_(schema), next_column_(0) { row_group_ = reinterpret_cast(contents); InitializeColumns(schema->num_columns()); } ColumnChunkMetaDataBuilder* NextColumnChunk() { if (!(next_column_ < num_columns())) { std::stringstream ss; ss << "The schema only has " << num_columns() << " columns, requested metadata for column: " << next_column_; throw ParquetException(ss.str()); } auto column = schema_->Column(next_column_); auto column_builder = ColumnChunkMetaDataBuilder::Make( properties_, column, &row_group_->columns[next_column_++]); auto column_builder_ptr = column_builder.get(); column_builders_.push_back(std::move(column_builder)); return column_builder_ptr; } int current_column() { return next_column_ - 1; } void Finish(int64_t total_bytes_written, int16_t row_group_ordinal) { if (!(next_column_ == schema_->num_columns())) { std::stringstream ss; ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns() << " columns are initialized"; throw ParquetException(ss.str()); } int64_t file_offset = 0; int64_t total_compressed_size = 0; for (int i = 0; i < schema_->num_columns(); i++) { if (!(row_group_->columns[i].file_offset >= 0)) { std::stringstream ss; ss << "Column " << i << " is not complete."; throw ParquetException(ss.str()); } if (i == 0) { const format::ColumnMetaData& first_col = row_group_->columns[0].meta_data; // As per spec, file_offset for the row group points to the first // dictionary or data page of the column. 
if (first_col.__isset.dictionary_page_offset && first_col.dictionary_page_offset > 0) { file_offset = first_col.dictionary_page_offset; } else { file_offset = first_col.data_page_offset; } } // sometimes column metadata is encrypted and not available to read, // so we must get total_compressed_size from column builder total_compressed_size += column_builders_[i]->total_compressed_size(); } const auto& sorting_columns = properties_->sorting_columns(); if (!sorting_columns.empty()) { std::vector thrift_sorting_columns(sorting_columns.size()); for (size_t i = 0; i < sorting_columns.size(); ++i) { thrift_sorting_columns[i] = ToThrift(sorting_columns[i]); } row_group_->__set_sorting_columns(std::move(thrift_sorting_columns)); } row_group_->__set_file_offset(file_offset); row_group_->__set_total_compressed_size(total_compressed_size); row_group_->__set_total_byte_size(total_bytes_written); if (row_group_ordinal >= 0) { row_group_->__set_ordinal(row_group_ordinal); } } void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; } int num_columns() { return static_cast(row_group_->columns.size()); } int64_t num_rows() { return row_group_->num_rows; } private: void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); } format::RowGroup* row_group_; const std::shared_ptr properties_; const SchemaDescriptor* schema_; std::vector> column_builders_; int next_column_; }; std::unique_ptr RowGroupMetaDataBuilder::Make( std::shared_ptr props, const SchemaDescriptor* schema_, void* contents) { return std::unique_ptr( new RowGroupMetaDataBuilder(std::move(props), schema_, contents)); } RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(std::shared_ptr props, const SchemaDescriptor* schema_, void* contents) : impl_{new RowGroupMetaDataBuilderImpl(std::move(props), schema_, contents)} {} RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() = default; ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() { return impl_->NextColumnChunk(); } int 
RowGroupMetaDataBuilder::current_column() const { return impl_->current_column(); } int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); } int64_t RowGroupMetaDataBuilder::num_rows() { return impl_->num_rows(); } void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) { impl_->set_num_rows(num_rows); } void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written, int16_t row_group_ordinal) { impl_->Finish(total_bytes_written, row_group_ordinal); } // file metadata class FileMetaDataBuilder::FileMetaDataBuilderImpl { public: explicit FileMetaDataBuilderImpl(const SchemaDescriptor* schema, std::shared_ptr props) : metadata_(new format::FileMetaData()), properties_(std::move(props)), schema_(schema) { if (properties_->file_encryption_properties() != nullptr && properties_->file_encryption_properties()->encrypted_footer()) { crypto_metadata_ = std::make_unique(); } } RowGroupMetaDataBuilder* AppendRowGroup() { row_groups_.emplace_back(); current_row_group_builder_ = RowGroupMetaDataBuilder::Make(properties_, schema_, &row_groups_.back()); return current_row_group_builder_.get(); } void SetPageIndexLocation(const PageIndexLocation& location) { auto set_index_location = [this](size_t row_group_ordinal, const PageIndexLocation::FileIndexLocation& file_index_location, bool column_index) { auto& row_group_metadata = this->row_groups_.at(row_group_ordinal); auto iter = file_index_location.find(row_group_ordinal); if (iter != file_index_location.cend()) { const auto& row_group_index_location = iter->second; for (size_t i = 0; i < row_group_index_location.size(); ++i) { if (i >= row_group_metadata.columns.size()) { throw ParquetException("Cannot find metadata for column ordinal ", i); } auto& column_metadata = row_group_metadata.columns.at(i); const auto& index_location = row_group_index_location.at(i); if (index_location.has_value()) { if (column_index) { column_metadata.__set_column_index_offset(index_location->offset); 
column_metadata.__set_column_index_length(index_location->length); } else { column_metadata.__set_offset_index_offset(index_location->offset); column_metadata.__set_offset_index_length(index_location->length); } } } } }; for (size_t i = 0; i < row_groups_.size(); ++i) { set_index_location(i, location.column_index_location, true); set_index_location(i, location.offset_index_location, false); } } std::unique_ptr Finish( const std::shared_ptr& key_value_metadata) { int64_t total_rows = 0; for (const auto& row_group : row_groups_) { total_rows += row_group.num_rows; } metadata_->__set_num_rows(total_rows); metadata_->__set_row_groups(row_groups_); if (key_value_metadata) { ToThriftKeyValueMetadata(*key_value_metadata, metadata_.get()); } int32_t file_version = 0; switch (properties_->version()) { case ParquetVersion::PARQUET_1_0: file_version = 1; break; default: file_version = 2; break; } metadata_->__set_version(file_version); metadata_->__set_created_by(properties_->created_by()); // Users cannot set the `ColumnOrder` since we do not have user defined sort order // in the spec yet. // We always default to `TYPE_DEFINED_ORDER`. We can expose it in // the API once we have user defined sort orders in the Parquet format. 
// TypeDefinedOrder implies choose SortOrder based on ConvertedType/PhysicalType format::TypeDefinedOrder type_defined_order; format::ColumnOrder column_order; column_order.__set_TYPE_ORDER(type_defined_order); column_order.__isset.TYPE_ORDER = true; metadata_->column_orders.resize(schema_->num_columns(), column_order); metadata_->__isset.column_orders = true; // if plaintext footer, set footer signing algorithm auto file_encryption_properties = properties_->file_encryption_properties(); if (file_encryption_properties && !file_encryption_properties->encrypted_footer()) { EncryptionAlgorithm signing_algorithm; EncryptionAlgorithm algo = file_encryption_properties->algorithm(); signing_algorithm.aad.aad_file_unique = algo.aad.aad_file_unique; signing_algorithm.aad.supply_aad_prefix = algo.aad.supply_aad_prefix; if (!algo.aad.supply_aad_prefix) { signing_algorithm.aad.aad_prefix = algo.aad.aad_prefix; } signing_algorithm.algorithm = ParquetCipher::AES_GCM_V1; metadata_->__set_encryption_algorithm(ToThrift(signing_algorithm)); const std::string& footer_signing_key_metadata = file_encryption_properties->footer_key_metadata(); if (footer_signing_key_metadata.size() > 0) { metadata_->__set_footer_signing_key_metadata(footer_signing_key_metadata); } } ToParquet(static_cast(schema_->schema_root().get()), &metadata_->schema); auto file_meta_data = std::unique_ptr(new FileMetaData()); file_meta_data->impl_->metadata_ = std::move(metadata_); file_meta_data->impl_->InitSchema(); file_meta_data->impl_->InitKeyValueMetadata(); return file_meta_data; } std::unique_ptr BuildFileCryptoMetaData() { if (crypto_metadata_ == nullptr) { return nullptr; } auto file_encryption_properties = properties_->file_encryption_properties(); crypto_metadata_->__set_encryption_algorithm( ToThrift(file_encryption_properties->algorithm())); std::string key_metadata = file_encryption_properties->footer_key_metadata(); if (!key_metadata.empty()) { crypto_metadata_->__set_key_metadata(key_metadata); } 
std::unique_ptr file_crypto_metadata(new FileCryptoMetaData()); file_crypto_metadata->impl_->metadata_ = std::move(*crypto_metadata_); return file_crypto_metadata; } protected: std::unique_ptr metadata_; std::unique_ptr crypto_metadata_; private: const std::shared_ptr properties_; std::vector row_groups_; std::unique_ptr current_row_group_builder_; const SchemaDescriptor* schema_; }; std::unique_ptr FileMetaDataBuilder::Make( const SchemaDescriptor* schema, std::shared_ptr props) { return std::unique_ptr( new FileMetaDataBuilder(schema, std::move(props))); } FileMetaDataBuilder::FileMetaDataBuilder(const SchemaDescriptor* schema, std::shared_ptr props) : impl_{std::make_unique(schema, std::move(props))} {} FileMetaDataBuilder::~FileMetaDataBuilder() = default; RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() { return impl_->AppendRowGroup(); } void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location) { impl_->SetPageIndexLocation(location); } std::unique_ptr FileMetaDataBuilder::Finish( const std::shared_ptr& key_value_metadata) { return impl_->Finish(key_value_metadata); } std::unique_ptr FileMetaDataBuilder::GetCryptoMetaData() { return impl_->BuildFileCryptoMetaData(); } } // namespace parquet20