// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/converter.h" #include #include #include #include #include #include #include #include "contrib/libs/apache/arrow_next/cpp/src/arrow/array/builder_binary.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/array/builder_decimal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/array/builder_dict.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/array/builder_primitive.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/parser.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/type.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/type_fwd.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/type_traits.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/checked_cast.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/decimal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/trie.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/utf8_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/value_parsing.h" // IWYU pragma: keep namespace arrow20 { 
namespace csv {

using internal::checked_cast;
using internal::Trie;
using internal::TrieBuilder;

namespace {

// Builds the Status::Invalid error reported when the field bytes
// [data, data + size) cannot be converted to `type`.
//
// The message embeds type->ToString() and a copy of the raw field text.
Status GenericConversionError(const std::shared_ptr& type, const uint8_t* data,
                              uint32_t size) {
  return Status::Invalid("CSV conversion error to ", type->ToString(),
                         ": invalid value '",
                         std::string(reinterpret_cast(data), size), "'");
}

// Returns true only when `c` is a space or a horizontal tab.
inline bool IsWhitespace(uint8_t c) {
  // Any byte greater than ' ' is rejected up front, before the exact comparisons.
  if (ARROW_PREDICT_TRUE(c > ' ')) {
    return false;
  }
  return c == ' ' || c == '\t';
}

// Updates data_inout and size_inout to not include leading/trailing whitespace
// characters.
//
// Whitespace is whatever IsWhitespace() accepts. The pointed-to bytes are not
// modified; only the caller's pointer and length are adjusted. A field made up
// entirely of whitespace ends with *size_inout == 0.
inline void TrimWhiteSpace(const uint8_t** data_inout, uint32_t* size_inout) {
  // Work through references so the updates land directly in the caller's variables.
  const uint8_t*& data = *data_inout;
  uint32_t& size = *size_inout;
  // Skip trailing whitespace
  if (ARROW_PREDICT_TRUE(size > 0) &&
      ARROW_PREDICT_FALSE(IsWhitespace(data[size - 1]))) {
    const uint8_t* p = data + size - 1;
    // NOTE(review): when the whole field is whitespace, `p` ends one position
    // before `data`. It is never dereferenced there because `size > 0` is
    // tested first.
    while (size > 0 && IsWhitespace(*p)) {
      --size;
      --p;
    }
  }
  // Skip leading whitespace
  if (ARROW_PREDICT_TRUE(size > 0) && ARROW_PREDICT_FALSE(IsWhitespace(data[0]))) {
    while (size > 0 && IsWhitespace(*data)) {
      --size;
      ++data;
    }
  }
}

// Fills `*trie` with every string in `inputs`.
//
// Each string is appended with duplicates allowed, then the finished trie is
// assigned to `*trie`. A failing Append() status is returned to the caller via
// RETURN_NOT_OK; otherwise Status::OK() is returned.
Status InitializeTrie(const std::vector& inputs, Trie* trie) {
  TrieBuilder builder;
  for (const auto& s : inputs) {
    RETURN_NOT_OK(builder.Append(s, true /* allow_duplicates */));
  }
  *trie = builder.Finish();
  return Status::OK();
}

// Presize a builder based on parser contents
//
// The builder is resized to parser.num_rows(). When the is_base_binary_type
// trait holds, data space for parser.num_bytes() is reserved as well.
template
Status PresizeBuilder(const BlockParser& parser, BuilderType* builder) {
  RETURN_NOT_OK(builder->Resize(parser.num_rows()));
  if constexpr (is_base_binary_type::value) {
    return builder->ReserveData(parser.num_bytes());
  } else {
    return Status::OK();
  }
}

/////////////////////////////////////////////////////////////////////////
// Per-type value decoders

// Common base of the value decoders: keeps the target type and the conversion
// options, and answers whether a field spells a null value.
//
// `options_` is held by reference, so the ConvertOptions object passed to the
// constructor must remain alive for as long as the decoder is used.
struct ValueDecoder {
  explicit ValueDecoder(const std::shared_ptr& type, const ConvertOptions& options)
      : type_(type), options_(options) {}

  // Builds the null-spelling trie from options_.null_values.
  Status Initialize() {
    // TODO no need to build a separate Trie for each instance
    return InitializeTrie(options_.null_values, &null_trie_);
  }

  // Returns true when the field matches an entry of the null trie.
  // A quoted field is never null unless options_.quoted_strings_can_be_null is set.
  bool IsNull(const uint8_t* data, uint32_t size, bool quoted) {
    if (quoted && !options_.quoted_strings_can_be_null) {
      return false;
    }
    // Find() yields a non-negative value when the text is present in the trie.
    return null_trie_.Find(std::string_view(reinterpret_cast(data), size)) >= 0;
  }

 protected:
  Trie null_trie_;
  const std::shared_ptr type_;
  const ConvertOptions& options_;
};

//
// Value decoder for fixed-size binary
//

struct FixedSizeBinaryValueDecoder : public ValueDecoder {
  using value_type = const uint8_t*;

  // Captures the byte width reported by the (checked-cast) type.
  explicit FixedSizeBinaryValueDecoder(const std::shared_ptr& type,
                                       const ConvertOptions& options)
      : ValueDecoder(type, options),
        byte_width_(checked_cast(*type).byte_width()) {}

  // Accepts the field only when its length equals byte_width_.
  // On success `*out` points at the field bytes themselves (nothing is copied);
  // otherwise Status::Invalid is returned. `quoted` is not used.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    if (ARROW_PREDICT_FALSE(size != byte_width_)) {
      return Status::Invalid("CSV conversion error to ", type_->ToString(), ": got a ",
                             size, "-byte long string");
    }
    *out = data;
    return Status::OK();
  }

 protected:
  const uint32_t byte_width_;
};

//
// Value decoder for variable-size binary
//

template
struct BinaryValueDecoder : public ValueDecoder {
  using value_type = std::string_view;

  using ValueDecoder::ValueDecoder;

  // Runs the UTF8 initialization before the base-class setup.
  Status Initialize() {
    util::InitializeUTF8();
    return ValueDecoder::Initialize();
  }

  // Produces a string_view over the field bytes (nothing is copied).
  // When CheckUTF8 is set, the bytes are validated first and invalid input
  // yields Status::Invalid. `quoted` is not used.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    if (CheckUTF8 && ARROW_PREDICT_FALSE(!util::ValidateUTF8Inline(data, size))) {
      return Status::Invalid("CSV conversion error to ", type_->ToString(),
                             ": invalid UTF8 data");
    }
    *out = {reinterpret_cast(data), size};
    return Status::OK();
  }

  // A string field is null only if options_.strings_can_be_null is set, the
  // quoting rule allows it, and the text is in the null trie.
  bool IsNull(const uint8_t* data, uint32_t size, bool quoted) {
    // The quoting rule is evaluated here, so the base lookup is called with
    // quoted == false to skip its own quoted check.
    return options_.strings_can_be_null &&
           (!quoted || options_.quoted_strings_can_be_null) &&
           ValueDecoder::IsNull(data, size, false /* quoted */);
  }
};

//
// Value decoder for integers, floats and temporals
//

// Creates the string converter for a numeric decoder: built from
// options.decimal_point when the is_floating_type trait holds, and
// default-constructed otherwise.
template
static arrow20::internal::StringConverter MakeStringConverter(
    const ConvertOptions& options) {
  if constexpr (is_floating_type::value) {
    return arrow20::internal::StringConverter{options.decimal_point};
  } else {
    return arrow20::internal::StringConverter{};
  }
}

template
struct NumericValueDecoder : public ValueDecoder {
  using value_type = typename T::c_type;

  // Keeps a reference to the concrete type object and builds the matching converter.
  NumericValueDecoder(const std::shared_ptr& type, const ConvertOptions& options)
      : ValueDecoder(type, options),
        concrete_type_(checked_cast(*type)),
        string_converter_(MakeStringConverter(options)) {}

  // Trims surrounding whitespace, then converts the remaining text.
  // A failed conversion is reported with GenericConversionError on the trimmed
  // text. `quoted` is not used.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    // XXX should quoted values be allowed at all?
    TrimWhiteSpace(&data, &size);
    if (ARROW_PREDICT_FALSE(!string_converter_.Convert(
            concrete_type_, reinterpret_cast(data), size, out))) {
      return GenericConversionError(type_, data, size);
    }
    return Status::OK();
  }

 protected:
  const T& concrete_type_;
  arrow20::internal::StringConverter string_converter_;
};

//
// Value decoder for booleans
//

struct BooleanValueDecoder : public ValueDecoder {
  using value_type = bool;

  using ValueDecoder::ValueDecoder;

  // Builds the true/false spelling tries from the options, then the null trie.
  Status Initialize() {
    // TODO no need to build separate Tries for each instance
    RETURN_NOT_OK(InitializeTrie(options_.true_values, &true_trie_));
    RETURN_NOT_OK(InitializeTrie(options_.false_values, &false_trie_));
    return ValueDecoder::Initialize();
  }

  // Looks the field up in the false trie first, then in the true trie.
  // Text found in neither is a conversion error. No whitespace trimming is
  // done here, and `quoted` is not used.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    // XXX should quoted values be allowed at all?
    if (false_trie_.Find(std::string_view(reinterpret_cast(data), size)) >= 0) {
      *out = false;
      return Status::OK();
    }
    if (ARROW_PREDICT_TRUE(true_trie_.Find(std::string_view(
                               reinterpret_cast(data), size)) >= 0)) {
      *out = true;
      return Status::OK();
    }
    return GenericConversionError(type_, data, size);
  }

 protected:
  Trie true_trie_;
  Trie false_trie_;
};

//
// Value decoder for decimals
//

struct DecimalValueDecoder : public ValueDecoder {
  using value_type = Decimal128;

  // Caches the precision and scale of the target decimal type.
  explicit DecimalValueDecoder(const std::shared_ptr& type,
                               const ConvertOptions& options)
      : ValueDecoder(type, options),
        decimal_type_(internal::checked_cast(*type_)),
        type_precision_(decimal_type_.precision()),
        type_scale_(decimal_type_.scale()) {}

  // Trims whitespace and parses the text with Decimal128::FromString.
  // Fails when the parsed precision exceeds the type's precision; when the
  // parsed scale differs from the type's scale the value is rescaled, and a
  // Rescale() failure is returned. `quoted` is not used.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    TrimWhiteSpace(&data, &size);
    Decimal128 decimal;
    int32_t precision, scale;
    std::string_view view(reinterpret_cast(data), size);
    RETURN_NOT_OK(Decimal128::FromString(view, &decimal, &precision, &scale));
    // The precision test is applied to the value as written, before any rescaling.
    if (precision > type_precision_) {
      return Status::Invalid("Error converting '", view, "' to ", type_->ToString(),
                             ": precision not supported by type.");
    }
    if (scale != type_scale_) {
      ARROW_ASSIGN_OR_RAISE(*out, decimal.Rescale(scale, type_scale_));
    } else {
      *out = std::move(decimal);
    }
    return Status::OK();
  }

 protected:
  const DecimalType& decimal_type_;
  const int32_t type_precision_;
  const int32_t type_scale_;
};

//
// Value decoder wrapper for decimals with a non-default decimal point
//

// Before delegating to the wrapped decoder, the field is copied into a scratch
// buffer with the configured decimal point and '.' swapped byte-for-byte.
template
struct CustomDecimalPointValueDecoder : public ValueDecoder {
  using value_type = typename WrappedDecoder::value_type;

  explicit CustomDecimalPointValueDecoder(const std::shared_ptr& type,
                                          const ConvertOptions& options)
      : ValueDecoder(type, options), wrapped_decoder_(type, options) {}

  // Initializes the wrapped decoder and builds the byte translation table.
  // This decoder's own null trie is not built here; IsNull() below forwards to
  // the wrapped decoder instead.
  Status Initialize() {
    RETURN_NOT_OK(wrapped_decoder_.Initialize());
    // Start from the identity mapping, then swap the two decimal point bytes.
    for (int i = 0; i < 256; ++i) {
      mapping_[i] = i;
    }
    // NOTE(review): decimal_point is used directly as an array index. If its
    // type is a signed char, a value above 0x7F would index out of range —
    // confirm the option's type or its validation.
    mapping_[options_.decimal_point] = '.';
    mapping_['.'] = options_.decimal_point;  // error out on standard decimal point
    // Initial scratch size; Decode() grows the buffer for longer fields.
    temp_.resize(30);
    return Status::OK();
  }

  // Translates the field into the scratch buffer and decodes the translated copy.
  // Whatever error the wrapped decoder reports is replaced by
  // GenericConversionError built from the original, untranslated bytes.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    // The scratch buffer only ever grows.
    if (ARROW_PREDICT_FALSE(size > temp_.size())) {
      temp_.resize(size);
    }
    uint8_t* temp_data = temp_.data();
    for (uint32_t i = 0; i < size; ++i) {
      temp_data[i] = mapping_[data[i]];
    }
    if (ARROW_PREDICT_FALSE(
            !wrapped_decoder_.Decode(temp_data, size, quoted, out).ok())) {
      return GenericConversionError(type_, data, size);
    }
    return Status::OK();
  }

  // Null detection is forwarded to the wrapped decoder on the untranslated bytes.
  bool IsNull(const uint8_t* data, uint32_t size, bool quoted) {
    return wrapped_decoder_.IsNull(data, size, quoted);
  }

 protected:
  WrappedDecoder wrapped_decoder_;
  std::array mapping_;
  std::vector temp_;
};

//
// Value decoders for timestamps
//

// Decodes timestamps with internal::ParseTimestampISO8601.
struct InlineISO8601ValueDecoder : public ValueDecoder {
  using value_type = int64_t;

  // A zone offset is expected exactly when the type's timezone() is non-empty.
  explicit InlineISO8601ValueDecoder(const std::shared_ptr& type,
                                     const ConvertOptions& options)
      : ValueDecoder(type, options),
        unit_(checked_cast(*type_).unit()),
        expect_timezone_(!checked_cast(*type_).timezone().empty()) {
  }

  // Parses the field into `*out` using unit_.
  // Fails if the text does not parse, or if the presence of a zone offset in
  // the text disagrees with expect_timezone_. `quoted` is not used.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    bool zone_offset_present = false;
    if (ARROW_PREDICT_FALSE(
            !internal::ParseTimestampISO8601(reinterpret_cast(data), size, unit_, out,
                                             &zone_offset_present))) {
      return GenericConversionError(type_, data, size);
    }
    if (zone_offset_present != expect_timezone_) {
      if (expect_timezone_) {
        return Status::Invalid("CSV conversion error to ", type_->ToString(),
                               ": expected a zone offset in '",
                               std::string(reinterpret_cast(data), size),
                               "'. If these timestamps are in local time, parse them as "
                               "timestamps without timezone, then call assume_timezone.");
      } else {
        return Status::Invalid("CSV conversion error to ", type_->ToString(),
                               ": expected no zone offset in '",
                               std::string(reinterpret_cast(data), size), "'");
      }
    }
    return Status::OK();
  }

 protected:
  TimeUnit::type unit_;
  bool expect_timezone_;
};

// Decodes timestamps with the first entry of options.timestamp_parsers.
struct SingleParserTimestampValueDecoder : public ValueDecoder {
  using value_type = int64_t;

  // Binds a reference to *options_.timestamp_parsers[0]; the constructor
  // indexes element 0 without checking that the list is non-empty.
  explicit SingleParserTimestampValueDecoder(const std::shared_ptr& type,
                                             const ConvertOptions& options)
      : ValueDecoder(type, options),
        unit_(checked_cast(*type_).unit()),
        expect_timezone_(!checked_cast(*type_).timezone().empty()),
        parser_(*options_.timestamp_parsers[0]) {}

  // Parses the field into `*out` with the bound parser.
  // Fails if the parser rejects the text, or if the presence of a zone offset
  // disagrees with expect_timezone_. `quoted` is not used.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    bool zone_offset_present = false;
    if (ARROW_PREDICT_FALSE(!parser_(reinterpret_cast(data), size, unit_, out,
                                     &zone_offset_present))) {
      return GenericConversionError(type_, data, size);
    }
    if (zone_offset_present != expect_timezone_) {
      if (expect_timezone_) {
        return Status::Invalid("CSV conversion error to ", type_->ToString(),
                               ": expected a zone offset in '",
                               std::string(reinterpret_cast(data), size),
                               "'. If these timestamps are in local time, parse them as "
                               "timestamps without timezone, then call assume_timezone. "
                               "If using strptime, ensure '%z' is in the format string.");
      } else {
        return Status::Invalid("CSV conversion error to ", type_->ToString(),
                               ": expected no zone offset in '",
                               std::string(reinterpret_cast(data), size), "'");
      }
    }
    return Status::OK();
  }

 protected:
  TimeUnit::type unit_;
  bool expect_timezone_;
  const TimestampParser& parser_;
};

// Decodes timestamps by trying every entry of options.timestamp_parsers in order.
struct MultipleParsersTimestampValueDecoder : public ValueDecoder {
  using value_type = int64_t;

  explicit MultipleParsersTimestampValueDecoder(const std::shared_ptr& type,
                                                const ConvertOptions& options)
      : ValueDecoder(type, options),
        unit_(checked_cast(*type_).unit()),
        expect_timezone_(!checked_cast(*type_).timezone().empty()),
        parsers_(GetParsers(options_)) {}

  // Returns OK for the first parser that both accepts the text and agrees with
  // expect_timezone_ about the zone offset. If none does, a generic conversion
  // error is returned. `quoted` is not used.
  Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
    bool zone_offset_present = false;
    for (const auto& parser : parsers_) {
      if (parser->operator()(reinterpret_cast(data), size, unit_, out,
                             &zone_offset_present) &&
          zone_offset_present == expect_timezone_) {
        return Status::OK();
      }
    }
    return GenericConversionError(type_, data, size);
  }

 protected:
  using ParserVector = std::vector;

  // Collects the pointers obtained with .get() from each configured parser.
  // The vector does not own them; the entries of options.timestamp_parsers
  // remain the owners.
  static ParserVector GetParsers(const ConvertOptions& options) {
    ParserVector parsers(options.timestamp_parsers.size());
    for (size_t i = 0; i < options.timestamp_parsers.size(); ++i) {
      parsers[i] = options.timestamp_parsers[i].get();
    }
    return parsers;
  }

  TimeUnit::type unit_;
  bool expect_timezone_;
  std::vector parsers_;
};

/////////////////////////////////////////////////////////////////////////
// Concrete Converter hierarchy

// Adds nothing to Converter beyond inheriting its constructors.
class ConcreteConverter : public Converter {
 public:
  using Converter::Converter;
};

// Adds nothing to DictionaryConverter beyond inheriting its constructors.
class ConcreteDictionaryConverter : public DictionaryConverter {
 public:
  using DictionaryConverter::DictionaryConverter;
};

//
// Concrete Converter for nulls
//

class NullConverter : public ConcreteConverter {
 public:
  NullConverter(const std::shared_ptr& type, const ConvertOptions& options,
                MemoryPool* pool)
      : ConcreteConverter(type, options, pool), decoder_(type_, options_) {}

  // Converts column `col_index` of `parser` into an array of nulls.
  // Every field must be recognized as null by the decoder; the first field
  // that is not produces a conversion error.
  Result> Convert(const BlockParser& parser,
                                         int32_t col_index) override {
    NullBuilder builder(pool_);

    // Callback handed to parser.VisitColumn() below.
    auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status {
      if (ARROW_PREDICT_TRUE(decoder_.IsNull(data, size, quoted))) {
        return builder.AppendNull();
      } else {
        return GenericConversionError(type_, data, size);
      }
    };
    RETURN_NOT_OK(parser.VisitColumn(col_index, visit));
    std::shared_ptr res;
    RETURN_NOT_OK(builder.Finish(&res));
    return res;
  }

 protected:
  Status Initialize() override { return decoder_.Initialize(); }

  ValueDecoder decoder_;
};

//
// Concrete Converter for primitives
//

template
class PrimitiveConverter : public ConcreteConverter {
 public:
  PrimitiveConverter(const std::shared_ptr& type, const ConvertOptions& options,
                     MemoryPool* pool)
      : ConcreteConverter(type, options, pool), decoder_(type_, options_) {}

  // Converts column `col_index` of `parser` into an array of type_.
  // Fields the decoder reports as null become nulls; all others are decoded
  // and appended. The first decoding error aborts the conversion.
  Result> Convert(const BlockParser& parser,
                                         int32_t col_index) override {
    using BuilderType = typename TypeTraits::BuilderType;
    using value_type = typename ValueDecoderType::value_type;

    BuilderType builder(type_, pool_);
    // Presizing happens before any UnsafeAppend() call in the visitor below.
    RETURN_NOT_OK(PresizeBuilder(parser, &builder));

    // Callback handed to parser.VisitColumn() below.
    auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status {
      if (decoder_.IsNull(data, size, quoted /* quoted */)) {
        return builder.AppendNull();
      }
      value_type value{};
      RETURN_NOT_OK(decoder_.Decode(data, size, quoted, &value));
      builder.UnsafeAppend(value);
      return Status::OK();
    };
    RETURN_NOT_OK(parser.VisitColumn(col_index, visit));

    std::shared_ptr res;
    RETURN_NOT_OK(builder.Finish(&res));
    return res;
  }

 protected:
  Status Initialize() override { return decoder_.Initialize(); }

  ValueDecoderType decoder_;
};

//
// Concrete Converter for dictionaries
//

template
class TypedDictionaryConverter : public ConcreteDictionaryConverter {
 public:
  TypedDictionaryConverter(const std::shared_ptr& value_type,
                           const ConvertOptions& options, MemoryPool* pool)
      : ConcreteDictionaryConverter(value_type, options, pool),
        decoder_(value_type, options_) {}

  // Converts column `col_index` of `parser` into a dictionary-encoded array.
  // Returns Status::IndexError once the dictionary length is above
  // max_cardinality_; decoding and builder errors are returned as-is.
  Result> Convert(const BlockParser& parser,
                                         int32_t col_index) override {
    // We use a fixed index width so that all column chunks get the same index type
    using BuilderType = Dictionary32Builder;
    using value_type = typename ValueDecoderType::value_type;

    BuilderType builder(value_type_, pool_);
    RETURN_NOT_OK(PresizeBuilder(parser, &builder));

    // Callback handed to parser.VisitColumn() below.
    auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status {
      if (decoder_.IsNull(data, size, quoted /* quoted */)) {
        return builder.AppendNull();
      }
      // The check runs before the current value is appended, so it sees the
      // dictionary length produced by the earlier rows.
      if (ARROW_PREDICT_FALSE(builder.dictionary_length() > max_cardinality_)) {
        return Status::IndexError("Dictionary length exceeded max cardinality");
      }
      value_type value{};
      RETURN_NOT_OK(decoder_.Decode(data, size, quoted, &value));
      return builder.Append(value);
    };
    RETURN_NOT_OK(parser.VisitColumn(col_index, visit));

    std::shared_ptr res;
    RETURN_NOT_OK(builder.Finish(&res));
    return res;
  }

  // Sets the cardinality limit used by Convert().
  void SetMaxCardinality(int32_t max_length) override { max_cardinality_ = max_length; }

 protected:
  Status Initialize() override {
    util::InitializeUTF8();
    return decoder_.Initialize();
  }

  ValueDecoderType decoder_;
  // Initialized to the numeric_limits maximum until SetMaxCardinality() is called.
  int32_t max_cardinality_ = std::numeric_limits::max();
};

//
// Concrete Converter factory for timestamps
//

template