// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/column_decoder.h" #include #include #include #include #include #include #include "contrib/libs/apache/arrow_next/cpp/src/arrow/array.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/array/builder_base.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/converter.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/inference_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/options.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/parser.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/type.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/type_fwd.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/future.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h" namespace arrow20 { namespace csv { class ConcreteColumnDecoder : public ColumnDecoder { public: explicit ConcreteColumnDecoder(MemoryPool* pool, int32_t col_index = -1) : ColumnDecoder(), pool_(pool), col_index_(col_index) {} protected: // XXX useful? virtual std::shared_ptr type() const = 0; Result> WrapConversionError( const Result>& result) { if (ARROW_PREDICT_TRUE(result.ok())) { return result; } else { const auto& st = result.status(); std::stringstream ss; ss << "In CSV column #" << col_index_ << ": " << st.message(); return st.WithMessage(ss.str()); } } MemoryPool* pool_; int32_t col_index_; }; ////////////////////////////////////////////////////////////////////////// // Null column decoder implementation (for a column not in the CSV file) class NullColumnDecoder : public ConcreteColumnDecoder { public: explicit NullColumnDecoder(const std::shared_ptr& type, MemoryPool* pool) : ConcreteColumnDecoder(pool), type_(type) {} Future> Decode( const std::shared_ptr& parser) override; protected: std::shared_ptr type() const override { return type_; } std::shared_ptr type_; }; Future> NullColumnDecoder::Decode( const std::shared_ptr& parser) { DCHECK_GE(parser->num_rows(), 0); return WrapConversionError(MakeArrayOfNull(type_, parser->num_rows(), pool_)); } ////////////////////////////////////////////////////////////////////////// // Pre-typed column decoder implementation class TypedColumnDecoder : public ConcreteColumnDecoder { public: TypedColumnDecoder(const std::shared_ptr& type, int32_t col_index, const ConvertOptions& options, MemoryPool* pool) : ConcreteColumnDecoder(pool, col_index), type_(type), options_(options) {} Status Init(); Future> Decode( const std::shared_ptr& parser) override; protected: std::shared_ptr type() const override { return type_; } std::shared_ptr type_; // CAUTION: ConvertOptions can grow large (if it customizes hundreds or // thousands of columns), so avoid copying it in each TypedColumnDecoder. const ConvertOptions& options_; std::shared_ptr converter_; }; Status TypedColumnDecoder::Init() { ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type_, options_, pool_)); return Status::OK(); } Future> TypedColumnDecoder::Decode( const std::shared_ptr& parser) { DCHECK_NE(converter_, nullptr); return Future>::MakeFinished( WrapConversionError(converter_->Convert(*parser, col_index_))); } ////////////////////////////////////////////////////////////////////////// // Type-inferring column builder implementation class InferringColumnDecoder : public ConcreteColumnDecoder { public: InferringColumnDecoder(int32_t col_index, const ConvertOptions& options, MemoryPool* pool) : ConcreteColumnDecoder(pool, col_index), options_(options), infer_status_(options), type_frozen_(false) { first_inference_run_ = Future<>::Make(); first_inferrer_ = 0; } Status Init(); Future> Decode( const std::shared_ptr& parser) override; protected: std::shared_ptr type() const override { DCHECK_NE(converter_, nullptr); return converter_->type(); } Status UpdateType(); Result> RunInference(const std::shared_ptr& parser); // CAUTION: ConvertOptions can grow large (if it customizes hundreds or // thousands of columns), so avoid copying it in each InferringColumnDecoder. const ConvertOptions& options_; // Current inference status InferStatus infer_status_; bool type_frozen_; std::atomic first_inferrer_; Future<> first_inference_run_; std::shared_ptr converter_; }; Status InferringColumnDecoder::Init() { return UpdateType(); } Status InferringColumnDecoder::UpdateType() { return infer_status_.MakeConverter(pool_).Value(&converter_); } Result> InferringColumnDecoder::RunInference( const std::shared_ptr& parser) { while (true) { // (no one else should be updating converter_ concurrently) auto maybe_array = converter_->Convert(*parser, col_index_); if (maybe_array.ok() || !infer_status_.can_loosen_type()) { // Conversion succeeded, or failed definitively DCHECK(!type_frozen_); type_frozen_ = true; return maybe_array; } // Conversion failed temporarily, try another type infer_status_.LoosenType(maybe_array.status()); auto update_status = UpdateType(); if (!update_status.ok()) { return update_status; } } } Future> InferringColumnDecoder::Decode( const std::shared_ptr& parser) { // Empty arrays before the first inference run must be discarded since the type of the // array will be NA and not match arrays decoded later if (parser->num_rows() == 0) { return Future>::MakeFinished( MakeArrayOfNull(converter_->type(), 0)); } bool already_taken = first_inferrer_.fetch_or(1); // First block: run inference if (!already_taken) { auto maybe_array = RunInference(parser); first_inference_run_.MarkFinished(); return Future>::MakeFinished(std::move(maybe_array)); } // Non-first block: wait for inference to finish on first block now, // without blocking a worker thread. return first_inference_run_.Then([this, parser] { DCHECK(type_frozen_); auto maybe_array = converter_->Convert(*parser, col_index_); return WrapConversionError(converter_->Convert(*parser, col_index_)); }); } ////////////////////////////////////////////////////////////////////////// // Factory functions Result> ColumnDecoder::Make( MemoryPool* pool, int32_t col_index, const ConvertOptions& options) { auto ptr = std::make_shared(col_index, options, pool); RETURN_NOT_OK(ptr->Init()); return ptr; } Result> ColumnDecoder::Make( MemoryPool* pool, std::shared_ptr type, int32_t col_index, const ConvertOptions& options) { auto ptr = std::make_shared(std::move(type), col_index, options, pool); RETURN_NOT_OK(ptr->Init()); return ptr; } Result> ColumnDecoder::MakeNull( MemoryPool* pool, std::shared_ptr type) { return std::make_shared(std::move(type), pool); } } // namespace csv } // namespace arrow20