// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/buffered.h" #include #include #include #include #include #include #include "contrib/libs/apache/arrow_next/cpp/src/arrow/buffer.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/util_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/memory_pool.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h" namespace arrow20 { namespace io { // ---------------------------------------------------------------------- // BufferedOutputStream implementation class BufferedBase { public: explicit BufferedBase(MemoryPool* pool) : pool_(pool), is_open_(true), buffer_data_(nullptr), buffer_pos_(0), buffer_size_(0), raw_pos_(-1) {} bool closed() const { std::lock_guard guard(lock_); return !is_open_; } // Allocate buffer_ if needed, and resize it to buffer_size_ if required. Status ResetBuffer() { if (!buffer_) { // On first invocation, or if the buffer has been released, we allocate a // new buffer ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(buffer_size_, pool_)); } else if (buffer_->size() != buffer_size_) { RETURN_NOT_OK(buffer_->Resize(buffer_size_)); } buffer_data_ = buffer_->mutable_data(); return Status::OK(); } Status ResizeBuffer(int64_t new_buffer_size) { buffer_size_ = new_buffer_size; return ResetBuffer(); } void AppendToBuffer(const void* data, int64_t nbytes) { DCHECK_LE(buffer_pos_ + nbytes, buffer_size_); std::memcpy(buffer_data_ + buffer_pos_, data, nbytes); buffer_pos_ += nbytes; } int64_t buffer_size() const { return buffer_size_; } int64_t buffer_pos() const { return buffer_pos_; } protected: MemoryPool* pool_; bool is_open_; std::shared_ptr buffer_; uint8_t* buffer_data_; int64_t buffer_pos_; int64_t buffer_size_; mutable int64_t raw_pos_; mutable std::mutex lock_; }; class BufferedOutputStream::Impl : public BufferedBase { public: explicit Impl(std::shared_ptr raw, MemoryPool* pool) : BufferedBase(pool), raw_(std::move(raw)) {} Status Close() { std::lock_guard guard(lock_); if (is_open_) { Status st = FlushUnlocked(); is_open_ = false; RETURN_NOT_OK(raw_->Close()); return st; } return Status::OK(); } Status Abort() { std::lock_guard guard(lock_); if (is_open_) { is_open_ = false; return raw_->Abort(); } return Status::OK(); } Result Tell() const { std::lock_guard guard(lock_); if (raw_pos_ == -1) { ARROW_ASSIGN_OR_RAISE(raw_pos_, raw_->Tell()); DCHECK_GE(raw_pos_, 0); } return raw_pos_ + buffer_pos_; } Status Write(const void* data, int64_t nbytes) { return DoWrite(data, nbytes); } Status Write(const std::shared_ptr& buffer) { return DoWrite(buffer->data(), buffer->size(), buffer); } Status DoWrite(const void* data, int64_t nbytes, const std::shared_ptr& buffer = nullptr) { std::lock_guard guard(lock_); if (nbytes < 0) { return Status::Invalid("write count should be >= 0"); } if (nbytes == 0) { return Status::OK(); } if (nbytes + buffer_pos_ >= buffer_size_) { RETURN_NOT_OK(FlushUnlocked()); DCHECK_EQ(buffer_pos_, 0); if (nbytes >= buffer_size_) { // Invalidate cached raw pos raw_pos_ = -1; // Direct write if (buffer) { return raw_->Write(buffer); } else { return raw_->Write(data, nbytes); } } } AppendToBuffer(data, nbytes); return Status::OK(); } Status FlushUnlocked() { if (buffer_pos_ > 0) { // Invalidate cached raw pos raw_pos_ = -1; RETURN_NOT_OK(raw_->Write(buffer_data_, buffer_pos_)); buffer_pos_ = 0; } return Status::OK(); } Status Flush() { std::lock_guard guard(lock_); return FlushUnlocked(); } Result> Detach() { std::lock_guard guard(lock_); RETURN_NOT_OK(FlushUnlocked()); is_open_ = false; return std::move(raw_); } Status SetBufferSize(int64_t new_buffer_size) { std::lock_guard guard(lock_); if (new_buffer_size <= 0) { return Status::Invalid("Buffer size should be positive"); } if (buffer_pos_ >= new_buffer_size) { // If the buffer is shrinking, first flush to the raw OutputStream RETURN_NOT_OK(FlushUnlocked()); } return ResizeBuffer(new_buffer_size); } std::shared_ptr raw() const { return raw_; } private: std::shared_ptr raw_; }; BufferedOutputStream::BufferedOutputStream(std::shared_ptr raw, MemoryPool* pool) { impl_.reset(new Impl(std::move(raw), pool)); } Result> BufferedOutputStream::Create( int64_t buffer_size, MemoryPool* pool, std::shared_ptr raw) { auto result = std::shared_ptr( new BufferedOutputStream(std::move(raw), pool)); RETURN_NOT_OK(result->SetBufferSize(buffer_size)); return result; } BufferedOutputStream::~BufferedOutputStream() { internal::CloseFromDestructor(this); } Status BufferedOutputStream::SetBufferSize(int64_t new_buffer_size) { return impl_->SetBufferSize(new_buffer_size); } int64_t BufferedOutputStream::buffer_size() const { return impl_->buffer_size(); } int64_t BufferedOutputStream::bytes_buffered() const { return impl_->buffer_pos(); } Result> BufferedOutputStream::Detach() { return impl_->Detach(); } Status BufferedOutputStream::Close() { return impl_->Close(); } Status BufferedOutputStream::Abort() { return impl_->Abort(); } bool BufferedOutputStream::closed() const { return impl_->closed(); } Result BufferedOutputStream::Tell() const { return impl_->Tell(); } Status BufferedOutputStream::Write(const void* data, int64_t nbytes) { return impl_->Write(data, nbytes); } Status BufferedOutputStream::Write(const std::shared_ptr& data) { return impl_->Write(data); } Status BufferedOutputStream::Flush() { return impl_->Flush(); } std::shared_ptr BufferedOutputStream::raw() const { return impl_->raw(); } // ---------------------------------------------------------------------- // BufferedInputStream implementation class BufferedInputStream::Impl : public BufferedBase { public: Impl(std::shared_ptr raw, MemoryPool* pool, int64_t raw_total_bytes_bound) : BufferedBase(pool), raw_(std::move(raw)), raw_read_total_(0), raw_read_bound_(raw_total_bytes_bound), bytes_buffered_(0) {} Status Close() { if (is_open_) { is_open_ = false; return raw_->Close(); } return Status::OK(); } Status Abort() { if (is_open_) { is_open_ = false; return raw_->Abort(); } return Status::OK(); } Result Tell() const { if (raw_pos_ == -1) { ARROW_ASSIGN_OR_RAISE(raw_pos_, raw_->Tell()); DCHECK_GE(raw_pos_, 0); } // Shift by bytes_buffered to return semantic stream position return raw_pos_ - bytes_buffered_; } // Resize internal read buffer. Note that the internal buffer-size // should not be larger than the raw_read_bound_. // It might change the buffer_size_, but will not change buffer states // buffer_pos_ and bytes_buffered_. Status SetBufferSize(int64_t new_buffer_size) { if (new_buffer_size <= 0) { return Status::Invalid("Buffer size should be positive"); } if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) { return Status::Invalid( "Cannot shrink read buffer if buffered data remains, new_buffer_size: ", new_buffer_size, ", buffer_pos: ", buffer_pos_, ", bytes_buffered: ", bytes_buffered_, ", buffer_size: ", buffer_size_); } if (raw_read_bound_ >= 0) { // No need to reserve space for more than the total remaining number of bytes. if (bytes_buffered_ == 0) { // Special case: we can not keep the current buffer because it does not // contain any required data. new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_); } else { // We should keep the current buffer because it contains data that // can be read. new_buffer_size = std::min(new_buffer_size, buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); } } return ResizeBuffer(new_buffer_size); } Result Peek(int64_t nbytes) { if (raw_read_bound_ >= 0) { // Do not try to peek more than the total remaining number of bytes. nbytes = std::min(nbytes, bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); } if (bytes_buffered_ == 0 && nbytes < buffer_size_) { // Pre-buffer for small reads RETURN_NOT_OK(BufferIfNeeded()); } // Increase the buffer size if needed. if (nbytes > buffer_->size() - buffer_pos_) { RETURN_NOT_OK(SetBufferSize(nbytes + buffer_pos_)); DCHECK(buffer_->size() - buffer_pos_ >= nbytes); } // Read more data when buffer has insufficient left if (nbytes > bytes_buffered_) { int64_t additional_bytes_to_read = nbytes - bytes_buffered_; if (raw_read_bound_ >= 0) { additional_bytes_to_read = std::min(additional_bytes_to_read, raw_read_bound_ - raw_read_total_); } ARROW_ASSIGN_OR_RAISE( int64_t bytes_read, raw_->Read(additional_bytes_to_read, buffer_->mutable_data() + buffer_pos_ + bytes_buffered_)); bytes_buffered_ += bytes_read; raw_read_total_ += bytes_read; nbytes = bytes_buffered_; } DCHECK(nbytes <= bytes_buffered_); // Enough bytes available return std::string_view(reinterpret_cast(buffer_data_ + buffer_pos_), static_cast(nbytes)); } int64_t bytes_buffered() const { return bytes_buffered_; } int64_t buffer_size() const { return buffer_size_; } std::shared_ptr Detach() { is_open_ = false; return std::move(raw_); } void RewindBuffer() { // Invalidate buffered data, as with a Seek or large Read buffer_pos_ = bytes_buffered_ = 0; } Status DoBuffer() { // Fill the buffer from the raw stream with at most `buffer_size_` bytes. if (!buffer_) { RETURN_NOT_OK(ResetBuffer()); } int64_t bytes_to_buffer = buffer_size_; if (raw_read_bound_ >= 0) { bytes_to_buffer = std::min(buffer_size_, raw_read_bound_ - raw_read_total_); } ARROW_ASSIGN_OR_RAISE(bytes_buffered_, raw_->Read(bytes_to_buffer, buffer_data_)); buffer_pos_ = 0; raw_read_total_ += bytes_buffered_; // Do not make assumptions about the raw stream position raw_pos_ = -1; return Status::OK(); } Status BufferIfNeeded() { if (bytes_buffered_ == 0) { return DoBuffer(); } return Status::OK(); } void ConsumeBuffer(int64_t nbytes) { buffer_pos_ += nbytes; bytes_buffered_ -= nbytes; } Result Read(int64_t nbytes, void* out) { if (ARROW_PREDICT_FALSE(nbytes < 0)) { return Status::Invalid("Bytes to read must be positive. Received:", nbytes); } // 1. First consume pre-buffered data. int64_t pre_buffer_copy_bytes = std::min(nbytes, bytes_buffered_); if (pre_buffer_copy_bytes > 0) { memcpy(out, buffer_data_ + buffer_pos_, pre_buffer_copy_bytes); ConsumeBuffer(pre_buffer_copy_bytes); } int64_t remaining_bytes = nbytes - pre_buffer_copy_bytes; if (raw_read_bound_ >= 0) { remaining_bytes = std::min(remaining_bytes, raw_read_bound_ - raw_read_total_); } if (remaining_bytes == 0) { return pre_buffer_copy_bytes; } DCHECK_EQ(0, bytes_buffered_); // 2. Read from storage. if (remaining_bytes >= buffer_size_) { // 2.1. If read is larger than buffer size, read directly from storage. ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, raw_->Read(remaining_bytes, reinterpret_cast(out) + pre_buffer_copy_bytes)); raw_read_total_ += bytes_read; RewindBuffer(); return pre_buffer_copy_bytes + bytes_read; } else { // 2.2. If read is smaller than buffer size, fill buffer and copy from buffer. RETURN_NOT_OK(DoBuffer()); int64_t bytes_copy_after_buffer = std::min(bytes_buffered_, remaining_bytes); memcpy(reinterpret_cast(out) + pre_buffer_copy_bytes, buffer_data_ + buffer_pos_, bytes_copy_after_buffer); ConsumeBuffer(bytes_copy_after_buffer); return pre_buffer_copy_bytes + bytes_copy_after_buffer; } } Result> Read(int64_t nbytes) { ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())); if (bytes_read < nbytes) { // Change size but do not reallocate internal capacity RETURN_NOT_OK(buffer->Resize(bytes_read, false /* shrink_to_fit */)); buffer->ZeroPadding(); } // R build with openSUSE155 requires an explicit shared_ptr construction return std::shared_ptr(std::move(buffer)); } // For providing access to the raw file handles std::shared_ptr raw() const { return raw_; } private: std::shared_ptr raw_; int64_t raw_read_total_; // a bound on the maximum number of bytes to read from the raw input stream. // The default -1 indicates that it is unbounded int64_t raw_read_bound_; // Number of remaining valid bytes in the buffer, to be reduced on each read // from the buffer. int64_t bytes_buffered_; }; BufferedInputStream::BufferedInputStream(std::shared_ptr raw, MemoryPool* pool, int64_t raw_total_bytes_bound) { impl_ = std::make_unique(std::move(raw), pool, raw_total_bytes_bound); } BufferedInputStream::~BufferedInputStream() { internal::CloseFromDestructor(this); } Result> BufferedInputStream::Create( int64_t buffer_size, MemoryPool* pool, std::shared_ptr raw, int64_t raw_total_bytes_bound) { auto result = std::shared_ptr( new BufferedInputStream(std::move(raw), pool, raw_total_bytes_bound)); RETURN_NOT_OK(result->SetBufferSize(buffer_size)); return result; } Status BufferedInputStream::DoClose() { return impl_->Close(); } Status BufferedInputStream::DoAbort() { return impl_->Abort(); } bool BufferedInputStream::closed() const { return impl_->closed(); } std::shared_ptr BufferedInputStream::Detach() { return impl_->Detach(); } std::shared_ptr BufferedInputStream::raw() const { return impl_->raw(); } Result BufferedInputStream::DoTell() const { return impl_->Tell(); } Result BufferedInputStream::DoPeek(int64_t nbytes) { return impl_->Peek(nbytes); } Status BufferedInputStream::SetBufferSize(int64_t new_buffer_size) { return impl_->SetBufferSize(new_buffer_size); } int64_t BufferedInputStream::bytes_buffered() const { return impl_->bytes_buffered(); } int64_t BufferedInputStream::buffer_size() const { return impl_->buffer_size(); } Result BufferedInputStream::DoRead(int64_t nbytes, void* out) { return impl_->Read(nbytes, out); } Result> BufferedInputStream::DoRead(int64_t nbytes) { return impl_->Read(nbytes); } Result> BufferedInputStream::ReadMetadata() { return impl_->raw()->ReadMetadata(); } Future> BufferedInputStream::ReadMetadataAsync( const IOContext& io_context) { return impl_->raw()->ReadMetadataAsync(io_context); } } // namespace io } // namespace arrow20