#pragma clang system_header // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include "contrib/libs/apache/arrow_next/cpp/src/parquet/windows_compatibility.h" #include #include #include #include #include #include #include #include // TCompactProtocol requires some #defines to work right. #define SIGNED_RIGHT_SHIFT_IS 1 #define ARITHMETIC_RIGHT_SHIFT 1 #include #include #include #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/encryption/internal_file_decryptor.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/encryption/internal_file_encryptor.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/exception.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/platform.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/properties.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/size_statistics.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/statistics.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/types.h" #include "contrib/libs/apache/arrow_next/cpp/src/generated/parquet_types.h" // IWYU pragma: export namespace parquet20 { // ---------------------------------------------------------------------- // Convert Thrift enums to Parquet enums // Unsafe enum converters (input is not checked for validity) static inline Type::type FromThriftUnsafe(format::Type::type type) { return static_cast(type); } static inline ConvertedType::type FromThriftUnsafe(format::ConvertedType::type type) { // item 0 is NONE return static_cast(static_cast(type) + 1); } static inline Repetition::type FromThriftUnsafe(format::FieldRepetitionType::type type) { return static_cast(type); } static inline Encoding::type FromThriftUnsafe(format::Encoding::type type) { return static_cast(type); } static inline PageType::type FromThriftUnsafe(format::PageType::type type) { return static_cast(type); } static inline Compression::type FromThriftUnsafe(format::CompressionCodec::type type) { switch (type) { case format::CompressionCodec::UNCOMPRESSED: return Compression::UNCOMPRESSED; case format::CompressionCodec::SNAPPY: return Compression::SNAPPY; case format::CompressionCodec::GZIP: return Compression::GZIP; case format::CompressionCodec::LZO: return Compression::LZO; case format::CompressionCodec::BROTLI: return Compression::BROTLI; case format::CompressionCodec::LZ4: return Compression::LZ4_HADOOP; case format::CompressionCodec::LZ4_RAW: return Compression::LZ4; case format::CompressionCodec::ZSTD: return Compression::ZSTD; default: DCHECK(false) << "Cannot reach here"; return Compression::UNCOMPRESSED; } } static inline BoundaryOrder::type FromThriftUnsafe(format::BoundaryOrder::type type) { return static_cast(type); } namespace internal { template struct ThriftEnumTypeTraits {}; template <> struct ThriftEnumTypeTraits<::parquet20::format::Type::type> { using ParquetEnum = Type; }; template <> struct ThriftEnumTypeTraits<::parquet20::format::ConvertedType::type> { using ParquetEnum = ConvertedType; }; template <> struct ThriftEnumTypeTraits<::parquet20::format::FieldRepetitionType::type> { using ParquetEnum = Repetition; }; template <> struct ThriftEnumTypeTraits<::parquet20::format::Encoding::type> { using ParquetEnum = Encoding; }; template <> struct ThriftEnumTypeTraits<::parquet20::format::PageType::type> { using ParquetEnum = PageType; }; template <> struct ThriftEnumTypeTraits<::parquet20::format::BoundaryOrder::type> { using ParquetEnum = BoundaryOrder; }; // If the parquet file is corrupted it is possible the enum value decoded // will not be in the range of defined values, which is undefined behaviour. // This facility prevents this by loading the value as the underlying type // and checking to make sure it is in range. template ::type> inline static EnumTypeRaw LoadEnumRaw(const EnumType* in) { EnumTypeRaw raw_value; // Use memcpy(), as a regular cast would be undefined behaviour on invalid values memcpy(&raw_value, in, sizeof(EnumType)); return raw_value; } template struct SafeLoader { using ApiTypeEnum = typename ApiType::type; using ApiTypeRawEnum = typename std::underlying_type::type; template inline static ApiTypeRawEnum LoadRaw(const ThriftType* in) { static_assert(sizeof(ApiTypeEnum) == sizeof(ThriftType), "parquet type should always be the same size as thrift type"); return static_cast(LoadEnumRaw(in)); } template inline static ApiTypeEnum LoadChecked( const typename std::enable_if::type* in) { auto raw_value = LoadRaw(in); if (ARROW_PREDICT_FALSE(raw_value >= static_cast(ApiType::UNDEFINED))) { return ApiType::UNDEFINED; } return FromThriftUnsafe(static_cast(raw_value)); } template inline static ApiTypeEnum LoadChecked( const typename std::enable_if::type* in) { auto raw_value = LoadRaw(in); if (ARROW_PREDICT_FALSE(raw_value >= static_cast(ApiType::UNDEFINED) || raw_value < 0)) { return ApiType::UNDEFINED; } return FromThriftUnsafe(static_cast(raw_value)); } template inline static ApiTypeEnum Load(const ThriftType* in) { return LoadChecked::value>(in); } }; } // namespace internal // Safe enum loader: will check for invalid enum value before converting template ::ParquetEnum> inline typename ParquetEnum::type LoadEnumSafe(const ThriftType* in) { return internal::SafeLoader::Load(in); } inline typename Compression::type LoadEnumSafe(const format::CompressionCodec::type* in) { const auto raw_value = internal::LoadEnumRaw(in); // Check bounds manually, as Compression::type doesn't have the same values // as format::CompressionCodec. const auto min_value = static_cast(format::CompressionCodec::UNCOMPRESSED); const auto max_value = static_cast(format::CompressionCodec::LZ4_RAW); if (raw_value < min_value || raw_value > max_value) { return Compression::UNCOMPRESSED; } return FromThriftUnsafe(*in); } // Safe non-enum converters static inline AadMetadata FromThrift(format::AesGcmV1 aesGcmV1) { return AadMetadata{aesGcmV1.aad_prefix, aesGcmV1.aad_file_unique, aesGcmV1.supply_aad_prefix}; } static inline AadMetadata FromThrift(format::AesGcmCtrV1 aesGcmCtrV1) { return AadMetadata{aesGcmCtrV1.aad_prefix, aesGcmCtrV1.aad_file_unique, aesGcmCtrV1.supply_aad_prefix}; } static inline EncryptionAlgorithm FromThrift(format::EncryptionAlgorithm encryption) { EncryptionAlgorithm encryption_algorithm; if (encryption.__isset.AES_GCM_V1) { encryption_algorithm.algorithm = ParquetCipher::AES_GCM_V1; encryption_algorithm.aad = FromThrift(encryption.AES_GCM_V1); } else if (encryption.__isset.AES_GCM_CTR_V1) { encryption_algorithm.algorithm = ParquetCipher::AES_GCM_CTR_V1; encryption_algorithm.aad = FromThrift(encryption.AES_GCM_CTR_V1); } else { throw ParquetException("Unsupported algorithm"); } return encryption_algorithm; } static inline SortingColumn FromThrift(format::SortingColumn thrift_sorting_column) { SortingColumn sorting_column; sorting_column.column_idx = thrift_sorting_column.column_idx; sorting_column.nulls_first = thrift_sorting_column.nulls_first; sorting_column.descending = thrift_sorting_column.descending; return sorting_column; } static inline SizeStatistics FromThrift(const format::SizeStatistics& size_stats) { return SizeStatistics{ size_stats.definition_level_histogram, size_stats.repetition_level_histogram, size_stats.__isset.unencoded_byte_array_data_bytes ? std::make_optional(size_stats.unencoded_byte_array_data_bytes) : std::nullopt}; } // ---------------------------------------------------------------------- // Convert Thrift enums from Parquet enums static inline format::Type::type ToThrift(Type::type type) { return static_cast(type); } static inline format::ConvertedType::type ToThrift(ConvertedType::type type) { // item 0 is NONE DCHECK_NE(type, ConvertedType::NONE); // it is forbidden to emit "NA" (PARQUET-1990) DCHECK_NE(type, ConvertedType::NA); DCHECK_NE(type, ConvertedType::UNDEFINED); return static_cast(static_cast(type) - 1); } static inline format::FieldRepetitionType::type ToThrift(Repetition::type type) { return static_cast(type); } static inline format::Encoding::type ToThrift(Encoding::type type) { return static_cast(type); } static inline format::CompressionCodec::type ToThrift(Compression::type type) { switch (type) { case Compression::UNCOMPRESSED: return format::CompressionCodec::UNCOMPRESSED; case Compression::SNAPPY: return format::CompressionCodec::SNAPPY; case Compression::GZIP: return format::CompressionCodec::GZIP; case Compression::LZO: return format::CompressionCodec::LZO; case Compression::BROTLI: return format::CompressionCodec::BROTLI; case Compression::LZ4: return format::CompressionCodec::LZ4_RAW; case Compression::LZ4_HADOOP: // Deprecated "LZ4" Parquet compression has Hadoop-specific framing return format::CompressionCodec::LZ4; case Compression::ZSTD: return format::CompressionCodec::ZSTD; default: DCHECK(false) << "Cannot reach here"; return format::CompressionCodec::UNCOMPRESSED; } } static inline format::BoundaryOrder::type ToThrift(BoundaryOrder::type type) { switch (type) { case BoundaryOrder::Unordered: case BoundaryOrder::Ascending: case BoundaryOrder::Descending: return static_cast(type); default: DCHECK(false) << "Cannot reach here"; return format::BoundaryOrder::UNORDERED; } } static inline format::SortingColumn ToThrift(SortingColumn sorting_column) { format::SortingColumn thrift_sorting_column; thrift_sorting_column.column_idx = sorting_column.column_idx; thrift_sorting_column.descending = sorting_column.descending; thrift_sorting_column.nulls_first = sorting_column.nulls_first; return thrift_sorting_column; } static inline format::Statistics ToThrift(const EncodedStatistics& stats) { format::Statistics statistics; if (stats.has_min) { statistics.__set_min_value(stats.min()); // If the order is SIGNED, then the old min value must be set too. // This for backward compatibility if (stats.is_signed()) { statistics.__set_min(stats.min()); } } if (stats.has_max) { statistics.__set_max_value(stats.max()); // If the order is SIGNED, then the old max value must be set too. // This for backward compatibility if (stats.is_signed()) { statistics.__set_max(stats.max()); } } if (stats.has_null_count) { statistics.__set_null_count(stats.null_count); } if (stats.has_distinct_count) { statistics.__set_distinct_count(stats.distinct_count); } return statistics; } static inline format::AesGcmV1 ToAesGcmV1Thrift(AadMetadata aad) { format::AesGcmV1 aesGcmV1; // aad_file_unique is always set aesGcmV1.__set_aad_file_unique(aad.aad_file_unique); aesGcmV1.__set_supply_aad_prefix(aad.supply_aad_prefix); if (!aad.aad_prefix.empty()) { aesGcmV1.__set_aad_prefix(aad.aad_prefix); } return aesGcmV1; } static inline format::AesGcmCtrV1 ToAesGcmCtrV1Thrift(AadMetadata aad) { format::AesGcmCtrV1 aesGcmCtrV1; // aad_file_unique is always set aesGcmCtrV1.__set_aad_file_unique(aad.aad_file_unique); aesGcmCtrV1.__set_supply_aad_prefix(aad.supply_aad_prefix); if (!aad.aad_prefix.empty()) { aesGcmCtrV1.__set_aad_prefix(aad.aad_prefix); } return aesGcmCtrV1; } static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryption) { format::EncryptionAlgorithm encryption_algorithm; if (encryption.algorithm == ParquetCipher::AES_GCM_V1) { encryption_algorithm.__set_AES_GCM_V1(ToAesGcmV1Thrift(encryption.aad)); } else { encryption_algorithm.__set_AES_GCM_CTR_V1(ToAesGcmCtrV1Thrift(encryption.aad)); } return encryption_algorithm; } static inline format::SizeStatistics ToThrift(const SizeStatistics& size_stats) { format::SizeStatistics size_statistics; size_statistics.__set_definition_level_histogram(size_stats.definition_level_histogram); size_statistics.__set_repetition_level_histogram(size_stats.repetition_level_histogram); if (size_stats.unencoded_byte_array_data_bytes.has_value()) { size_statistics.__set_unencoded_byte_array_data_bytes( size_stats.unencoded_byte_array_data_bytes.value()); } return size_statistics; } // ---------------------------------------------------------------------- // Thrift struct serialization / deserialization utilities using ThriftBuffer = apache::thrift::transport::TMemoryBuffer; class ThriftDeserializer { public: explicit ThriftDeserializer(const ReaderProperties& properties) : ThriftDeserializer(properties.thrift_string_size_limit(), properties.thrift_container_size_limit()) {} ThriftDeserializer(int32_t string_size_limit, int32_t container_size_limit) : string_size_limit_(string_size_limit), container_size_limit_(container_size_limit) {} // Deserialize a thrift message from buf/len. buf/len must at least contain // all the bytes needed to store the thrift message. On return, len will be // set to the actual length of the header. template void DeserializeMessage(const uint8_t* buf, uint32_t* len, T* deserialized_msg, Decryptor* decryptor = NULLPTR) { if (decryptor == NULLPTR) { // thrift message is not encrypted DeserializeUnencryptedMessage(buf, len, deserialized_msg); } else { // thrift message is encrypted uint32_t clen; clen = *len; if (clen > static_cast(std::numeric_limits::max())) { std::stringstream ss; ss << "Cannot decrypt buffer with length " << clen << ", which overflows int32\n"; throw ParquetException(ss.str()); } // decrypt auto decrypted_buffer = AllocateBuffer( decryptor->pool(), decryptor->PlaintextLength(static_cast(clen))); ::arrow20::util::span cipher_buf(buf, clen); uint32_t decrypted_buffer_len = decryptor->Decrypt(cipher_buf, decrypted_buffer->mutable_span_as()); if (decrypted_buffer_len <= 0) { throw ParquetException("Couldn't decrypt buffer\n"); } *len = decryptor->CiphertextLength(static_cast(decrypted_buffer_len)); DeserializeUnencryptedMessage(decrypted_buffer->data(), &decrypted_buffer_len, deserialized_msg); } } private: // On Thrift 0.14.0+, we want to use TConfiguration to raise the max message size // limit (ARROW-13655). If we wanted to protect against huge messages, we could // do it ourselves since we know the message size up front. std::shared_ptr CreateReadOnlyMemoryBuffer(uint8_t* buf, uint32_t len) { return std::make_shared(buf, len); } template void DeserializeUnencryptedMessage(const uint8_t* buf, uint32_t* len, T* deserialized_msg) { // Deserialize msg bytes into c++ thrift msg using memory transport. auto tmem_transport = CreateReadOnlyMemoryBuffer(const_cast(buf), *len); auto tproto = apache::thrift::protocol::TCompactProtocolT( tmem_transport, string_size_limit_, container_size_limit_); try { deserialized_msg ->template read>( &tproto); } catch (std::exception& e) { std::stringstream ss; ss << "Couldn't deserialize thrift: " << e.what() << "\n"; throw ParquetException(ss.str()); } uint32_t bytes_left = tmem_transport->available_read(); *len = *len - bytes_left; } const int32_t string_size_limit_; const int32_t container_size_limit_; }; /// Utility class to serialize thrift objects to a binary format. This object /// should be reused if possible to reuse the underlying memory. /// Note: thrift will encode NULLs into the serialized buffer so it is not valid /// to treat it as a string. class ThriftSerializer { public: explicit ThriftSerializer(int initial_buffer_size = 1024) : mem_buffer_(new ThriftBuffer(initial_buffer_size)) { apache::thrift::protocol::TCompactProtocolFactoryT factory; protocol_ = factory.getProtocol(mem_buffer_); } /// Serialize obj into a memory buffer. The result is returned in buffer/len. The /// memory returned is owned by this object and will be invalid when another object /// is serialized. template void SerializeToBuffer(const T* obj, uint32_t* len, uint8_t** buffer) { SerializeObject(obj); mem_buffer_->getBuffer(buffer, len); } template void SerializeToString(const T* obj, std::string* result) { SerializeObject(obj); *result = mem_buffer_->getBufferAsString(); } template int64_t Serialize(const T* obj, ArrowOutputStream* out, Encryptor* encryptor = NULLPTR) { uint8_t* out_buffer; uint32_t out_length; SerializeToBuffer(obj, &out_length, &out_buffer); // obj is not encrypted if (encryptor == NULLPTR) { PARQUET_THROW_NOT_OK(out->Write(out_buffer, out_length)); return static_cast(out_length); } else { // obj is encrypted return SerializeEncryptedObj(out, out_buffer, out_length, encryptor); } } private: template void SerializeObject(const T* obj) { try { mem_buffer_->resetBuffer(); obj->write(protocol_.get()); } catch (std::exception& e) { std::stringstream ss; ss << "Couldn't serialize thrift: " << e.what() << "\n"; throw ParquetException(ss.str()); } } int64_t SerializeEncryptedObj(ArrowOutputStream* out, const uint8_t* out_buffer, uint32_t out_length, Encryptor* encryptor) { auto cipher_buffer = AllocateBuffer(encryptor->pool(), encryptor->CiphertextLength(out_length)); ::arrow20::util::span out_span(out_buffer, out_length); int32_t cipher_buffer_len = encryptor->Encrypt(out_span, cipher_buffer->mutable_span_as()); PARQUET_THROW_NOT_OK(out->Write(cipher_buffer->data(), cipher_buffer_len)); return static_cast(cipher_buffer_len); } std::shared_ptr mem_buffer_; std::shared_ptr protocol_; }; } // namespace parquet20