// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/reader.h"

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <limits>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "contrib/libs/apache/arrow_next/cpp/src/arrow/array.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/buffer.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/chunker.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/column_builder.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/column_decoder.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/options.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/csv/parser.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/interfaces.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/result.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/table.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/type.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/type_fwd.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/async_generator.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/future.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/iterator.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/macros.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/task_group.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/thread_pool.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/utf8_internal.h"
#include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/vector.h"

namespace arrow20 {

using internal::Executor;
using internal::TaskGroup;
using internal::UnwrapOrRaise;

namespace csv {

namespace {

// Describes, for every output column (in output order), where its data comes
// from in the CSV input and which type it should be converted to.
struct ConversionSchema {
  struct Column {
    std::string name;
    // Physical column index in CSV file (-1 when the column is missing)
    int32_t index;
    // If true, make a column of nulls
    bool is_missing;
    // If set, convert the CSV column to this type
    // If unset (and is_missing is false), infer the type from the CSV column
    std::shared_ptr<DataType> type;
  };

  // A column absent from the CSV file, materialized as nulls of `type`.
  static Column NullColumn(std::string col_name, std::shared_ptr<DataType> type) {
    return Column{std::move(col_name), -1, true, std::move(type)};
  }

  // A CSV column converted to an explicitly requested type.
  static Column TypedColumn(std::string col_name, int32_t col_index,
                            std::shared_ptr<DataType> type) {
    return Column{std::move(col_name), col_index, false, std::move(type)};
  }

  // A CSV column whose type is inferred from its contents.
  static Column InferredColumn(std::string col_name, int32_t col_index) {
    return Column{std::move(col_name), col_index, false, nullptr};
  }

  // Output columns, in output order
  std::vector<Column> columns;
};

// An iterator of Buffers that makes sure there is no straddling CRLF sequence.
class CSVBufferIterator { public: static Iterator> Make( Iterator> buffer_iterator) { Transformer, std::shared_ptr> fn = CSVBufferIterator(); return MakeTransformedIterator(std::move(buffer_iterator), fn); } static AsyncGenerator> MakeAsync( AsyncGenerator> buffer_iterator) { Transformer, std::shared_ptr> fn = CSVBufferIterator(); return MakeTransformedGenerator(std::move(buffer_iterator), fn); } Result>> operator()(std::shared_ptr buf) { if (buf == nullptr) { // EOF return TransformFinish(); } int64_t offset = 0; if (first_buffer_) { ARROW_ASSIGN_OR_RAISE(auto data, util::SkipUTF8BOM(buf->data(), buf->size())); offset += data - buf->data(); DCHECK_GE(offset, 0); first_buffer_ = false; } if (trailing_cr_ && buf->data()[offset] == '\n') { // Skip '\r\n' line separator that started at the end of previous buffer ++offset; } trailing_cr_ = (buf->data()[buf->size() - 1] == '\r'); buf = SliceBuffer(std::move(buf), offset); if (buf->size() == 0) { // EOF return TransformFinish(); } else { return TransformYield(std::move(buf)); } } protected: bool first_buffer_ = true; // Whether there was a trailing CR at the end of last received buffer bool trailing_cr_ = false; }; struct CSVBlock { // (partial + completion + buffer) is an entire delimited CSV buffer. std::shared_ptr partial; std::shared_ptr completion; std::shared_ptr buffer; int64_t block_index; bool is_final; int64_t bytes_skipped; std::function consume_bytes; }; } // namespace } // namespace csv template <> struct IterationTraits { static csv::CSVBlock End() { return csv::CSVBlock{{}, {}, {}, -1, true, 0, {}}; } static bool IsEnd(const csv::CSVBlock& val) { return val.block_index < 0; } }; namespace csv { namespace { // This is a callable that can be used to transform an iterator. The source iterator // will contain buffers of data and the output iterator will contain delimited CSV // blocks. std::optional is used so that there is an end token (required by the // iterator APIs (e.g. 
Visit)) even though an empty optional is never used in this code. class BlockReader { public: BlockReader(std::unique_ptr chunker, std::shared_ptr first_buffer, int64_t skip_rows) : chunker_(std::move(chunker)), partial_(std::make_shared("")), buffer_(std::move(first_buffer)), skip_rows_(skip_rows) {} protected: std::unique_ptr chunker_; std::shared_ptr partial_, buffer_; int64_t skip_rows_; int64_t block_index_ = 0; // Whether there was a trailing CR at the end of last received buffer bool trailing_cr_ = false; }; // An object that reads delimited CSV blocks for serial use. // The number of bytes consumed should be notified after each read, // using CSVBlock::consume_bytes. class SerialBlockReader : public BlockReader { public: using BlockReader::BlockReader; static Iterator MakeIterator( Iterator> buffer_iterator, std::unique_ptr chunker, std::shared_ptr first_buffer, int64_t skip_rows) { auto block_reader = std::make_shared(std::move(chunker), first_buffer, skip_rows); // Wrap shared pointer in callable Transformer, CSVBlock> block_reader_fn = [block_reader](std::shared_ptr buf) { return (*block_reader)(std::move(buf)); }; return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn); } static AsyncGenerator MakeAsyncIterator( AsyncGenerator> buffer_generator, std::unique_ptr chunker, std::shared_ptr first_buffer, int64_t skip_rows) { auto block_reader = std::make_shared(std::move(chunker), first_buffer, skip_rows); // Wrap shared pointer in callable Transformer, CSVBlock> block_reader_fn = [block_reader](std::shared_ptr next) { return (*block_reader)(std::move(next)); }; return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn); } Result> operator()(std::shared_ptr next_buffer) { if (buffer_ == nullptr) { return TransformFinish(); } bool is_final = (next_buffer == nullptr); int64_t bytes_skipped = 0; if (skip_rows_) { bytes_skipped += partial_->size(); auto orig_size = buffer_->size(); RETURN_NOT_OK( 
chunker_->ProcessSkip(partial_, buffer_, is_final, &skip_rows_, &buffer_)); bytes_skipped += orig_size - buffer_->size(); auto empty = std::make_shared(nullptr, 0); if (skip_rows_) { // Still have rows beyond this buffer to skip return empty block partial_ = std::move(buffer_); buffer_ = next_buffer; return TransformYield(CSVBlock{empty, empty, empty, block_index_++, is_final, bytes_skipped, [](int64_t) { return Status::OK(); }}); } partial_ = std::move(empty); } std::shared_ptr completion; if (is_final) { // End of file reached => compute completion from penultimate block RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &buffer_)); } else { // Get completion of partial from previous block. RETURN_NOT_OK( chunker_->ProcessWithPartial(partial_, buffer_, &completion, &buffer_)); } int64_t bytes_before_buffer = partial_->size() + completion->size(); auto consume_bytes = [this, bytes_before_buffer, next_buffer](int64_t nbytes) -> Status { DCHECK_GE(nbytes, 0); int64_t offset = nbytes - bytes_before_buffer; // All data before the buffer should have been consumed. // This is checked in Parse() and BlockParsingOperator::operator(). DCHECK_GE(offset, 0); partial_ = SliceBuffer(buffer_, offset); buffer_ = next_buffer; return Status::OK(); }; return TransformYield(CSVBlock{partial_, completion, buffer_, block_index_++, is_final, bytes_skipped, std::move(consume_bytes)}); } }; // An object that reads delimited CSV blocks for threaded use. 
class ThreadedBlockReader : public BlockReader { public: using BlockReader::BlockReader; static AsyncGenerator MakeAsyncIterator( AsyncGenerator> buffer_generator, std::unique_ptr chunker, std::shared_ptr first_buffer, int64_t skip_rows) { auto block_reader = std::make_shared(std::move(chunker), first_buffer, skip_rows); // Wrap shared pointer in callable Transformer, CSVBlock> block_reader_fn = [block_reader](std::shared_ptr next) { return (*block_reader)(next); }; return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn); } Result> operator()(std::shared_ptr next_buffer) { if (buffer_ == nullptr) { // EOF return TransformFinish(); } bool is_final = (next_buffer == nullptr); auto current_partial = std::move(partial_); auto current_buffer = std::move(buffer_); int64_t bytes_skipped = 0; if (skip_rows_) { auto orig_size = current_buffer->size(); bytes_skipped = current_partial->size(); RETURN_NOT_OK(chunker_->ProcessSkip(current_partial, current_buffer, is_final, &skip_rows_, ¤t_buffer)); bytes_skipped += orig_size - current_buffer->size(); current_partial = std::make_shared(nullptr, 0); if (skip_rows_) { partial_ = std::move(current_buffer); buffer_ = std::move(next_buffer); return TransformYield(CSVBlock{current_partial, current_partial, current_partial, block_index_++, is_final, bytes_skipped, {}}); } } std::shared_ptr whole, completion, next_partial; if (is_final) { // End of file reached => compute completion from penultimate block RETURN_NOT_OK( chunker_->ProcessFinal(current_partial, current_buffer, &completion, &whole)); } else { // Get completion of partial from previous block. std::shared_ptr starts_with_whole; // Get completion of partial from previous block. RETURN_NOT_OK(chunker_->ProcessWithPartial(current_partial, current_buffer, &completion, &starts_with_whole)); // Get a complete CSV block inside `partial + block`, and keep // the rest for the next iteration. 
RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial)); } partial_ = std::move(next_partial); buffer_ = std::move(next_buffer); return TransformYield(CSVBlock{ current_partial, completion, whole, block_index_++, is_final, bytes_skipped, {}}); } }; struct ParsedBlock { std::shared_ptr parser; int64_t block_index; int64_t bytes_parsed_or_skipped; }; struct DecodedBlock { std::shared_ptr record_batch; // Represents the number of input bytes represented by this batch // This will include bytes skipped when skipping rows after the header int64_t bytes_processed; }; } // namespace } // namespace csv template <> struct IterationTraits { static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; } static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; } }; template <> struct IterationTraits { static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; } static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; } }; namespace csv { namespace { // A function object that takes in a buffer of CSV data and returns a parsed batch of CSV // data (CSVBlock -> ParsedBlock) for use with MakeMappedGenerator. // The parsed batch contains a list of offsets for each of the columns so that columns // can be individually scanned // // This operator is not reentrant class BlockParsingOperator { public: BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options, int num_csv_cols, int64_t first_row) : io_context_(io_context), parse_options_(parse_options), num_csv_cols_(num_csv_cols), count_rows_(first_row >= 0), num_rows_seen_(first_row) {} // TODO: this is almost entirely the same as ReaderMixin::Parse(). Refactor? 
Result operator()(const CSVBlock& block) { constexpr int32_t max_num_rows = std::numeric_limits::max(); auto parser = std::make_shared( io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); std::shared_ptr straddling; std::vector views; if (block.partial->size() != 0 || block.completion->size() != 0) { if (block.partial->size() == 0) { straddling = block.completion; } else if (block.completion->size() == 0) { straddling = block.partial; } else { ARROW_ASSIGN_OR_RAISE( straddling, ConcatenateBuffers({block.partial, block.completion}, io_context_.pool())); } views = {std::string_view(*straddling), std::string_view(*block.buffer)}; } else { views = {std::string_view(*block.buffer)}; } uint32_t parsed_size; if (block.is_final) { RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); } else { RETURN_NOT_OK(parser->Parse(views, &parsed_size)); } // `partial + completion` should have been entirely consumed. const int64_t bytes_before_buffer = block.partial->size() + block.completion->size(); if (static_cast(parsed_size) < bytes_before_buffer) { // This can happen if `newlines_in_values` is not enabled and // `partial + completion` ends with a newline inside a quoted string. // In this case, the BlockParser stops at the truncated data in the first // block (see gh-39857). return Status::Invalid( "CSV parser got out of sync with chunker. 
This can mean the data file " "contains cell values spanning multiple lines; please consider enabling " "the option 'newlines_in_values'."); } if (count_rows_) { num_rows_seen_ += parser->total_num_rows(); } if (block.consume_bytes) { RETURN_NOT_OK(block.consume_bytes(parsed_size)); } return ParsedBlock{std::move(parser), block.block_index, static_cast(parsed_size) + block.bytes_skipped}; } int num_csv_cols() const { return num_csv_cols_; } private: io::IOContext io_context_; const ParseOptions parse_options_; const int num_csv_cols_; const bool count_rows_; int64_t num_rows_seen_; }; // A function object that takes in parsed batch of CSV data and decodes it to an arrow // record batch (ParsedBlock -> DecodedBlock) for use with MakeMappedGenerator. class BlockDecodingOperator { public: Future operator()(const ParsedBlock& block) { DCHECK(!state_->column_decoders.empty()); std::vector>> decoded_array_futs; for (auto& decoder : state_->column_decoders) { decoded_array_futs.push_back(decoder->Decode(block.parser)); } auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped; auto decoded_arrays_fut = All(std::move(decoded_array_futs)); auto state = state_; return decoded_arrays_fut.Then( [state, bytes_parsed_or_skipped]( const std::vector>>& maybe_decoded_arrays) -> Result { ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, arrow20::internal::UnwrapOrRaise(maybe_decoded_arrays)); ARROW_ASSIGN_OR_RAISE(auto batch, state->DecodedArraysToBatch(std::move(decoded_arrays))); return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; }); } static Result Make(io::IOContext io_context, ConvertOptions convert_options, ConversionSchema conversion_schema) { BlockDecodingOperator op(std::move(io_context), std::move(convert_options), std::move(conversion_schema)); RETURN_NOT_OK(op.state_->MakeColumnDecoders(io_context)); return op; } private: BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options, ConversionSchema conversion_schema) : 
state_(std::make_shared(std::move(io_context), std::move(convert_options), std::move(conversion_schema))) {} struct State { State(io::IOContext io_context, ConvertOptions convert_options, ConversionSchema conversion_schema) : convert_options(std::move(convert_options)), conversion_schema(std::move(conversion_schema)) {} Result> DecodedArraysToBatch( std::vector> arrays) { const auto n_rows = arrays[0]->length(); if (schema == nullptr) { FieldVector fields(arrays.size()); for (size_t i = 0; i < arrays.size(); ++i) { fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type()); } if (n_rows == 0) { // No rows so schema is not reliable. return RecordBatch but do not set schema return RecordBatch::Make(arrow20::schema(std::move(fields)), n_rows, std::move(arrays)); } schema = arrow20::schema(std::move(fields)); } return RecordBatch::Make(schema, n_rows, std::move(arrays)); } // Make column decoders from conversion schema Status MakeColumnDecoders(io::IOContext io_context) { for (const auto& column : conversion_schema.columns) { std::shared_ptr decoder; if (column.is_missing) { ARROW_ASSIGN_OR_RAISE(decoder, ColumnDecoder::MakeNull(io_context.pool(), column.type)); } else if (column.type != nullptr) { ARROW_ASSIGN_OR_RAISE( decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index, convert_options)); } else { ARROW_ASSIGN_OR_RAISE( decoder, ColumnDecoder::Make(io_context.pool(), column.index, convert_options)); } column_decoders.push_back(std::move(decoder)); } return Status::OK(); } ConvertOptions convert_options; ConversionSchema conversion_schema; std::vector> column_decoders; std::shared_ptr schema; }; std::shared_ptr state_; }; ///////////////////////////////////////////////////////////////////////// // Base class for common functionality class ReaderMixin { public: ReaderMixin(io::IOContext io_context, std::shared_ptr input, const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options, bool 
count_rows) : io_context_(std::move(io_context)), read_options_(read_options), parse_options_(parse_options), convert_options_(convert_options), count_rows_(count_rows), input_(std::move(input)) {} protected: // Read header and column names from buffer, create column builders // Returns the # of bytes consumed Result ProcessHeader(const std::shared_ptr& buf, std::shared_ptr* rest) { const uint8_t* data = buf->data(); const auto data_end = data + buf->size(); DCHECK_GT(data_end - data, 0); int64_t num_rows_seen = 1; if (read_options_.skip_rows) { // Skip initial rows (potentially invalid CSV data) auto num_skipped_rows = SkipRows(data, static_cast(data_end - data), read_options_.skip_rows, &data); if (num_skipped_rows < read_options_.skip_rows) { return Status::Invalid( "Could not skip initial ", read_options_.skip_rows, " rows from CSV file, " "either file is too short or header is larger than block size"); } if (count_rows_) { num_rows_seen += num_skipped_rows; } } if (read_options_.column_names.empty()) { // Parse one row (either to read column names or to know the number of columns) BlockParser parser(io_context_.pool(), parse_options_, /*num_cols=*/-1, /*first_row=*/num_rows_seen, /*max_num_rows=*/1); uint32_t parsed_size = 0; RETURN_NOT_OK(parser.Parse( std::string_view(reinterpret_cast(data), data_end - data), &parsed_size)); if (parser.num_rows() != 1) { return Status::Invalid( "Could not read first row from CSV file, either " "file is too short or header is larger than block size"); } if (parser.num_cols() == 0) { return Status::Invalid("No columns in CSV file"); } if (read_options_.autogenerate_column_names) { column_names_ = GenerateColumnNames(parser.num_cols()); } else { // Read column names from header row auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status { column_names_.emplace_back(reinterpret_cast(data), size); return Status::OK(); }; RETURN_NOT_OK(parser.VisitLastRow(visit)); DCHECK_EQ(static_cast(parser.num_cols()), 
column_names_.size()); // Skip parsed header row data += parsed_size; if (count_rows_) { ++num_rows_seen; } } } else { column_names_ = read_options_.column_names; } if (count_rows_) { // increase rows seen to skip past rows which will be skipped num_rows_seen += read_options_.skip_rows_after_names; } auto bytes_consumed = data - buf->data(); *rest = SliceBuffer(buf, bytes_consumed); int32_t num_csv_cols = static_cast(column_names_.size()); DCHECK_GT(num_csv_cols, 0); // Since we know the number of columns, we can instantiate the BlockParsingOperator parsing_operator_.emplace(io_context_, parse_options_, num_csv_cols, count_rows_ ? num_rows_seen : -1); RETURN_NOT_OK(MakeConversionSchema()); return bytes_consumed; } std::vector GenerateColumnNames(int32_t num_cols) { std::vector res; res.reserve(num_cols); for (int32_t i = 0; i < num_cols; ++i) { std::stringstream ss; ss << "f" << i; res.push_back(ss.str()); } return res; } // Make conversion schema from options and parsed CSV header Status MakeConversionSchema() { // Append a column converted from CSV data auto append_csv_column = [&](std::string col_name, int32_t col_index) { // Does the named column have a fixed type? 
auto it = convert_options_.column_types.find(col_name); if (it == convert_options_.column_types.end()) { conversion_schema_.columns.push_back( ConversionSchema::InferredColumn(std::move(col_name), col_index)); } else { conversion_schema_.columns.push_back( ConversionSchema::TypedColumn(std::move(col_name), col_index, it->second)); } }; // Append a column of nulls auto append_null_column = [&](std::string col_name) { // If the named column has a fixed type, use it, otherwise use null() std::shared_ptr type; auto it = convert_options_.column_types.find(col_name); if (it == convert_options_.column_types.end()) { type = null(); } else { type = it->second; } conversion_schema_.columns.push_back( ConversionSchema::NullColumn(std::move(col_name), std::move(type))); }; if (convert_options_.include_columns.empty()) { // Include all columns in CSV file order for (int32_t col_index = 0; col_index < num_csv_cols(); ++col_index) { append_csv_column(column_names_[col_index], col_index); } } else { // Include columns from `include_columns` (in that order) // Compute indices of columns in the CSV file std::unordered_map col_indices; col_indices.reserve(column_names_.size()); for (int32_t i = 0; i < static_cast(column_names_.size()); ++i) { col_indices.emplace(column_names_[i], i); } for (const auto& col_name : convert_options_.include_columns) { auto it = col_indices.find(col_name); if (it != col_indices.end()) { append_csv_column(col_name, it->second); } else if (convert_options_.include_missing_columns) { append_null_column(col_name); } else { return Status::KeyError("Column '", col_name, "' in include_columns " "does not exist in CSV file"); } } } return Status::OK(); } Result Parse(const CSVBlock& block) { DCHECK(parsing_operator_.has_value()); return (*parsing_operator_)(block); } int num_csv_cols() const { DCHECK(parsing_operator_.has_value()); return parsing_operator_->num_csv_cols(); } io::IOContext io_context_; const ReadOptions read_options_; const ParseOptions 
parse_options_; const ConvertOptions convert_options_; // Whether to track the number of rows seen in the CSV being parsed const bool count_rows_; std::optional parsing_operator_; // Column names in the CSV file std::vector column_names_; ConversionSchema conversion_schema_; std::shared_ptr input_; std::shared_ptr task_group_; }; ///////////////////////////////////////////////////////////////////////// // Base class for one-shot table readers class BaseTableReader : public ReaderMixin, public csv::TableReader { public: using ReaderMixin::ReaderMixin; virtual Status Init() = 0; Future> ReadAsync() override { return Future>::MakeFinished(Read()); } protected: // Make column builders from conversion schema Status MakeColumnBuilders() { for (const auto& column : conversion_schema_.columns) { std::shared_ptr builder; if (column.is_missing) { ARROW_ASSIGN_OR_RAISE(builder, ColumnBuilder::MakeNull(io_context_.pool(), column.type, task_group_)); } else if (column.type != nullptr) { ARROW_ASSIGN_OR_RAISE( builder, ColumnBuilder::Make(io_context_.pool(), column.type, column.index, convert_options_, task_group_)); } else { ARROW_ASSIGN_OR_RAISE(builder, ColumnBuilder::Make(io_context_.pool(), column.index, convert_options_, task_group_)); } column_builders_.push_back(std::move(builder)); } return Status::OK(); } Status ParseAndInsert(const CSVBlock& block) { ARROW_ASSIGN_OR_RAISE(auto result, Parse(block)); RETURN_NOT_OK(ProcessData(result.parser, result.block_index)); return Status::OK(); } // Trigger conversion of parsed block data Status ProcessData(const std::shared_ptr& parser, int64_t block_index) { for (auto& builder : column_builders_) { builder->Insert(block_index, parser); } return Status::OK(); } Result> MakeTable() { DCHECK_EQ(column_builders_.size(), conversion_schema_.columns.size()); std::vector> fields; std::vector> columns; for (int32_t i = 0; i < static_cast(column_builders_.size()); ++i) { const auto& column = conversion_schema_.columns[i]; 
ARROW_ASSIGN_OR_RAISE(auto array, column_builders_[i]->Finish()); fields.push_back(::arrow20::field(column.name, array->type())); columns.emplace_back(std::move(array)); } return Table::Make(schema(std::move(fields)), std::move(columns)); } // Column builders for target Table (in ConversionSchema order) std::vector> column_builders_; }; ///////////////////////////////////////////////////////////////////////// // Base class for streaming readers class StreamingReaderImpl : public ReaderMixin, public csv::StreamingReader, public std::enable_shared_from_this { public: StreamingReaderImpl(io::IOContext io_context, std::shared_ptr input, const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options, bool count_rows) : ReaderMixin(io_context, std::move(input), read_options, parse_options, convert_options, count_rows), bytes_decoded_(std::make_shared>(0)) {} Future<> Init(Executor* cpu_executor) { ARROW_ASSIGN_OR_RAISE(auto istream_it, io::MakeInputStreamIterator(input_, read_options_.block_size)); // TODO Consider exposing readahead as a read option (ARROW-12090) ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor())); auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor); auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it)); int max_readahead = cpu_executor->GetCapacity(); auto self = shared_from_this(); return buffer_generator().Then([self, buffer_generator, max_readahead]( const std::shared_ptr& first_buffer) { return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead); }); } std::shared_ptr schema() const override { return schema_; } int64_t bytes_read() const override { return bytes_decoded_->load(); } Status ReadNext(std::shared_ptr* batch) override { auto next_fut = ReadNextAsync(); auto next_result = next_fut.result(); return std::move(next_result).Value(batch); } Future> ReadNextAsync() override { return 
record_batch_gen_(); } protected: Future<> InitAfterFirstBuffer(const std::shared_ptr& first_buffer, AsyncGenerator> buffer_generator, int max_readahead) { if (first_buffer == nullptr) { return Status::Invalid("Empty CSV file"); } std::shared_ptr after_header; ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, ProcessHeader(first_buffer, &after_header)); bytes_decoded_->fetch_add(header_bytes_consumed); ARROW_ASSIGN_OR_RAISE( auto decoder_op, BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_)); auto block_gen = SerialBlockReader::MakeAsyncIterator( std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header), read_options_.skip_rows_after_names); auto parsed_block_gen = MakeMappedGenerator(std::move(block_gen), *parsing_operator_); auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); auto self = shared_from_this(); return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) { return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); }); } Future<> InitFromBlock(const DecodedBlock& block, AsyncGenerator batch_gen, int max_readahead, int64_t prev_bytes_processed) { if (!block.record_batch) { // End of file just return null batches record_batch_gen_ = MakeEmptyGenerator>(); return Status::OK(); } schema_ = block.record_batch->schema(); if (block.record_batch->num_rows() == 0) { // Keep consuming blocks until the first non empty block is found auto self = shared_from_this(); prev_bytes_processed += block.bytes_processed; return batch_gen().Then([self, batch_gen, max_readahead, prev_bytes_processed](const DecodedBlock& next_block) { return self->InitFromBlock(next_block, std::move(batch_gen), max_readahead, prev_bytes_processed); }); } AsyncGenerator readahead_gen; if (read_options_.use_threads) { readahead_gen = MakeReadaheadGenerator(std::move(batch_gen), max_readahead); } else { readahead_gen = std::move(batch_gen); } AsyncGenerator 
// NOTE(review): in this copy of the file the template argument lists appear to
// be missing (e.g. `Result>`, `std::shared_ptr schema_`, `std::make_shared(`),
// so the code below does not compile as-is. Restore them from the upstream
// source rather than guessing them here.
//
// NOTE(review): the statements up to the first `};` below are the tail of a
// method (and then of its enclosing class) whose beginning lies outside this view.
    // Rebuild the generator so that the already-pulled `block` comes first,
    // followed by the remaining readahead output (per MakeGeneratorStartsWith).
    restarted_gen = MakeGeneratorStartsWith({block}, std::move(readahead_gen));
    // Copy the shared counter pointer so the lambda below shares ownership of it.
    auto bytes_decoded = bytes_decoded_;
    // Each decoded block handed to the caller credits its processed bytes to the
    // shared counter. `prev_bytes_processed` is a by-value capture of this
    // `mutable` lambda. It is zeroed after the first call, so this lambda object
    // credits it only once.
    auto unwrap_and_record_bytes = [bytes_decoded, prev_bytes_processed](
                                       const DecodedBlock& block) mutable -> Result> {
      bytes_decoded->fetch_add(block.bytes_processed + prev_bytes_processed);
      prev_bytes_processed = 0;
      return block.record_batch;
    };
    auto unwrapped =
        MakeMappedGenerator(std::move(restarted_gen), std::move(unwrap_and_record_bytes));
    // Tie the resulting generator to the IO context's stop token.
    record_batch_gen_ = MakeCancellable(std::move(unwrapped), io_context_.stop_token());
    return Status::OK();
  }

  std::shared_ptr schema_;
  AsyncGenerator> record_batch_gen_;
  // bytes which have been decoded and asked for by the caller
  std::shared_ptr> bytes_decoded_;
};

/////////////////////////////////////////////////////////////////////////
// Serial TableReader implementation

// Reads the whole CSV input into a Table. Blocks are pulled one at a time and
// converted through a serial task group (TaskGroup::MakeSerial).
class SerialTableReader : public BaseTableReader {
 public:
  using BaseTableReader::BaseTableReader;

  // Sets up the input pipeline:
  // - a stream iterator yielding `read_options_.block_size` chunks,
  // - a readahead iterator with a queue of one block,
  // - a CSVBufferIterator wrapper stored in `buffer_iterator_`.
  Status Init() override {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));

    // Since we're converting serially, no need to readahead more than one block
    int32_t block_queue_size = 1;
    ARROW_ASSIGN_OR_RAISE(auto rh_it,
                          MakeReadaheadIterator(std::move(istream_it), block_queue_size));
    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
    return Status::OK();
  }

  // Reads and converts every block, then builds the table.
  // Returns Invalid("Empty CSV file") if the first buffer is null.
  // `buffer_iterator_` is moved into the block iterator below and is left
  // moved-from afterwards, so a second call is presumably unsupported.
  Result> Read() override {
    task_group_ = TaskGroup::MakeSerial(io_context_.stop_token());

    // First block
    ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next());
    if (first_buffer == nullptr) {
      return Status::Invalid("Empty CSV file");
    }
    // `first_buffer` is passed both as input and as the output pointer, so it
    // holds ProcessHeader's result afterwards.
    RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer));
    RETURN_NOT_OK(MakeColumnBuilders());

    auto block_iterator = SerialBlockReader::MakeIterator(
        std::move(buffer_iterator_), MakeChunker(parse_options_), std::move(first_buffer),
        read_options_.skip_rows_after_names);
    while (true) {
      // Poll the IO context's stop token before each block; a non-OK Status
      // aborts the read.
      RETURN_NOT_OK(io_context_.stop_token().Poll());

      ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator.Next());
      if (IsIterationEnd(maybe_block)) {
        // EOF
        break;
      }
      RETURN_NOT_OK(ParseAndInsert(maybe_block));
    }
    // Finish conversion, create schema and table
    RETURN_NOT_OK(task_group_->Finish());
    return MakeTable();
  }

 protected:
  Iterator> buffer_iterator_;
};

// Reads the whole CSV input into a Table asynchronously:
// - buffers are produced by a background generator on the IO context's executor;
// - they are transferred to `cpu_executor_`;
// - one ParseAndInsert task per block is appended to a threaded task group.
// Must be owned by a std::shared_ptr: ReadAsync() and ProcessFirstBuffer() call
// shared_from_this() so their asynchronous callbacks keep the reader alive.
class AsyncThreadedTableReader
    : public BaseTableReader,
      public std::enable_shared_from_this {
 public:
  using BaseTableReader::BaseTableReader;

  AsyncThreadedTableReader(io::IOContext io_context,
                           std::shared_ptr input,
                           const ReadOptions& read_options,
                           const ParseOptions& parse_options,
                           const ConvertOptions& convert_options, Executor* cpu_executor)
      // Count rows is currently not supported during parallel read
      : BaseTableReader(std::move(io_context), input, read_options, parse_options,
                        convert_options, /*count_rows=*/false),
        cpu_executor_(cpu_executor) {}

  ~AsyncThreadedTableReader() override {
    if (task_group_) {
      // In case of error, make sure all pending tasks are finished before
      // we start destroying BaseTableReader members
      ARROW_UNUSED(task_group_->Finish());
    }
  }

  // Builds the asynchronous buffer pipeline:
  // - readahead depth is the CPU executor's capacity;
  // - `readahead_restart` is half of that (at least 1);
  // - the background generator's output is transferred onto `cpu_executor_`.
  Status Init() override {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));

    int max_readahead = cpu_executor_->GetCapacity();
    int readahead_restart = std::max(1, max_readahead / 2);

    ARROW_ASSIGN_OR_RAISE(
        auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor(),
                                            max_readahead, readahead_restart));

    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
    return Status::OK();
  }

  // Synchronous wrapper: blocks on the result of ReadAsync().
  Result> Read() override { return ReadAsync().result(); }

  // Steps:
  // 1. Process the header from the first buffer.
  // 2. Visit every remaining block, appending one ParseAndInsert task per block
  //    to a threaded task group.
  // 3. After all blocks are visited, wait for the task group (FinishAsync).
  // 4. Build the table.
  Future> ReadAsync() override {
    task_group_ = TaskGroup::MakeThreaded(cpu_executor_, io_context_.stop_token());

    auto self = shared_from_this();
    return ProcessFirstBuffer().Then([self](const std::shared_ptr& first_buffer) {
      auto block_generator = ThreadedBlockReader::MakeAsyncIterator(
          self->buffer_generator_, MakeChunker(self->parse_options_), first_buffer,
          self->read_options_.skip_rows_after_names);

      std::function block_visitor =
          [self](CSVBlock maybe_block) -> Status {
        // The logic in VisitAsyncGenerator ensures that we will never be
        // passed an empty block (visit does not call with the end token) so
        // we can be assured maybe_block has a value.
        DCHECK_GE(maybe_block.block_index, 0);
        DCHECK(!maybe_block.consume_bytes);

        // Launch parse task
        self->task_group_->Append(
            [self, maybe_block] { return self->ParseAndInsert(maybe_block); });
        return Status::OK();
      };

      return VisitAsyncGenerator(std::move(block_generator), block_visitor)
          .Then([self]() -> Future<> {
            // By this point we've added all top level tasks so it is safe to call
            // FinishAsync
            return self->task_group_->FinishAsync();
          })
          .Then([self]() -> Result> {
            // Finish conversion, create schema and table
            return self->MakeTable();
          });
    });
  }

 protected:
  // Pulls the first buffer and fails with Invalid("Empty CSV file") if it is null.
  // Otherwise it runs header processing and creates the column builders.
  // Resolves to the buffer written by ProcessHeader.
  Future> ProcessFirstBuffer() {
    // First block
    auto first_buffer_future = buffer_generator_();
    return first_buffer_future.Then(
        [self = shared_from_this()](const std::shared_ptr& first_buffer)
            -> Result> {
          if (first_buffer == nullptr) {
            return Status::Invalid("Empty CSV file");
          }
          std::shared_ptr first_buffer_processed;
          RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer_processed));
          RETURN_NOT_OK(self->MakeColumnBuilders());
          return first_buffer_processed;
        });
  }

  Executor* cpu_executor_;
  AsyncGenerator> buffer_generator_;
};

// Validates the parse, read and convert options, then constructs a table reader.
// - When `read_options.use_threads` is set, the reader is built with the executor
//   returned by arrow20::internal::GetCpuThreadPool(). Its argument list matches
//   AsyncThreadedTableReader's constructor (presumably that class; the template
//   argument is not visible here).
// - Otherwise it is built with /*count_rows=*/true (presumably SerialTableReader).
// The reader is Init()-ed before being returned.
// NOTE(review): `pool` is not used in this body.
Result> MakeTableReader(
    MemoryPool* pool, io::IOContext io_context, std::shared_ptr input,
    const ReadOptions& read_options, const ParseOptions& parse_options,
    const ConvertOptions& convert_options) {
  RETURN_NOT_OK(parse_options.Validate());
  RETURN_NOT_OK(read_options.Validate());
  RETURN_NOT_OK(convert_options.Validate());
  std::shared_ptr reader;
  if (read_options.use_threads) {
    auto cpu_executor = arrow20::internal::GetCpuThreadPool();
    reader = std::make_shared(
        io_context, input, read_options, parse_options, convert_options, cpu_executor);
  } else {
    reader = std::make_shared(io_context, input, read_options,
                                                 parse_options, convert_options,
                                                 /*count_rows=*/true);
  }
  RETURN_NOT_OK(reader->Init());
  return reader;
}

// Validates the options, constructs the streaming reader and calls
// Init(cpu_executor). The returned future then resolves to the reader after a
// std::dynamic_pointer_cast (target type not visible in this copy).
// Row counting is enabled only when threads are disabled or the executor's
// capacity is 1. `cpu_executor` is only dereferenced when `use_threads` is true,
// because `||` short-circuits.
Future> MakeStreamingReader(
    io::IOContext io_context, std::shared_ptr input,
    Executor* cpu_executor, const ReadOptions& read_options,
    const ParseOptions& parse_options, const ConvertOptions& convert_options) {
  RETURN_NOT_OK(parse_options.Validate());
  RETURN_NOT_OK(read_options.Validate());
  RETURN_NOT_OK(convert_options.Validate());
  std::shared_ptr reader;
  reader = std::make_shared(
      io_context, input, read_options, parse_options, convert_options,
      /*count_rows=*/!read_options.use_threads || cpu_executor->GetCapacity() == 1);
  return reader->Init(cpu_executor).Then([reader] {
    return std::dynamic_pointer_cast(reader);
  });
}

/////////////////////////////////////////////////////////////////////////
// Row count implementation

// Counts the rows of a CSV input. Each block is parsed and the parser's row count
// is summed into `row_count_`. No column builders are created, so no values are
// converted.
// Must be owned by a std::shared_ptr (Count() calls shared_from_this()).
class CSVRowCounter : public ReaderMixin,
                      public std::enable_shared_from_this {
 public:
  CSVRowCounter(io::IOContext io_context, Executor* cpu_executor,
                std::shared_ptr input, const ReadOptions& read_options,
                const ParseOptions& parse_options)
      : ReaderMixin(io_context, std::move(input), read_options, parse_options,
                    ConvertOptions::Defaults(), /*count_rows=*/true),
        cpu_executor_(cpu_executor),
        row_count_(0) {}

  // Runs Init() and then DoCount(), keeping `self` alive across both steps.
  Future Count() {
    auto self = shared_from_this();
    return Init(self).Then([self]() { return self->DoCount(self); });
  }

 private:
  // Builds the buffer pipeline: a background generator on the IO context's
  // executor with default readahead, transferred to `cpu_executor_`.
  // Then it processes the header from the first buffer and stores a serial block
  // generator in `block_generator_`.
  // Fails with Invalid("Empty CSV file") if the first buffer is null.
  Future<> Init(const std::shared_ptr& self) {
    ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));
    // TODO Consider exposing readahead as a read option (ARROW-12090)
    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
                                                              io_context_.executor()));
    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);

    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));

    return buffer_generator().Then(
        [self, buffer_generator](std::shared_ptr first_buffer) {
          if (!first_buffer) {
            return Status::Invalid("Empty CSV file");
          }
          RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer));
          // NOTE(review): the last argument here is 0. The other block-reader
          // factory calls in this file pass `read_options_.skip_rows_after_names`
          // in this position. Confirm whether rows skipped after the header
          // should be excluded from the count.
          self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
              buffer_generator, MakeChunker(self->parse_options_),
              std::move(first_buffer), 0);
          return Status::OK();
        });
  }

  // Maps every block to its parsed row count and adds each count to
  // `row_count_`. After draining the generator it resolves to the accumulated
  // total. The per-block count is an int32_t; the running total is int64_t.
  Future DoCount(const std::shared_ptr& self) {
    // count_cb must return a value instead of Status/Future<> to work with
    // MakeMappedGenerator, and it must use a type with a valid end value to work with
    // IterationEnd.
    std::function>(const CSVBlock&)> count_cb =
        [self](const CSVBlock& maybe_block) -> Result> {
      ARROW_ASSIGN_OR_RAISE(auto parsed_block, self->Parse(maybe_block));
      int32_t total_row_count = parsed_block.parser->total_num_rows();
      self->row_count_ += total_row_count;
      return total_row_count;
    };
    auto count_gen = MakeMappedGenerator(block_generator_, std::move(count_cb));
    return DiscardAllFromAsyncGenerator(count_gen).Then(
        [self]() { return self->row_count_; });
  }

  Executor* cpu_executor_;
  AsyncGenerator block_generator_;
  int64_t row_count_;
};

} // namespace

/////////////////////////////////////////////////////////////////////////
// Factory functions

// Creates and initializes a table reader via MakeTableReader, passing the IO
// context's memory pool.
Result> TableReader::Make(
    io::IOContext io_context, std::shared_ptr input,
    const ReadOptions& read_options, const ParseOptions& parse_options,
    const ConvertOptions& convert_options) {
  return MakeTableReader(io_context.pool(), io_context, std::move(input), read_options,
                         parse_options, convert_options);
}

// Synchronous counterpart of MakeAsync. It uses the executor returned by
// arrow20::internal::GetCpuThreadPool() and blocks on the construction future.
Result> StreamingReader::Make(
    io::IOContext io_context, std::shared_ptr input,
    const ReadOptions& read_options, const ParseOptions& parse_options,
    const ConvertOptions& convert_options) {
  auto cpu_executor = arrow20::internal::GetCpuThreadPool();
  auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
                                        read_options, parse_options, convert_options);
  auto reader_result = reader_fut.result();
  ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
  return reader;
}

// Creates a streaming reader on the caller-supplied `cpu_executor`. The returned
// future resolves once the reader has been initialized.
Future> StreamingReader::MakeAsync(
    io::IOContext io_context, std::shared_ptr input,
    Executor* cpu_executor, const ReadOptions& read_options,
    const ParseOptions& parse_options, const ConvertOptions& convert_options) {
  return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
                             parse_options, convert_options);
}

// Validates the parse and read options, then counts the rows of `input`
// asynchronously via CSVRowCounter. Validation failures are returned through
// RETURN_NOT_OK, presumably as an already-failed Future.
Future CountRowsAsync(io::IOContext io_context, std::shared_ptr input,
                                  Executor* cpu_executor, const ReadOptions& read_options,
                                  const ParseOptions& parse_options) {
  RETURN_NOT_OK(parse_options.Validate());
  RETURN_NOT_OK(read_options.Validate());
  auto counter = std::make_shared(
      io_context, cpu_executor, std::move(input), read_options, parse_options);
  return counter->Count();
}

} // namespace csv
} // namespace arrow20