// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "contrib/libs/apache/arrow_next/cpp/src/parquet/arrow/reader_internal.h" #include #include #include #include #include #include #include #include #include #include "contrib/libs/apache/arrow_next/cpp/src/arrow/array.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/compute/api.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/datum.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/memory.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/ipc/reader.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/ipc/writer.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/scalar.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/table.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/type.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/type_traits.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/base64.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/bit_util.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/checked_cast.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/endian.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/float16.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/int_util_overflow.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/ubsan.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/arrow/reader.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/arrow/schema.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/arrow/schema_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/column_reader.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/platform.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/properties.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/schema.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/statistics.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/types.h" #include "contrib/libs/apache/arrow_next/cpp/src/parquet/windows_fixup.h" // for OPTIONAL using arrow20::Array; using arrow20::BooleanArray; using arrow20::ChunkedArray; using arrow20::DataType; using arrow20::Datum; using arrow20::Decimal128; using arrow20::Decimal128Array; using arrow20::Decimal128Type; using arrow20::Decimal256; using arrow20::Decimal256Array; using arrow20::Decimal256Type; using arrow20::Field; using arrow20::Int32Array; using arrow20::ListArray; using arrow20::MemoryPool; using arrow20::ResizableBuffer; using arrow20::Status; using arrow20::StructArray; using arrow20::Table; using arrow20::TimestampArray; using ::arrow20::bit_util::FromBigEndian; using ::arrow20::internal::checked_cast; using ::arrow20::internal::checked_pointer_cast; using ::arrow20::internal::SafeLeftShift; using ::arrow20::util::Float16; using ::arrow20::util::SafeLoadAs; using parquet20::internal::BinaryRecordReader; using parquet20::internal::DictionaryRecordReader; using parquet20::internal::RecordReader; using parquet20::schema::GroupNode; using parquet20::schema::Node; using parquet20::schema::PrimitiveNode; using ParquetType = parquet20::Type; namespace bit_util = arrow20::bit_util; namespace parquet20::arrow20 { namespace { template using ArrayType = typename ::arrow20::TypeTraits::ArrayType; template Status MakeMinMaxScalar(const StatisticsType& statistics, std::shared_ptr<::arrow20::Scalar>* min, std::shared_ptr<::arrow20::Scalar>* max) { *min = ::arrow20::MakeScalar(static_cast(statistics.min())); *max = ::arrow20::MakeScalar(static_cast(statistics.max())); return Status::OK(); } template Status MakeMinMaxTypedScalar(const StatisticsType& statistics, std::shared_ptr type, std::shared_ptr<::arrow20::Scalar>* min, std::shared_ptr<::arrow20::Scalar>* max) { ARROW_ASSIGN_OR_RAISE(*min, ::arrow20::MakeScalar(type, statistics.min())); ARROW_ASSIGN_OR_RAISE(*max, ::arrow20::MakeScalar(type, statistics.max())); return Status::OK(); } template Status MakeMinMaxIntegralScalar(const StatisticsType& statistics, const ::arrow20::DataType& arrow_type, std::shared_ptr<::arrow20::Scalar>* min, std::shared_ptr<::arrow20::Scalar>* max) { const auto column_desc = statistics.descr(); const auto& logical_type = column_desc->logical_type(); const auto& integer = checked_pointer_cast(logical_type); const bool is_signed = integer->is_signed(); switch (integer->bit_width()) { case 8: return is_signed ? MakeMinMaxScalar(statistics, min, max) : MakeMinMaxScalar(statistics, min, max); case 16: return is_signed ? MakeMinMaxScalar(statistics, min, max) : MakeMinMaxScalar(statistics, min, max); case 32: return is_signed ? MakeMinMaxScalar(statistics, min, max) : MakeMinMaxScalar(statistics, min, max); case 64: return is_signed ? MakeMinMaxScalar(statistics, min, max) : MakeMinMaxScalar(statistics, min, max); } return Status::OK(); } static Status FromInt32Statistics(const Int32Statistics& statistics, const LogicalType& logical_type, std::shared_ptr<::arrow20::Scalar>* min, std::shared_ptr<::arrow20::Scalar>* max) { ARROW_ASSIGN_OR_RAISE(auto type, FromInt32(logical_type)); switch (logical_type.type()) { case LogicalType::Type::INT: return MakeMinMaxIntegralScalar(statistics, *type, min, max); break; case LogicalType::Type::DATE: case LogicalType::Type::TIME: case LogicalType::Type::NONE: return MakeMinMaxTypedScalar(statistics, type, min, max); break; default: break; } return Status::NotImplemented("Cannot extract statistics for type "); } static Status FromInt64Statistics(const Int64Statistics& statistics, const LogicalType& logical_type, std::shared_ptr<::arrow20::Scalar>* min, std::shared_ptr<::arrow20::Scalar>* max) { ARROW_ASSIGN_OR_RAISE(auto type, FromInt64(logical_type)); switch (logical_type.type()) { case LogicalType::Type::INT: return MakeMinMaxIntegralScalar(statistics, *type, min, max); break; case LogicalType::Type::TIME: case LogicalType::Type::TIMESTAMP: case LogicalType::Type::NONE: return MakeMinMaxTypedScalar(statistics, type, min, max); break; default: break; } return Status::NotImplemented("Cannot extract statistics for type "); } template Result> FromBigEndianString( const std::string& data, std::shared_ptr arrow_type) { ARROW_ASSIGN_OR_RAISE( DecimalType decimal, DecimalType::FromBigEndian(reinterpret_cast(data.data()), static_cast(data.size()))); return ::arrow20::MakeScalar(std::move(arrow_type), decimal); } // Extracts Min and Max scalar from bytes like types (i.e. types where // decimal is encoded as little endian. Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics, const LogicalType& logical_type, std::shared_ptr<::arrow20::Scalar>* min, std::shared_ptr<::arrow20::Scalar>* max) { const DecimalLogicalType& decimal_type = checked_cast(logical_type); Result> maybe_type = Decimal128Type::Make(decimal_type.precision(), decimal_type.scale()); std::shared_ptr arrow_type; if (maybe_type.ok()) { arrow_type = maybe_type.ValueOrDie(); ARROW_ASSIGN_OR_RAISE( *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), std::move(arrow_type))); return Status::OK(); } // Fallback to see if Decimal256 can represent the type. ARROW_ASSIGN_OR_RAISE( arrow_type, Decimal256Type::Make(decimal_type.precision(), decimal_type.scale())); ARROW_ASSIGN_OR_RAISE( *min, FromBigEndianString(statistics.EncodeMin(), arrow_type)); ARROW_ASSIGN_OR_RAISE(*max, FromBigEndianString(statistics.EncodeMax(), std::move(arrow_type))); return Status::OK(); } Status ByteArrayStatisticsAsScalars(const Statistics& statistics, std::shared_ptr<::arrow20::Scalar>* min, std::shared_ptr<::arrow20::Scalar>* max) { auto logical_type = statistics.descr()->logical_type(); if (logical_type->type() == LogicalType::Type::DECIMAL) { return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, max); } std::shared_ptr<::arrow20::DataType> type; if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { type = ::arrow20::fixed_size_binary(statistics.descr()->type_length()); } else { type = logical_type->type() == LogicalType::Type::STRING ? ::arrow20::utf8() : ::arrow20::binary(); } ARROW_ASSIGN_OR_RAISE( *min, ::arrow20::MakeScalar(type, Buffer::FromString(statistics.EncodeMin()))); ARROW_ASSIGN_OR_RAISE( *max, ::arrow20::MakeScalar(type, Buffer::FromString(statistics.EncodeMax()))); return Status::OK(); } Result> ViewOrCastChunkedArray( const std::shared_ptr& array, MemoryPool* pool, const std::shared_ptr& logical_value_type) { auto view_result = array->View(logical_value_type); if (view_result.ok()) { return view_result.ValueOrDie(); } else { ::arrow20::compute::ExecContext exec_context(pool); ARROW_ASSIGN_OR_RAISE( auto casted_datum, ::arrow20::compute::Cast(Datum(array), logical_value_type, ::arrow20::compute::CastOptions(), &exec_context)); return casted_datum.chunked_array(); } } } // namespace Status StatisticsAsScalars(const Statistics& statistics, std::shared_ptr<::arrow20::Scalar>* min, std::shared_ptr<::arrow20::Scalar>* max) { if (!statistics.HasMinMax()) { return Status::Invalid("Statistics has no min max."); } auto column_desc = statistics.descr(); if (column_desc == nullptr) { return Status::Invalid("Statistics carries no descriptor, can't infer arrow type."); } auto physical_type = column_desc->physical_type(); auto logical_type = column_desc->logical_type(); switch (physical_type) { case Type::BOOLEAN: return MakeMinMaxScalar( checked_cast(statistics), min, max); case Type::FLOAT: return MakeMinMaxScalar( checked_cast(statistics), min, max); case Type::DOUBLE: return MakeMinMaxScalar( checked_cast(statistics), min, max); case Type::INT32: return FromInt32Statistics(checked_cast(statistics), *logical_type, min, max); case Type::INT64: return FromInt64Statistics(checked_cast(statistics), *logical_type, min, max); case Type::BYTE_ARRAY: case Type::FIXED_LEN_BYTE_ARRAY: return ByteArrayStatisticsAsScalars(statistics, min, max); default: return Status::NotImplemented("Extract statistics unsupported for physical_type ", physical_type, " unsupported."); } return Status::OK(); } // ---------------------------------------------------------------------- // Primitive types namespace { /// Drop the validity buffer from each chunk. /// /// Used when reading a non-nullable field. void ReconstructChunksWithoutNulls(::arrow20::ArrayVector* chunks) { for (size_t i = 0; i < chunks->size(); i++) { if ((*chunks)[i]->data()->buffers[0]) { std::shared_ptr<::arrow20::ArrayData> data = (*chunks)[i]->data(); data->null_count = 0; data->buffers[0] = nullptr; (*chunks)[i] = MakeArray(data); } } } template void AttachStatistics(::arrow20::ArrayData* data, std::unique_ptr<::parquet20::ColumnChunkMetaData> metadata, const ReaderContext* ctx) { if (!metadata) { return; } using ArrowCType = typename ArrowType::c_type; auto statistics = metadata->statistics().get(); if (data->null_count == ::arrow20::kUnknownNullCount && !statistics) { return; } auto array_statistics = std::make_shared<::arrow20::ArrayStatistics>(); if (data->null_count != ::arrow20::kUnknownNullCount) { array_statistics->null_count = data->null_count; } if (statistics) { if (statistics->HasDistinctCount()) { array_statistics->distinct_count = statistics->distinct_count(); } if (statistics->HasMinMax()) { auto typed_statistics = static_cast<::parquet20::TypedStatistics*>(statistics); const ArrowCType min = typed_statistics->min(); const ArrowCType max = typed_statistics->max(); if constexpr (std::is_same::value) { array_statistics->min = static_cast(min); array_statistics->max = static_cast(max); } else if constexpr (std::is_floating_point::value) { array_statistics->min = static_cast(min); array_statistics->max = static_cast(max); } else if constexpr (std::is_signed::value) { array_statistics->min = static_cast(min); array_statistics->max = static_cast(max); } else { array_statistics->min = static_cast(min); array_statistics->max = static_cast(max); } // We can assume that integer/floating point number/boolean // based min/max are always exact if they exist. Apache // Parquet's "Statistics" has "is_min_value_exact" and // "is_max_value_exact" but we can ignore them for integer/ // floating point number/boolean based min/max. // // See also the discussion at dev@parquet.apache.org: // https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4 array_statistics->is_min_exact = true; array_statistics->is_max_exact = true; } } data->statistics = std::move(array_statistics); } template Status TransferInt(RecordReader* reader, std::unique_ptr<::parquet20::ColumnChunkMetaData> metadata, const ReaderContext* ctx, const std::shared_ptr& field, Datum* out) { using ArrowCType = typename ArrowType::c_type; using ParquetCType = typename ParquetType::c_type; int64_t length = reader->values_written(); ARROW_ASSIGN_OR_RAISE(auto data, ::arrow20::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool)); auto values = reinterpret_cast(reader->values()); auto out_ptr = reinterpret_cast(data->mutable_data()); std::copy(values, values + length, out_ptr); int64_t null_count = 0; std::vector> buffers = {nullptr, std::move(data)}; if (field->nullable()) { null_count = reader->null_count(); buffers[0] = reader->ReleaseIsValid(); } auto array_data = ::arrow20::ArrayData::Make(field->type(), length, std::move(buffers), null_count); AttachStatistics(array_data.get(), std::move(metadata), ctx); *out = std::make_shared>(std::move(array_data)); return Status::OK(); } template std::shared_ptr TransferZeroCopy( RecordReader* reader, std::unique_ptr<::parquet20::ColumnChunkMetaData> metadata, const ReaderContext* ctx, const std::shared_ptr& field) { std::shared_ptr<::arrow20::ArrayData> data; if (field->nullable()) { std::vector> buffers = {reader->ReleaseIsValid(), reader->ReleaseValues()}; data = std::make_shared<::arrow20::ArrayData>(field->type(), reader->values_written(), std::move(buffers), reader->null_count()); } else { std::vector> buffers = {nullptr, reader->ReleaseValues()}; data = std::make_shared<::arrow20::ArrayData>(field->type(), reader->values_written(), std::move(buffers), /*null_count=*/0); } AttachStatistics(data.get(), std::move(metadata), ctx); return ::arrow20::MakeArray(std::move(data)); } Status TransferBool(RecordReader* reader, std::unique_ptr<::parquet20::ColumnChunkMetaData> metadata, const ReaderContext* ctx, bool nullable, Datum* out) { int64_t length = reader->values_written(); const int64_t buffer_size = bit_util::BytesForBits(length); ARROW_ASSIGN_OR_RAISE(auto data, ::arrow20::AllocateBuffer(buffer_size, ctx->pool)); // Transfer boolean values to packed bitmap auto values = reinterpret_cast(reader->values()); uint8_t* data_ptr = data->mutable_data(); memset(data_ptr, 0, static_cast(buffer_size)); for (int64_t i = 0; i < length; i++) { if (values[i]) { ::arrow20::bit_util::SetBit(data_ptr, i); } } std::shared_ptr<::arrow20::ArrayData> array_data; if (nullable) { array_data = ::arrow20::ArrayData::Make(::arrow20::boolean(), length, {reader->ReleaseIsValid(), std::move(data)}, reader->null_count()); } else { array_data = ::arrow20::ArrayData::Make(::arrow20::boolean(), length, {/*null_bitmap=*/nullptr, std::move(data)}, /*null_count=*/0); } AttachStatistics<::arrow20::BooleanType, BooleanType>(array_data.get(), std::move(metadata), ctx); *out = std::make_shared(std::move(array_data)); return Status::OK(); } Status TransferInt96(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out, const ::arrow20::TimeUnit::type int96_arrow_time_unit) { int64_t length = reader->values_written(); auto values = reinterpret_cast(reader->values()); ARROW_ASSIGN_OR_RAISE(auto data, ::arrow20::AllocateBuffer(length * sizeof(int64_t), pool)); auto data_ptr = reinterpret_cast(data->mutable_data()); for (int64_t i = 0; i < length; i++) { if (values[i].value[2] == 0) { // Happens for null entries: avoid triggering UBSAN as that Int96 timestamp // isn't representable as a 64-bit Unix timestamp. *data_ptr++ = 0; } else { switch (int96_arrow_time_unit) { case ::arrow20::TimeUnit::NANO: *data_ptr++ = Int96GetNanoSeconds(values[i]); break; case ::arrow20::TimeUnit::MICRO: *data_ptr++ = Int96GetMicroSeconds(values[i]); break; case ::arrow20::TimeUnit::MILLI: *data_ptr++ = Int96GetMilliSeconds(values[i]); break; case ::arrow20::TimeUnit::SECOND: *data_ptr++ = Int96GetSeconds(values[i]); break; } } } if (field->nullable()) { *out = std::make_shared(field->type(), length, std::move(data), reader->ReleaseIsValid(), reader->null_count()); } else { *out = std::make_shared(field->type(), length, std::move(data), /*null_bitmap=*/nullptr, /*null_count=*/0); } return Status::OK(); } Status TransferDate64(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { int64_t length = reader->values_written(); auto values = reinterpret_cast(reader->values()); ARROW_ASSIGN_OR_RAISE(auto data, ::arrow20::AllocateBuffer(length * sizeof(int64_t), pool)); auto out_ptr = reinterpret_cast(data->mutable_data()); for (int64_t i = 0; i < length; i++) { *out_ptr++ = static_cast(values[i]) * kMillisecondsPerDay; } if (field->nullable()) { *out = std::make_shared<::arrow20::Date64Array>(field->type(), length, std::move(data), reader->ReleaseIsValid(), reader->null_count()); } else { *out = std::make_shared<::arrow20::Date64Array>(field->type(), length, std::move(data), /*null_bitmap=*/nullptr, /*null_count=*/0); } return Status::OK(); } // ---------------------------------------------------------------------- // Binary, direct to dictionary-encoded Status TransferDictionary(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& logical_value_type, bool nullable, std::shared_ptr* out) { auto dict_reader = dynamic_cast(reader); DCHECK(dict_reader); *out = dict_reader->GetResult(); if (!logical_value_type->Equals(*(*out)->type())) { ARROW_ASSIGN_OR_RAISE(*out, ViewOrCastChunkedArray(*out, pool, logical_value_type)); } if (!nullable) { ::arrow20::ArrayVector chunks = (*out)->chunks(); ReconstructChunksWithoutNulls(&chunks); *out = std::make_shared(std::move(chunks), logical_value_type); } return Status::OK(); } Status TransferBinary(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& logical_type_field, std::shared_ptr* out) { if (reader->read_dictionary()) { return TransferDictionary( reader, pool, ::arrow20::dictionary(::arrow20::int32(), logical_type_field->type()), logical_type_field->nullable(), out); } ::arrow20::compute::ExecContext ctx(pool); ::arrow20::compute::CastOptions cast_options; cast_options.allow_invalid_utf8 = true; // avoid spending time validating UTF8 data auto binary_reader = dynamic_cast(reader); DCHECK(binary_reader); auto chunks = binary_reader->GetBuilderChunks(); for (auto& chunk : chunks) { if (!chunk->type()->Equals(*logical_type_field->type())) { // XXX: if a LargeBinary chunk is larger than 2GB, the MSBs of offsets // will be lost because they are first created as int32 and then cast to int64. ARROW_ASSIGN_OR_RAISE( chunk, ::arrow20::compute::Cast(*chunk, logical_type_field->type(), cast_options, &ctx)); } } if (!logical_type_field->nullable()) { ReconstructChunksWithoutNulls(&chunks); } *out = std::make_shared(std::move(chunks), logical_type_field->type()); return Status::OK(); } // ---------------------------------------------------------------------- // INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || Decimal256 template Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, uint8_t* out_buf) { ARROW_ASSIGN_OR_RAISE(DecimalType t, DecimalType::FromBigEndian(value, byte_width)); t.ToBytes(out_buf); return ::arrow20::Status::OK(); } template struct DecimalTypeTrait; template <> struct DecimalTypeTrait<::arrow20::Decimal128Array> { using value = ::arrow20::Decimal128; }; template <> struct DecimalTypeTrait<::arrow20::Decimal256Array> { using value = ::arrow20::Decimal256; }; template struct DecimalConverter { static inline Status ConvertToDecimal(const Array& array, const std::shared_ptr&, MemoryPool* pool, std::shared_ptr*) { return Status::NotImplemented("not implemented"); } }; template struct DecimalConverter { static inline Status ConvertToDecimal(const Array& array, const std::shared_ptr& type, MemoryPool* pool, std::shared_ptr* out) { const auto& fixed_size_binary_array = checked_cast(array); // The byte width of each decimal value const int32_t type_length = checked_cast(*type).byte_width(); // number of elements in the entire array const int64_t length = fixed_size_binary_array.length(); // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time // this will be different from the decimal array width because we write the minimum // number of bytes necessary to represent a given precision const int32_t byte_width = checked_cast(*fixed_size_binary_array.type()) .byte_width(); // allocate memory for the decimal array ARROW_ASSIGN_OR_RAISE(auto data, ::arrow20::AllocateBuffer(length * type_length, pool)); // raw bytes that we can write to uint8_t* out_ptr = data->mutable_data(); // convert each FixedSizeBinary value to valid decimal bytes const int64_t null_count = fixed_size_binary_array.null_count(); using DecimalType = typename DecimalTypeTrait::value; if (null_count > 0) { for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { if (!fixed_size_binary_array.IsNull(i)) { RETURN_NOT_OK(RawBytesToDecimalBytes( fixed_size_binary_array.GetValue(i), byte_width, out_ptr)); } else { std::memset(out_ptr, 0, type_length); } } } else { for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { RETURN_NOT_OK(RawBytesToDecimalBytes( fixed_size_binary_array.GetValue(i), byte_width, out_ptr)); } } *out = std::make_shared( type, length, std::move(data), fixed_size_binary_array.null_bitmap(), null_count); return Status::OK(); } }; template struct DecimalConverter { static inline Status ConvertToDecimal(const Array& array, const std::shared_ptr& type, MemoryPool* pool, std::shared_ptr* out) { const auto& binary_array = checked_cast(array); const int64_t length = binary_array.length(); const auto& decimal_type = checked_cast(*type); const int64_t type_length = decimal_type.byte_width(); ARROW_ASSIGN_OR_RAISE(auto data, ::arrow20::AllocateBuffer(length * type_length, pool)); // raw bytes that we can write to uint8_t* out_ptr = data->mutable_data(); const int64_t null_count = binary_array.null_count(); // convert each BinaryArray value to valid decimal bytes for (int64_t i = 0; i < length; i++, out_ptr += type_length) { int32_t record_len = 0; const uint8_t* record_loc = binary_array.GetValue(i, &record_len); if (record_len < 0 || record_len > type_length) { return Status::Invalid("Invalid BYTE_ARRAY length for ", type->ToString()); } auto out_ptr_view = reinterpret_cast(out_ptr); out_ptr_view[0] = 0; out_ptr_view[1] = 0; // only convert rows that are not null if there are nulls, or // all rows, if there are not if ((null_count > 0 && !binary_array.IsNull(i)) || null_count <= 0) { using DecimalType = typename DecimalTypeTrait::value; RETURN_NOT_OK( RawBytesToDecimalBytes(record_loc, record_len, out_ptr)); } } *out = std::make_shared(type, length, std::move(data), binary_array.null_bitmap(), null_count); return Status::OK(); } }; /// \brief Convert an Int32 or Int64 array into a Decimal128Array /// The parquet spec allows systems to write decimals in int32, int64 if the values are /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. /// This function implements the conversion from int32 and int64 arrays to decimal arrays. template < typename DecimalArrayType, typename ParquetIntegerType, typename = ::arrow20::enable_if_t::value || std::is_same::value>> static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not // specifically distinguish between decimal byte widths. DCHECK(field->type()->id() == ::arrow20::Type::DECIMAL128 || field->type()->id() == ::arrow20::Type::DECIMAL256); const int64_t length = reader->values_written(); using ElementType = typename ParquetIntegerType::c_type; static_assert(std::is_same::value || std::is_same::value, "ElementType must be int32_t or int64_t"); const auto values = reinterpret_cast(reader->values()); const auto& decimal_type = checked_cast(*field->type()); const int64_t type_length = decimal_type.byte_width(); ARROW_ASSIGN_OR_RAISE(auto data, ::arrow20::AllocateBuffer(length * type_length, pool)); uint8_t* out_ptr = data->mutable_data(); using ::arrow20::bit_util::FromLittleEndian; for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { // sign/zero extend int32_t values, otherwise a no-op const auto value = static_cast(values[i]); if constexpr (std::is_same_v) { ::arrow20::Decimal128 decimal(value); decimal.ToBytes(out_ptr); } else { ::arrow20::Decimal256 decimal(value); decimal.ToBytes(out_ptr); } } if (reader->nullable_values() && field->nullable()) { std::shared_ptr is_valid = reader->ReleaseIsValid(); *out = std::make_shared(field->type(), length, std::move(data), is_valid, reader->null_count()); } else { *out = std::make_shared(field->type(), length, std::move(data)); } return Status::OK(); } /// \brief Convert an arrow20::BinaryArray to an arrow20::Decimal{128,256}Array /// We do this by: /// 1. Creating an arrow20::BinaryArray from the RecordReader's builder /// 2. Allocating a buffer for the arrow20::Decimal{128,256}Array /// 3. Converting the big-endian bytes in each BinaryArray entry to two integers /// representing the high and low bits of each decimal value. template Status TransferDecimal(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { auto binary_reader = dynamic_cast(reader); DCHECK(binary_reader); ::arrow20::ArrayVector chunks = binary_reader->GetBuilderChunks(); for (size_t i = 0; i < chunks.size(); ++i) { std::shared_ptr chunk_as_decimal; auto fn = &DecimalConverter::ConvertToDecimal; RETURN_NOT_OK(fn(*chunks[i], field->type(), pool, &chunk_as_decimal)); // Replace the chunk, which will hopefully also free memory as we go chunks[i] = chunk_as_decimal; } if (!field->nullable()) { ReconstructChunksWithoutNulls(&chunks); } *out = std::make_shared(chunks, field->type()); return Status::OK(); } Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { static const auto binary_type = ::arrow20::fixed_size_binary(2); // Read as a FixedSizeBinaryArray - then, view as a HalfFloatArray std::shared_ptr chunked_array; RETURN_NOT_OK( TransferBinary(reader, pool, field->WithType(binary_type), &chunked_array)); ARROW_ASSIGN_OR_RAISE(*out, chunked_array->View(field->type())); return Status::OK(); } } // namespace #define TRANSFER_INT32(ENUM, ArrowType) \ case ::arrow20::Type::ENUM: { \ Status s = TransferInt(reader, std::move(metadata), ctx, \ value_field, &result); \ RETURN_NOT_OK(s); \ } break; #define TRANSFER_INT64(ENUM, ArrowType) \ case ::arrow20::Type::ENUM: { \ Status s = TransferInt(reader, std::move(metadata), ctx, \ value_field, &result); \ RETURN_NOT_OK(s); \ } break; Status TransferColumnData(RecordReader* reader, std::unique_ptr<::parquet20::ColumnChunkMetaData> metadata, const std::shared_ptr& value_field, const ColumnDescriptor* descr, const ReaderContext* ctx, std::shared_ptr* out) { auto pool = ctx->pool; Datum result; std::shared_ptr chunked_result; switch (value_field->type()->id()) { case ::arrow20::Type::DICTIONARY: { RETURN_NOT_OK(TransferDictionary(reader, pool, value_field->type(), value_field->nullable(), &chunked_result)); result = chunked_result; } break; case ::arrow20::Type::NA: { result = std::make_shared<::arrow20::NullArray>(reader->values_written()); break; } case ::arrow20::Type::INT32: result = TransferZeroCopy<::arrow20::Int32Type, Int32Type>( reader, std::move(metadata), ctx, value_field); break; case ::arrow20::Type::INT64: result = TransferZeroCopy<::arrow20::Int64Type, Int64Type>( reader, std::move(metadata), ctx, value_field); break; case ::arrow20::Type::FLOAT: result = TransferZeroCopy<::arrow20::FloatType, FloatType>( reader, std::move(metadata), ctx, value_field); break; case ::arrow20::Type::DOUBLE: result = TransferZeroCopy<::arrow20::DoubleType, DoubleType>( reader, std::move(metadata), ctx, value_field); break; case ::arrow20::Type::BOOL: RETURN_NOT_OK(TransferBool(reader, std::move(metadata), ctx, value_field->nullable(), &result)); break; TRANSFER_INT32(UINT8, ::arrow20::UInt8Type); TRANSFER_INT32(INT8, ::arrow20::Int8Type); TRANSFER_INT32(UINT16, ::arrow20::UInt16Type); TRANSFER_INT32(INT16, ::arrow20::Int16Type); TRANSFER_INT32(UINT32, ::arrow20::UInt32Type); TRANSFER_INT64(UINT64, ::arrow20::UInt64Type); TRANSFER_INT32(DATE32, ::arrow20::Date32Type); TRANSFER_INT32(TIME32, ::arrow20::Time32Type); TRANSFER_INT64(TIME64, ::arrow20::Time64Type); TRANSFER_INT64(DURATION, ::arrow20::DurationType); case ::arrow20::Type::DATE64: RETURN_NOT_OK(TransferDate64(reader, pool, value_field, &result)); break; case ::arrow20::Type::FIXED_SIZE_BINARY: case ::arrow20::Type::BINARY: case ::arrow20::Type::STRING: case ::arrow20::Type::LARGE_BINARY: case ::arrow20::Type::LARGE_STRING: { RETURN_NOT_OK(TransferBinary(reader, pool, value_field, &chunked_result)); result = chunked_result; } break; case ::arrow20::Type::HALF_FLOAT: { const auto& type = *value_field->type(); if (descr->physical_type() != ::parquet20::Type::FIXED_LEN_BYTE_ARRAY) { return Status::Invalid("Physical type for ", type.ToString(), " must be fixed length binary"); } if (descr->type_length() != type.byte_width()) { return Status::Invalid("Fixed length binary type for ", type.ToString(), " must have a byte width of ", type.byte_width()); } RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result)); } break; case ::arrow20::Type::DECIMAL128: { switch (descr->physical_type()) { case ::parquet20::Type::INT32: { auto fn = DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet20::Type::INT64: { auto fn = &DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet20::Type::BYTE_ARRAY: { auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet20::Type::FIXED_LEN_BYTE_ARRAY: { auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; default: return Status::Invalid( "Physical type for decimal128 must be int32, int64, byte array, or fixed " "length binary"); } } break; case ::arrow20::Type::DECIMAL256: switch (descr->physical_type()) { case ::parquet20::Type::INT32: { auto fn = DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet20::Type::INT64: { auto fn = &DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet20::Type::BYTE_ARRAY: { auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet20::Type::FIXED_LEN_BYTE_ARRAY: { auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; default: return Status::Invalid( "Physical type for decimal256 must be int32, int64, byte array, or fixed " "length binary"); } break; case ::arrow20::Type::TIMESTAMP: { const ::arrow20::TimestampType& timestamp_type = checked_cast<::arrow20::TimestampType&>(*value_field->type()); if (descr->physical_type() == ::parquet20::Type::INT96) { RETURN_NOT_OK( TransferInt96(reader, pool, value_field, &result, timestamp_type.unit())); } else { switch (timestamp_type.unit()) { case ::arrow20::TimeUnit::MILLI: case ::arrow20::TimeUnit::MICRO: case ::arrow20::TimeUnit::NANO: result = TransferZeroCopy<::arrow20::Int64Type, Int64Type>( reader, std::move(metadata), ctx, value_field); break; default: return Status::NotImplemented("TimeUnit not supported"); } } } break; default: return Status::NotImplemented("No support for reading columns of type ", value_field->type()->ToString()); } if (result.kind() == Datum::ARRAY) { *out = std::make_shared(result.make_array()); } else if (result.kind() == Datum::CHUNKED_ARRAY) { *out = result.chunked_array(); } else { DCHECK(false) << "Should be impossible, result was " << result.ToString(); } return Status::OK(); } } // namespace parquet20::arrow20