// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/windows_compatibility.h" // IWYU pragma: keep // sys/mman.h not present in Visual Studio or Cygwin #ifdef _WIN32 # ifndef NOMINMAX # define NOMINMAX # endif #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/mman.h" # undef Realloc # undef Free #else # include # include # include // IWYU pragma: keep #endif #include #include #include #include #include #include #include #include #include #include #include // ---------------------------------------------------------------------- // Other Arrow includes #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/file.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/interfaces.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/util_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/buffer.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/memory_pool.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/future.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/io_util.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h" namespace arrow20 { using internal::FileDescriptor; using internal::IOErrorFromErrno; namespace io { class OSFile { public: // Note: only one of the Open* methods below may be called on a given instance Status OpenWritable(const std::string& path, bool truncate, bool append, bool write_only) { RETURN_NOT_OK(SetFileName(path)); ARROW_ASSIGN_OR_RAISE(fd_, ::arrow20::internal::FileOpenWritable(file_name_, write_only, truncate, append)); mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE; if (!truncate) { ARROW_ASSIGN_OR_RAISE(size_, ::arrow20::internal::FileGetSize(fd_.fd())); } else { size_ = 0; } return Status::OK(); } // This is different from OpenWritable(string, ...) in that it doesn't // truncate nor mandate a seekable file Status OpenWritable(int fd) { auto result = ::arrow20::internal::FileGetSize(fd); if (result.ok()) { size_ = *result; } else { // Non-seekable file size_ = -1; } RETURN_NOT_OK(SetFileName(fd)); mode_ = FileMode::WRITE; fd_ = FileDescriptor(fd); return Status::OK(); } Status OpenReadable(const std::string& path) { RETURN_NOT_OK(SetFileName(path)); ARROW_ASSIGN_OR_RAISE(fd_, ::arrow20::internal::FileOpenReadable(file_name_)); ARROW_ASSIGN_OR_RAISE(size_, ::arrow20::internal::FileGetSize(fd_.fd())); mode_ = FileMode::READ; return Status::OK(); } Status OpenReadable(int fd) { ARROW_ASSIGN_OR_RAISE(size_, ::arrow20::internal::FileGetSize(fd)); RETURN_NOT_OK(SetFileName(fd)); mode_ = FileMode::READ; fd_ = FileDescriptor(fd); return Status::OK(); } Status CheckClosed() const { if (fd_.closed()) { return Status::Invalid("Invalid operation on closed file"); } return Status::OK(); } Status Close() { return fd_.Close(); } Result Read(int64_t nbytes, void* out) { RETURN_NOT_OK(CheckClosed()); RETURN_NOT_OK(CheckPositioned()); return ::arrow20::internal::FileRead(fd_.fd(), reinterpret_cast(out), nbytes); } Result ReadAt(int64_t position, int64_t nbytes, void* out) { RETURN_NOT_OK(CheckClosed()); RETURN_NOT_OK(internal::ValidateRange(position, nbytes)); // ReadAt() leaves the file position undefined, so require that we seek // before calling Read() or Write(). need_seeking_.store(true); return ::arrow20::internal::FileReadAt(fd_.fd(), reinterpret_cast(out), position, nbytes); } Status Seek(int64_t pos) { RETURN_NOT_OK(CheckClosed()); if (pos < 0) { return Status::Invalid("Invalid position"); } Status st = ::arrow20::internal::FileSeek(fd_.fd(), pos); if (st.ok()) { need_seeking_.store(false); } return st; } Result Tell() const { RETURN_NOT_OK(CheckClosed()); return ::arrow20::internal::FileTell(fd_.fd()); } Status Write(const void* data, int64_t length) { RETURN_NOT_OK(CheckClosed()); std::lock_guard guard(lock_); RETURN_NOT_OK(CheckPositioned()); if (length < 0) { return Status::IOError("Length must be non-negative"); } return ::arrow20::internal::FileWrite(fd_.fd(), reinterpret_cast(data), length); } int fd() const { return fd_.fd(); } bool is_open() const { return !fd_.closed(); } int64_t size() const { return size_; } FileMode::type mode() const { return mode_; } std::mutex& lock() { return lock_; } protected: Status SetFileName(const std::string& file_name) { return ::arrow20::internal::PlatformFilename::FromString(file_name).Value(&file_name_); } Status SetFileName(int fd) { std::stringstream ss; ss << ""; return SetFileName(ss.str()); } Status CheckPositioned() { if (need_seeking_.load()) { return Status::Invalid( "Need seeking after ReadAt() before " "calling implicitly-positioned operation"); } return Status::OK(); } ::arrow20::internal::PlatformFilename file_name_; std::mutex lock_; FileDescriptor fd_; FileMode::type mode_; int64_t size_{-1}; // Whether ReadAt made the file position non-deterministic. std::atomic need_seeking_{false}; }; // ---------------------------------------------------------------------- // ReadableFile implementation class ReadableFile::ReadableFileImpl : public OSFile { public: explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {} Status Open(const std::string& path) { return OpenReadable(path); } Status Open(int fd) { return OpenReadable(fd); } Result> ReadBuffer(int64_t nbytes) { ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())); if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); buffer->ZeroPadding(); } // R build with openSUSE155 requires an explicit shared_ptr construction return std::shared_ptr(std::move(buffer)); } Result> ReadBufferAt(int64_t position, int64_t nbytes) { ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, buffer->mutable_data())); if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); buffer->ZeroPadding(); } // R build with openSUSE155 requires an explicit shared_ptr construction return std::shared_ptr(std::move(buffer)); } Status WillNeed(const std::vector& ranges) { auto report_error = [](int errnum, const char* msg) -> Status { if (errnum == EBADF || errnum == EINVAL) { // These are logic errors, so raise them return IOErrorFromErrno(errnum, msg); } #ifndef NDEBUG // Other errors may be encountered if the target device or filesystem // does not support fadvise advisory (for example, macOS can return // ENOTTY on macOS: ARROW-13983). Log the error for diagnosis // on debug builds, but avoid bothering the user otherwise. ARROW_LOG(WARNING) << IOErrorFromErrno(errnum, msg).ToString(); #else ARROW_UNUSED(msg); #endif return Status::OK(); }; RETURN_NOT_OK(CheckClosed()); for (const auto& range : ranges) { RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length)); #if defined(POSIX_FADV_WILLNEED) int ret = posix_fadvise(fd_.fd(), range.offset, range.length, POSIX_FADV_WILLNEED); if (ret) { RETURN_NOT_OK(report_error(ret, "posix_fadvise failed")); } #elif defined(F_RDADVISE) // macOS, BSD? struct { off_t ra_offset; int ra_count; } radvisory{range.offset, static_cast(range.length)}; if (radvisory.ra_count > 0 && fcntl(fd_.fd(), F_RDADVISE, &radvisory) == -1) { RETURN_NOT_OK(report_error(errno, "fcntl(fd, F_RDADVISE, ...) failed")); } #else ARROW_UNUSED(report_error); #endif } return Status::OK(); } private: MemoryPool* pool_; }; ReadableFile::ReadableFile(MemoryPool* pool) { impl_.reset(new ReadableFileImpl(pool)); } ReadableFile::~ReadableFile() { internal::CloseFromDestructor(this); } Result> ReadableFile::Open(const std::string& path, MemoryPool* pool) { auto file = std::shared_ptr(new ReadableFile(pool)); RETURN_NOT_OK(file->impl_->Open(path)); return file; } Result> ReadableFile::Open(int fd, MemoryPool* pool) { auto file = std::shared_ptr(new ReadableFile(pool)); RETURN_NOT_OK(file->impl_->Open(fd)); return file; } Status ReadableFile::DoClose() { return impl_->Close(); } bool ReadableFile::closed() const { return !impl_->is_open(); } Status ReadableFile::WillNeed(const std::vector& ranges) { return impl_->WillNeed(ranges); } Result ReadableFile::DoTell() const { return impl_->Tell(); } Result ReadableFile::DoRead(int64_t nbytes, void* out) { return impl_->Read(nbytes, out); } Result ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) { return impl_->ReadAt(position, nbytes, out); } Result> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) { return impl_->ReadBufferAt(position, nbytes); } Result> ReadableFile::DoRead(int64_t nbytes) { return impl_->ReadBuffer(nbytes); } Result ReadableFile::DoGetSize() { return impl_->size(); } Status ReadableFile::DoSeek(int64_t pos) { return impl_->Seek(pos); } int ReadableFile::file_descriptor() const { return impl_->fd(); } // ---------------------------------------------------------------------- // FileOutputStream class FileOutputStream::FileOutputStreamImpl : public OSFile { public: Status Open(const std::string& path, bool append) { const bool truncate = !append; return OpenWritable(path, truncate, append, true /* write_only */); } Status Open(int fd) { return OpenWritable(fd); } }; FileOutputStream::FileOutputStream() { impl_.reset(new FileOutputStreamImpl()); } FileOutputStream::~FileOutputStream() { internal::CloseFromDestructor(this); } Result> FileOutputStream::Open(const std::string& path, bool append) { auto stream = std::shared_ptr(new FileOutputStream()); RETURN_NOT_OK(stream->impl_->Open(path, append)); return stream; } Result> FileOutputStream::Open(int fd) { auto stream = std::shared_ptr(new FileOutputStream()); RETURN_NOT_OK(stream->impl_->Open(fd)); return stream; } Status FileOutputStream::Close() { return impl_->Close(); } bool FileOutputStream::closed() const { return !impl_->is_open(); } Result FileOutputStream::Tell() const { return impl_->Tell(); } Status FileOutputStream::Write(const void* data, int64_t length) { return impl_->Write(data, length); } int FileOutputStream::file_descriptor() const { return impl_->fd(); } // ---------------------------------------------------------------------- // Implement MemoryMappedFile class MemoryMappedFile::MemoryMap : public std::enable_shared_from_this { public: // An object representing the entire memory-mapped region. // It can be sliced in order to return individual subregions, which // will then keep the original region alive as long as necessary. class Region : public Buffer { public: Region(std::shared_ptr memory_map, uint8_t* data, int64_t size) : Buffer(data, size) { is_mutable_ = memory_map->writable(); } ~Region() { if (data_ != nullptr) { #ifndef __EMSCRIPTEN__ int result = munmap(data(), static_cast(size_)); // emscripten erroneously reports failures in munmap // https://github.com/emscripten-core/emscripten/issues/20459 ARROW_CHECK_EQ(result, 0) << "munmap failed"; #else munmap(data(), static_cast(size_)); #endif } } // For convenience uint8_t* data() { return const_cast(data_); } void Detach() { data_ = nullptr; } }; MemoryMap() : file_size_(0), map_len_(0) {} ~MemoryMap() { ARROW_CHECK_OK(Close()); } Status Close() { if (file_->is_open()) { // Lose our reference to the MemoryMappedRegion, so that munmap() // is called as soon as all buffer exports are released. region_.reset(); return file_->Close(); } else { return Status::OK(); } } bool closed() const { return !file_->is_open(); } Status CheckClosed() const { if (closed()) { return Status::Invalid("Invalid operation on closed file"); } return Status::OK(); } Status Open(const std::string& path, FileMode::type mode, const int64_t offset = 0, const int64_t length = -1) { file_ = std::make_unique(); if (mode != FileMode::READ) { // Memory mapping has permission failures if PROT_READ not set prot_flags_ = PROT_READ | PROT_WRITE; map_mode_ = MAP_SHARED; constexpr bool append = false; constexpr bool truncate = false; constexpr bool write_only = false; RETURN_NOT_OK(file_->OpenWritable(path, truncate, append, write_only)); } else { prot_flags_ = PROT_READ; map_mode_ = MAP_PRIVATE; // Changes are not to be committed back to the file RETURN_NOT_OK(file_->OpenReadable(path)); } map_len_ = offset_ = 0; // Memory mapping fails when file size is 0 // delay it until the first resize if (file_->size() > 0) { RETURN_NOT_OK(InitMMap(file_->size(), false, offset, length)); } position_ = 0; return Status::OK(); } // Resize the mmap and file to the specified size. // Resize on memory mapped file region is not supported. Status Resize(const int64_t new_size) { if (!writable()) { return Status::IOError("Cannot resize a readonly memory map"); } if (map_len_ != file_size_) { return Status::IOError("Cannot resize a partial memory map"); } if (region_.use_count() > 1) { // There are buffer exports currently, the MemoryMapRemap() call // would make the buffers invalid return Status::IOError("Cannot resize memory map while there are active readers"); } if (new_size == 0) { if (map_len_ > 0) { // Just unmap the mmap and truncate the file to 0 size region_.reset(); RETURN_NOT_OK(::arrow20::internal::FileTruncate(file_->fd(), 0)); map_len_ = offset_ = file_size_ = 0; } position_ = 0; return Status::OK(); } if (map_len_ > 0) { void* result; auto data = region_->data(); RETURN_NOT_OK(::arrow20::internal::MemoryMapRemap(data, map_len_, new_size, file_->fd(), &result)); region_->Detach(); // avoid munmap() on destruction region_ = std::make_shared(shared_from_this(), static_cast(result), new_size); map_len_ = file_size_ = new_size; offset_ = 0; if (position_ > map_len_) { position_ = map_len_; } } else { DCHECK_EQ(position_, 0); // the mmap is not yet initialized, resize the underlying // file, since it might have been 0-sized RETURN_NOT_OK(InitMMap(new_size, /*resize_file*/ true)); } return Status::OK(); } Status Seek(int64_t position) { if (position < 0) { return Status::Invalid("position is out of bounds"); } position_ = position; return Status::OK(); } Result> Slice(int64_t offset, int64_t length) { length = std::max(0, std::min(length, map_len_ - offset)); if (length > 0) { DCHECK_NE(region_, nullptr); return SliceBuffer(region_, offset, length); } else { return std::make_shared(nullptr, 0); } } // map_len_ == file_size_ if memory mapping on the whole file int64_t size() const { return map_len_; } int64_t position() { return position_; } void advance(int64_t nbytes) { position_ = position_ + nbytes; } uint8_t* data() { return region_ ? region_->data() : nullptr; } uint8_t* head() { return data() + position_; } bool writable() { return file_->mode() != FileMode::READ; } bool opened() { return file_->is_open(); } int fd() const { return file_->fd(); } std::mutex& write_lock() { return file_->lock(); } std::mutex& resize_lock() { return resize_lock_; } private: // Initialize the mmap and set size, capacity and the data pointers Status InitMMap(int64_t initial_size, bool resize_file = false, const int64_t offset = 0, const int64_t length = -1) { DCHECK(!region_); if (resize_file) { RETURN_NOT_OK(::arrow20::internal::FileTruncate(file_->fd(), initial_size)); } int64_t mmap_length = initial_size; if (length >= 0) { // memory mapping a file region if (length > initial_size) { return Status::Invalid("mapping length is beyond file size"); } mmap_length = length; } if (static_cast(static_cast(mmap_length)) != mmap_length) { return Status::CapacityError("Requested memory map length ", mmap_length, " does not fit in a C size_t " "(are you using a 32-bit build of Arrow?)"); } void* result = mmap(nullptr, static_cast(mmap_length), prot_flags_, map_mode_, file_->fd(), static_cast(offset)); if (result == MAP_FAILED) { return Status::IOError("Memory mapping file failed: ", ::arrow20::internal::ErrnoMessage(errno)); } map_len_ = mmap_length; offset_ = offset; region_ = std::make_shared(shared_from_this(), static_cast(result), map_len_); file_size_ = initial_size; return Status::OK(); } std::unique_ptr file_; int prot_flags_; int map_mode_; std::shared_ptr region_; int64_t file_size_; int64_t position_; int64_t offset_; int64_t map_len_; std::mutex resize_lock_; }; MemoryMappedFile::MemoryMappedFile() {} MemoryMappedFile::~MemoryMappedFile() { internal::CloseFromDestructor(this); } Result> MemoryMappedFile::Create( const std::string& path, int64_t size) { ARROW_ASSIGN_OR_RAISE(auto file, FileOutputStream::Open(path)); RETURN_NOT_OK(::arrow20::internal::FileTruncate(file->file_descriptor(), size)); RETURN_NOT_OK(file->Close()); return MemoryMappedFile::Open(path, FileMode::READWRITE); } Result> MemoryMappedFile::Open(const std::string& path, FileMode::type mode) { std::shared_ptr result(new MemoryMappedFile()); result->memory_map_.reset(new MemoryMap()); RETURN_NOT_OK(result->memory_map_->Open(path, mode)); return result; } Result> MemoryMappedFile::Open(const std::string& path, FileMode::type mode, const int64_t offset, const int64_t length) { std::shared_ptr result(new MemoryMappedFile()); result->memory_map_.reset(new MemoryMap()); RETURN_NOT_OK(result->memory_map_->Open(path, mode, offset, length)); return result; } Result MemoryMappedFile::GetSize() { RETURN_NOT_OK(memory_map_->CheckClosed()); return memory_map_->size(); } Result MemoryMappedFile::Tell() const { RETURN_NOT_OK(memory_map_->CheckClosed()); return memory_map_->position(); } Status MemoryMappedFile::Seek(int64_t position) { RETURN_NOT_OK(memory_map_->CheckClosed()); return memory_map_->Seek(position); } Status MemoryMappedFile::Close() { return memory_map_->Close(); } bool MemoryMappedFile::closed() const { return memory_map_->closed(); } Result> MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes) { RETURN_NOT_OK(memory_map_->CheckClosed()); // if the file is writable, we acquire the lock before creating any slices // in case a resize is triggered concurrently, otherwise we wouldn't detect // a change in the use count auto guard_resize = memory_map_->writable() ? std::unique_lock(memory_map_->resize_lock()) : std::unique_lock(); ARROW_ASSIGN_OR_RAISE( nbytes, internal::ValidateReadRange(position, nbytes, memory_map_->size())); // Arrange to page data in RETURN_NOT_OK(::arrow20::internal::MemoryAdviseWillNeed( {{memory_map_->data() + position, static_cast(nbytes)}})); return memory_map_->Slice(position, nbytes); } Result MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, void* out) { RETURN_NOT_OK(memory_map_->CheckClosed()); auto guard_resize = memory_map_->writable() ? std::unique_lock(memory_map_->resize_lock()) : std::unique_lock(); ARROW_ASSIGN_OR_RAISE( nbytes, internal::ValidateReadRange(position, nbytes, memory_map_->size())); if (nbytes > 0) { memcpy(out, memory_map_->data() + position, static_cast(nbytes)); } return nbytes; } Result MemoryMappedFile::Read(int64_t nbytes, void* out) { RETURN_NOT_OK(memory_map_->CheckClosed()); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(memory_map_->position(), nbytes, out)); memory_map_->advance(bytes_read); return bytes_read; } Result> MemoryMappedFile::Read(int64_t nbytes) { RETURN_NOT_OK(memory_map_->CheckClosed()); ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(memory_map_->position(), nbytes)); memory_map_->advance(buffer->size()); return buffer; } Future> MemoryMappedFile::ReadAsync(const IOContext&, int64_t position, int64_t nbytes) { return Future>::MakeFinished(ReadAt(position, nbytes)); } Status MemoryMappedFile::WillNeed(const std::vector& ranges) { using ::arrow20::internal::MemoryRegion; RETURN_NOT_OK(memory_map_->CheckClosed()); auto guard_resize = memory_map_->writable() ? std::unique_lock(memory_map_->resize_lock()) : std::unique_lock(); std::vector regions(ranges.size()); for (size_t i = 0; i < ranges.size(); ++i) { const auto& range = ranges[i]; ARROW_ASSIGN_OR_RAISE( auto size, internal::ValidateReadRange(range.offset, range.length, memory_map_->size())); DCHECK_NE(memory_map_->data(), nullptr); regions[i] = {const_cast(memory_map_->data() + range.offset), static_cast(size)}; } return ::arrow20::internal::MemoryAdviseWillNeed(regions); } bool MemoryMappedFile::supports_zero_copy() const { return true; } Status MemoryMappedFile::WriteAt(int64_t position, const void* data, int64_t nbytes) { RETURN_NOT_OK(memory_map_->CheckClosed()); std::lock_guard guard(memory_map_->write_lock()); if (!memory_map_->opened() || !memory_map_->writable()) { return Status::IOError("Unable to write"); } RETURN_NOT_OK(internal::ValidateWriteRange(position, nbytes, memory_map_->size())); RETURN_NOT_OK(memory_map_->Seek(position)); return WriteInternal(data, nbytes); } Status MemoryMappedFile::Write(const void* data, int64_t nbytes) { RETURN_NOT_OK(memory_map_->CheckClosed()); std::lock_guard guard(memory_map_->write_lock()); if (!memory_map_->opened() || !memory_map_->writable()) { return Status::IOError("Unable to write"); } RETURN_NOT_OK( internal::ValidateWriteRange(memory_map_->position(), nbytes, memory_map_->size())); return WriteInternal(data, nbytes); } Status MemoryMappedFile::WriteInternal(const void* data, int64_t nbytes) { memcpy(memory_map_->head(), data, static_cast(nbytes)); memory_map_->advance(nbytes); return Status::OK(); } Status MemoryMappedFile::Resize(int64_t new_size) { RETURN_NOT_OK(memory_map_->CheckClosed()); std::unique_lock write_guard(memory_map_->write_lock(), std::defer_lock); std::unique_lock resize_guard(memory_map_->resize_lock(), std::defer_lock); std::lock(write_guard, resize_guard); RETURN_NOT_OK(memory_map_->Resize(new_size)); return Status::OK(); } int MemoryMappedFile::file_descriptor() const { return memory_map_->fd(); } } // namespace io } // namespace arrow20