#include "file.h" #include "flock.h" #include "fstat.h" #include "sysstat.h" #include "align.h" #include "info.h" #include <array> #include <util/string/util.h> #include <util/string/cast.h> #include <util/string/builder.h> #include <util/stream/hex.h> #include <util/stream/format.h> #include <util/random/random.h> #include <util/generic/size_literals.h> #include <util/generic/string.h> #include <util/generic/ylimits.h> #include <util/generic/yexception.h> #include <util/datetime/base.h> #include <errno.h> #if defined(_unix_) #include <fcntl.h> #if defined(_linux_) && (!defined(_android_) || __ANDROID_API__ >= 21) && !defined(FALLOC_FL_KEEP_SIZE) #include <linux/falloc.h> #endif #include <stdlib.h> #include <unistd.h> #include <sys/mman.h> #elif defined(_win_) #include "winint.h" #include "fs_win.h" #include <io.h> #endif #if defined(_bionic_) #include <sys/sendfile.h> #define HAVE_POSIX_FADVISE 0 #define HAVE_SYNC_FILE_RANGE 0 #elif defined(_linux_) #include <sys/sendfile.h> #define HAVE_POSIX_FADVISE 1 #define HAVE_SYNC_FILE_RANGE 1 #elif defined(__FreeBSD__) && !defined(WITH_VALGRIND) #include <sys/param.h> #define HAVE_POSIX_FADVISE (__FreeBSD_version >= 900501) #define HAVE_SYNC_FILE_RANGE 0 #else #define HAVE_POSIX_FADVISE 0 #define HAVE_SYNC_FILE_RANGE 0 #endif static bool IsStupidFlagCombination(EOpenMode oMode) { // ForAppend will actually not be applied in the following combinations: return (oMode & (CreateAlways | ForAppend)) == (CreateAlways | ForAppend) || (oMode & (TruncExisting | ForAppend)) == (TruncExisting | ForAppend) || (oMode & (CreateNew | ForAppend)) == (CreateNew | ForAppend); } TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { ui32 fcMode = 0; EOpenMode createMode = oMode & MaskCreation; Y_VERIFY(!IsStupidFlagCombination(oMode), "oMode %d makes no sense", static_cast<int>(oMode)); if (!(oMode & MaskRW)) { oMode |= RdWr; } if (!(oMode & AMask)) { oMode |= ARW; } #ifdef _win_ switch (createMode) { case OpenExisting: fcMode = OPEN_EXISTING; break; case TruncExisting: fcMode = TRUNCATE_EXISTING; break; case OpenAlways: fcMode = OPEN_ALWAYS; break; case CreateNew: fcMode = CREATE_NEW; break; case CreateAlways: fcMode = CREATE_ALWAYS; break; default: abort(); break; } ui32 faMode = 0; if (oMode & RdOnly) { faMode |= GENERIC_READ; } if (oMode & WrOnly) { // WrOnly or RdWr faMode |= GENERIC_WRITE; } if (oMode & ::ForAppend) { faMode |= GENERIC_WRITE; faMode |= FILE_APPEND_DATA; faMode &= ~FILE_WRITE_DATA; } bool inheritHandle = !(oMode & CloseOnExec); ui32 shMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; ui32 attrMode = FILE_ATTRIBUTE_NORMAL; if ((createMode == OpenExisting || createMode == OpenAlways) && ((oMode & AMask) == (oMode & AR))) { attrMode |= FILE_ATTRIBUTE_READONLY; } if (oMode & Seq) { attrMode |= FILE_FLAG_SEQUENTIAL_SCAN; } if (oMode & Temp) { // we use TTempFile instead of FILE_FLAG_DELETE_ON_CLOSE attrMode |= FILE_ATTRIBUTE_TEMPORARY; } if (oMode & Transient) { attrMode |= FILE_FLAG_DELETE_ON_CLOSE; } if ((oMode & (Direct | DirectAligned)) && (oMode & WrOnly)) { // WrOnly or RdWr attrMode |= /*FILE_FLAG_NO_BUFFERING |*/ FILE_FLAG_WRITE_THROUGH; } Fd_ = NFsPrivate::CreateFileWithUtf8Name(fName, faMode, shMode, fcMode, attrMode, inheritHandle); if ((oMode & ::ForAppend) && (Fd_ != INVALID_FHANDLE)) { ::SetFilePointer(Fd_, 0, 0, FILE_END); } #elif defined(_unix_) switch (createMode) { case OpenExisting: fcMode = 0; break; case TruncExisting: fcMode = O_TRUNC; break; case OpenAlways: fcMode = O_CREAT; break; case CreateNew: fcMode = O_CREAT | O_EXCL; break; case CreateAlways: fcMode = O_CREAT | O_TRUNC; break; default: abort(); break; } if ((oMode & RdOnly) && (oMode & WrOnly)) { fcMode |= O_RDWR; } else if (oMode & RdOnly) { fcMode |= O_RDONLY; } else if (oMode & WrOnly) { fcMode |= O_WRONLY; } if (oMode & ::ForAppend) { fcMode |= O_APPEND; } if (oMode & CloseOnExec) { fcMode |= O_CLOEXEC; } /* I don't now about this for unix... if (oMode & Temp) { } */ #if defined(_freebsd_) if (oMode & (Direct | DirectAligned)) { fcMode |= O_DIRECT; } if (oMode & Sync) { fcMode |= O_SYNC; } #elif defined(_linux_) if (oMode & DirectAligned) { /* * O_DIRECT in Linux requires aligning request size and buffer address * to size of hardware sector (see hw_sector_size or ioctl BLKSSZGET). * Usually 512 bytes, but modern hardware works better with 4096 bytes. */ fcMode |= O_DIRECT; } if (oMode & Sync) { fcMode |= O_SYNC; } #endif #if defined(_linux_) fcMode |= O_LARGEFILE; #endif ui32 permMode = 0; if (oMode & AXOther) { permMode |= S_IXOTH; } if (oMode & AWOther) { permMode |= S_IWOTH; } if (oMode & AROther) { permMode |= S_IROTH; } if (oMode & AXGroup) { permMode |= S_IXGRP; } if (oMode & AWGroup) { permMode |= S_IWGRP; } if (oMode & ARGroup) { permMode |= S_IRGRP; } if (oMode & AXUser) { permMode |= S_IXUSR; } if (oMode & AWUser) { permMode |= S_IWUSR; } if (oMode & ARUser) { permMode |= S_IRUSR; } do { Fd_ = ::open(fName.data(), fcMode, permMode); } while (Fd_ == -1 && errno == EINTR); #if HAVE_POSIX_FADVISE if (Fd_ >= 0) { if (oMode & NoReuse) { ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_NOREUSE); } if (oMode & Seq) { ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_SEQUENTIAL); } if (oMode & NoReadAhead) { ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_RANDOM); } } #endif //temp file if (Fd_ >= 0 && (oMode & Transient)) { unlink(fName.data()); } #else #error unsupported platform #endif } bool TFileHandle::Close() noexcept { bool isOk = true; #ifdef _win_ if (Fd_ != INVALID_FHANDLE) { isOk = (::CloseHandle(Fd_) != 0); } if (!isOk) { Y_VERIFY(GetLastError() != ERROR_INVALID_HANDLE, "must not quietly close invalid handle"); } #elif defined(_unix_) if (Fd_ != INVALID_FHANDLE) { isOk = (::close(Fd_) == 0 || errno == EINTR); } if (!isOk) { // Do not quietly close bad descriptor, // because often it means double close // that is disasterous Y_VERIFY(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_)); } #else #error unsupported platform #endif Fd_ = INVALID_FHANDLE; return isOk; } static inline i64 DoSeek(FHANDLE h, i64 offset, SeekDir origin) noexcept { if (h == INVALID_FHANDLE) { return -1L; } #if defined(_win_) static ui32 dir[] = {FILE_BEGIN, FILE_CURRENT, FILE_END}; LARGE_INTEGER pos; pos.QuadPart = offset; pos.LowPart = ::SetFilePointer(h, pos.LowPart, &pos.HighPart, dir[origin]); if (pos.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) { pos.QuadPart = -1; } return pos.QuadPart; #elif defined(_unix_) static int dir[] = {SEEK_SET, SEEK_CUR, SEEK_END}; #if defined(_sun_) return ::llseek(h, (offset_t)offset, dir[origin]); #else return ::lseek(h, (off_t)offset, dir[origin]); #endif #else #error unsupported platform #endif } i64 TFileHandle::GetPosition() const noexcept { return DoSeek(Fd_, 0, sCur); } i64 TFileHandle::Seek(i64 offset, SeekDir origin) noexcept { return DoSeek(Fd_, offset, origin); } i64 TFileHandle::GetLength() const noexcept { // XXX: returns error code, but does not set errno if (!IsOpen()) { return -1L; } return GetFileLength(Fd_); } bool TFileHandle::Resize(i64 length) noexcept { if (!IsOpen()) { return false; } i64 currentLength = GetLength(); if (length == currentLength) { return true; } #if defined(_win_) i64 currentPosition = GetPosition(); if (currentPosition == -1L) { return false; } Seek(length, sSet); if (!::SetEndOfFile(Fd_)) { return false; } if (currentPosition < length) { Seek(currentPosition, sSet); } return true; #elif defined(_unix_) return (0 == ftruncate(Fd_, (off_t)length)); #else #error unsupported platform #endif } bool TFileHandle::Reserve(i64 length) noexcept { // FIXME this should reserve disk space with fallocate if (!IsOpen()) { return false; } i64 currentLength = GetLength(); if (length <= currentLength) { return true; } if (!Resize(length)) { return false; } #if defined(_win_) if (!::SetFileValidData(Fd_, length)) { Resize(currentLength); return false; } #elif defined(_unix_) // No way to implement this under FreeBSD. Just do nothing #else #error unsupported platform #endif return true; } bool TFileHandle::FallocateNoResize(i64 length) noexcept { if (!IsOpen()) { return false; } #if defined(_linux_) && (!defined(_android_) || __ANDROID_API__ >= 21) return !fallocate(Fd_, FALLOC_FL_KEEP_SIZE, 0, length); #else Y_UNUSED(length); return true; #endif } // Pair for FallocateNoResize bool TFileHandle::ShrinkToFit() noexcept { if (!IsOpen()) { return false; } #if defined(_linux_) && (!defined(_android_) || __ANDROID_API__ >= 21) return !ftruncate(Fd_, (off_t)GetLength()); #else return true; #endif } bool TFileHandle::Flush() noexcept { if (!IsOpen()) { return false; } #if defined(_win_) bool ok = ::FlushFileBuffers(Fd_) != 0; /* * FlushFileBuffers fails if hFile is a handle to the console output. * That is because the console output is not buffered. * The function returns FALSE, and GetLastError returns ERROR_INVALID_HANDLE. */ return ok || GetLastError() == ERROR_INVALID_HANDLE; #elif defined(_unix_) int ret = ::fsync(Fd_); /* * Ignore EROFS, EINVAL - fd is bound to a special file * (PIPE, FIFO, or socket) which does not support synchronization. * Fail in case of EIO, ENOSPC, EDQUOT - data might be lost. */ return ret == 0 || errno == EROFS || errno == EINVAL #if defined(_darwin_) // ENOTSUP fd does not refer to a vnode || errno == ENOTSUP #endif ; #else #error unsupported platform #endif } bool TFileHandle::FlushData() noexcept { #if defined(_linux_) if (!IsOpen()) { return false; } int ret = ::fdatasync(Fd_); // Same loginc in error handling as for fsync above. return ret == 0 || errno == EROFS || errno == EINVAL; #else return Flush(); #endif } i32 TFileHandle::Read(void* buffer, ui32 byteCount) noexcept { // FIXME size and return must be 64-bit if (!IsOpen()) { return -1; } #if defined(_win_) DWORD bytesRead = 0; if (::ReadFile(Fd_, buffer, byteCount, &bytesRead, nullptr)) { return bytesRead; } return -1; #elif defined(_unix_) i32 ret; do { ret = ::read(Fd_, buffer, byteCount); } while (ret == -1 && errno == EINTR); return ret; #else #error unsupported platform #endif } i32 TFileHandle::Write(const void* buffer, ui32 byteCount) noexcept { if (!IsOpen()) { return -1; } #if defined(_win_) DWORD bytesWritten = 0; if (::WriteFile(Fd_, buffer, byteCount, &bytesWritten, nullptr)) { return bytesWritten; } return -1; #elif defined(_unix_) i32 ret; do { ret = ::write(Fd_, buffer, byteCount); } while (ret == -1 && errno == EINTR); return ret; #else #error unsupported platform #endif } i32 TFileHandle::Pread(void* buffer, ui32 byteCount, i64 offset) const noexcept { #if defined(_win_) OVERLAPPED io; Zero(io); DWORD bytesRead = 0; io.Offset = (ui32)offset; io.OffsetHigh = (ui32)(offset >> 32); if (::ReadFile(Fd_, buffer, byteCount, &bytesRead, &io)) { return bytesRead; } if (::GetLastError() == ERROR_HANDLE_EOF) { return 0; } return -1; #elif defined(_unix_) i32 ret; do { ret = ::pread(Fd_, buffer, byteCount, offset); } while (ret == -1 && errno == EINTR); return ret; #else #error unsupported platform #endif } i32 TFileHandle::Pwrite(const void* buffer, ui32 byteCount, i64 offset) const noexcept { #if defined(_win_) OVERLAPPED io; Zero(io); DWORD bytesWritten = 0; io.Offset = (ui32)offset; io.OffsetHigh = (ui32)(offset >> 32); if (::WriteFile(Fd_, buffer, byteCount, &bytesWritten, &io)) { return bytesWritten; } return -1; #elif defined(_unix_) i32 ret; do { ret = ::pwrite(Fd_, buffer, byteCount, offset); } while (ret == -1 && errno == EINTR); return ret; #else #error unsupported platform #endif } FHANDLE TFileHandle::Duplicate() const noexcept { if (!IsOpen()) { return INVALID_FHANDLE; } #if defined(_win_) FHANDLE dupHandle; if (!::DuplicateHandle(GetCurrentProcess(), Fd_, GetCurrentProcess(), &dupHandle, 0, TRUE, DUPLICATE_SAME_ACCESS)) { return INVALID_FHANDLE; } return dupHandle; #elif defined(_unix_) return ::dup(Fd_); #else #error unsupported platform #endif } int TFileHandle::Duplicate2Posix(int dstHandle) const noexcept { if (!IsOpen()) { return -1; } #if defined(_win_) FHANDLE dupHandle = Duplicate(); if (dupHandle == INVALID_FHANDLE) { _set_errno(EMFILE); return -1; } int posixHandle = _open_osfhandle((intptr_t)dupHandle, 0); if (posixHandle == -1) { CloseHandle(dupHandle); return -1; } if (dup2(posixHandle, dstHandle) == -1) { dstHandle = -1; } _close(posixHandle); return dstHandle; #elif defined(_unix_) while (dup2(Fd_, dstHandle) == -1) { if (errno != EINTR) { return -1; } } return dstHandle; #else #error unsupported platform #endif } bool TFileHandle::LinkTo(const TFileHandle& fh) const noexcept { #if defined(_unix_) while (dup2(fh.Fd_, Fd_) == -1) { if (errno != EINTR) { return false; } } return true; #elif defined(_win_) TFileHandle nh(fh.Duplicate()); if (!nh.IsOpen()) { return false; } //not thread-safe nh.Swap(*const_cast<TFileHandle*>(this)); return true; #else #error unsupported #endif } int TFileHandle::Flock(int op) noexcept { return ::Flock(Fd_, op); } bool TFileHandle::SetDirect() { #ifdef _linux_ const long flags = fcntl(Fd_, F_GETFL); const int r = fcntl(Fd_, F_SETFL, flags | O_DIRECT); return !r; #endif return false; } void TFileHandle::ResetDirect() { #ifdef _linux_ long flags = fcntl(Fd_, F_GETFL); fcntl(Fd_, F_SETFL, flags & ~O_DIRECT); #endif } i64 TFileHandle::CountCache(i64 offset, i64 length) const noexcept { #ifdef _linux_ const i64 pageSize = NSystemInfo::GetPageSize(); constexpr size_t vecSize = 512; // Fetch up to 2MiB at once const i64 batchSize = vecSize * pageSize; std::array<ui8, vecSize> vec; void* ptr = nullptr; i64 res = 0; if (!IsOpen()) { return -1; } if (!length) { length = GetLength(); length -= Min(length, offset); } if (!length) { return 0; } const i64 begin = AlignDown(offset, pageSize); const i64 end = AlignUp(offset + length, pageSize); const i64 size = end - begin; /* * Since fincode is not implemented yet use mmap and mincore. * This is not so effective and scalable for frequent usage. */ ptr = ::mmap( (caddr_t) nullptr, size, PROT_READ, MAP_SHARED | MAP_NORESERVE, Fd_, begin); if (MAP_FAILED == ptr) { return -1; } for (i64 base = begin; base < end; base += batchSize) { const size_t batch = Min(vecSize, size_t((end - base) / pageSize)); void* batchPtr = static_cast<caddr_t>(ptr) + (base - begin); if (::mincore(batchPtr, batch * pageSize, vec.data())) { res = -1; break; } for (size_t i = 0; i < batch; i++) { // count uptodate complete pages in cache if (vec[i] & 1) { res += pageSize; } } if (base == begin && (vec[0] & 1)) { // cut head of first page res -= offset - begin; } if ((end - base) <= batchSize && (vec[batch - 1] & 1)) { // cut tail of last page res -= size - (offset - begin) - length; } } ::munmap(ptr, size); return res; #else Y_UNUSED(offset); Y_UNUSED(length); return -1; #endif } void TFileHandle::PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { #ifdef _linux_ #if HAVE_POSIX_FADVISE // POSIX_FADV_WILLNEED starts reading upto read_ahead_kb in background ::posix_fadvise(Fd_, offset, length, POSIX_FADV_WILLNEED); #endif if (wait) { TFileHandle devnull("/dev/null", OpenExisting | WrOnly | CloseOnExec); off_t end = length ? (offset + length) : GetLength(); off_t pos = offset; ssize_t ret; do { ret = ::sendfile((FHANDLE)devnull, Fd_, &pos, end - pos); } while (pos < end && (ret > 0 || errno == EINTR)); } #else Y_UNUSED(offset); Y_UNUSED(length); Y_UNUSED(wait); #endif } void TFileHandle::EvictCache(i64 offset, i64 length) const noexcept { #if HAVE_POSIX_FADVISE /* * This tries to evicts only unmaped, clean, complete pages. */ ::posix_fadvise(Fd_, offset, length, POSIX_FADV_DONTNEED); #else Y_UNUSED(offset); Y_UNUSED(length); #endif } bool TFileHandle::FlushCache(i64 offset, i64 length, bool wait) noexcept { #if HAVE_SYNC_FILE_RANGE int flags = SYNC_FILE_RANGE_WRITE; if (wait) { flags |= SYNC_FILE_RANGE_WAIT_AFTER; } int ret = ::sync_file_range(Fd_, offset, length, flags); return ret == 0 || errno == EROFS; #else Y_UNUSED(offset); Y_UNUSED(length); if (wait) { return FlushData(); } return true; #endif } TString DecodeOpenMode(ui32 mode0) { ui32 mode = mode0; TStringBuilder r; #define F(flag) \ if ((mode & flag) == flag) { \ mode &= ~flag; \ if (r) { \ r << TStringBuf("|"); \ } \ r << TStringBuf(#flag); \ } F(RdWr) F(RdOnly) F(WrOnly) F(CreateAlways) F(CreateNew) F(OpenAlways) F(TruncExisting) F(ForAppend) F(Transient) F(CloseOnExec) F(Temp) F(Sync) F(Direct) F(DirectAligned) F(Seq) F(NoReuse) F(NoReadAhead) F(AX) F(AR) F(AW) F(ARW) F(AXOther) F(AWOther) F(AROther) F(AXGroup) F(AWGroup) F(ARGroup) F(AXUser) F(AWUser) F(ARUser) #undef F if (mode != 0) { if (r) { r << TStringBuf("|"); } r << Hex(mode); } if (!r) { return "0"; } return r; } class TFile::TImpl: public TAtomicRefCount<TImpl> { public: inline TImpl(FHANDLE fd, const TString& fname = TString()) : Handle_(fd) , FileName_(fname) { } inline TImpl(const TString& fName, EOpenMode oMode) : Handle_(fName, oMode) , FileName_(fName) { if (!Handle_.IsOpen()) { ythrow TFileError() << "can't open " << fName.Quote() << " with mode " << DecodeOpenMode(oMode) << " (" << Hex(oMode.ToBaseType()) << ")"; } } inline ~TImpl() = default; inline void Close() { if (!Handle_.Close()) { ythrow TFileError() << "can't close " << FileName_.Quote(); } } const TString& GetName() const noexcept { return FileName_; } void SetName(const TString& newName) { FileName_ = newName; } const TFileHandle& GetHandle() const noexcept { return Handle_; } i64 Seek(i64 offset, SeekDir origin) { i64 pos = Handle_.Seek(offset, origin); if (pos == -1L) { ythrow TFileError() << "can't seek " << offset << " bytes in " << FileName_.Quote(); } return pos; } void Resize(i64 length) { if (!Handle_.Resize(length)) { ythrow TFileError() << "can't resize " << FileName_.Quote() << " to size " << length; } } void Reserve(i64 length) { if (!Handle_.Reserve(length)) { ythrow TFileError() << "can't reserve " << length << " for file " << FileName_.Quote(); } } void FallocateNoResize(i64 length) { if (!Handle_.FallocateNoResize(length)) { ythrow TFileError() << "can't allocate " << length << "bytes of space for file " << FileName_.Quote(); } } void ShrinkToFit() { if (!Handle_.ShrinkToFit()) { ythrow TFileError() << "can't shrink " << FileName_.Quote() << " to logical size"; } } void Flush() { if (!Handle_.Flush()) { ythrow TFileError() << "can't flush " << FileName_.Quote(); } } void FlushData() { if (!Handle_.FlushData()) { ythrow TFileError() << "can't flush data " << FileName_.Quote(); } } TFile Duplicate() const { TFileHandle dupH(Handle_.Duplicate()); if (!dupH.IsOpen()) { ythrow TFileError() << "can't duplicate the handle of " << FileName_.Quote(); } TFile res(dupH); dupH.Release(); return res; } // Maximum amount of bytes to be read via single system call. // Some libraries fail when it is greater than max int. // Syscalls can cause contention if they operate on very large data blocks. static constexpr size_t MaxReadPortion = 1_GB; i32 RawRead(void* bufferIn, size_t numBytes) { const size_t toRead = Min(MaxReadPortion, numBytes); return Handle_.Read(bufferIn, toRead); } size_t ReadOrFail(void* buf, size_t numBytes) { const i32 reallyRead = RawRead(buf, numBytes); if (reallyRead < 0) { ythrow TFileError() << "can not read data from " << FileName_.Quote(); } return reallyRead; } size_t Read(void* bufferIn, size_t numBytes) { ui8* buf = (ui8*)bufferIn; while (numBytes) { const size_t reallyRead = ReadOrFail(buf, numBytes); if (reallyRead == 0) { // file exhausted break; } buf += reallyRead; numBytes -= reallyRead; } return buf - (ui8*)bufferIn; } void Load(void* buf, size_t len) { if (Read(buf, len) != len) { ythrow TFileError() << "can't read " << len << " bytes from " << FileName_.Quote(); } } // Maximum amount of bytes to be written via single system call. // Some libraries fail when it is greater than max int. // Syscalls can cause contention if they operate on very large data blocks. static constexpr size_t MaxWritePortion = 1_GB; void Write(const void* buffer, size_t numBytes) { const ui8* buf = (const ui8*)buffer; while (numBytes) { const i32 toWrite = (i32)Min(MaxWritePortion, numBytes); const i32 reallyWritten = Handle_.Write(buf, toWrite); if (reallyWritten < 0) { ythrow TFileError() << "can't write " << toWrite << " bytes to " << FileName_.Quote(); } buf += reallyWritten; numBytes -= reallyWritten; } } size_t Pread(void* bufferIn, size_t numBytes, i64 offset) const { ui8* buf = (ui8*)bufferIn; while (numBytes) { const i32 toRead = (i32)Min(MaxReadPortion, numBytes); const i32 reallyRead = RawPread(buf, toRead, offset); if (reallyRead < 0) { ythrow TFileError() << "can not read data from " << FileName_.Quote(); } if (reallyRead == 0) { // file exausted break; } buf += reallyRead; offset += reallyRead; numBytes -= reallyRead; } return buf - (ui8*)bufferIn; } i32 RawPread(void* buf, ui32 len, i64 offset) const { return Handle_.Pread(buf, len, offset); } void Pload(void* buf, size_t len, i64 offset) const { if (Pread(buf, len, offset) != len) { ythrow TFileError() << "can't read " << len << " bytes at offset " << offset << " from " << FileName_.Quote(); } } void Pwrite(const void* buffer, size_t numBytes, i64 offset) const { const ui8* buf = (const ui8*)buffer; while (numBytes) { const i32 toWrite = (i32)Min(MaxWritePortion, numBytes); const i32 reallyWritten = Handle_.Pwrite(buf, toWrite, offset); if (reallyWritten < 0) { ythrow TFileError() << "can't write " << toWrite << " bytes to " << FileName_.Quote(); } buf += reallyWritten; offset += reallyWritten; numBytes -= reallyWritten; } } void Flock(int op) { if (0 != Handle_.Flock(op)) { ythrow TFileError() << "can't flock " << FileName_.Quote(); } } void SetDirect() { if (!Handle_.SetDirect()) { ythrow TFileError() << "can't set direct mode for " << FileName_.Quote(); } } void ResetDirect() { Handle_.ResetDirect(); } i64 CountCache(i64 offset, i64 length) const noexcept { return Handle_.CountCache(offset, length); } void PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { Handle_.PrefetchCache(offset, length, wait); } void EvictCache(i64 offset, i64 length) const noexcept { Handle_.EvictCache(offset, length); } void FlushCache(i64 offset, i64 length, bool wait) { if (!Handle_.FlushCache(offset, length, wait)) { ythrow TFileError() << "can't flush data " << FileName_.Quote(); } } private: TFileHandle Handle_; TString FileName_; }; TFile::TFile() : Impl_(new TImpl(INVALID_FHANDLE)) { } TFile::TFile(FHANDLE fd) : Impl_(new TImpl(fd)) { } TFile::TFile(FHANDLE fd, const TString& name) : Impl_(new TImpl(fd, name)) { } TFile::TFile(const TString& fName, EOpenMode oMode) : Impl_(new TImpl(fName, oMode)) { } TFile::~TFile() = default; void TFile::Close() { Impl_->Close(); } const TString& TFile::GetName() const noexcept { return Impl_->GetName(); } i64 TFile::GetPosition() const noexcept { return Impl_->GetHandle().GetPosition(); } i64 TFile::GetLength() const noexcept { return Impl_->GetHandle().GetLength(); } bool TFile::IsOpen() const noexcept { return Impl_->GetHandle().IsOpen(); } FHANDLE TFile::GetHandle() const noexcept { return Impl_->GetHandle(); } i64 TFile::Seek(i64 offset, SeekDir origin) { return Impl_->Seek(offset, origin); } void TFile::Resize(i64 length) { Impl_->Resize(length); } void TFile::Reserve(i64 length) { Impl_->Reserve(length); } void TFile::FallocateNoResize(i64 length) { Impl_->FallocateNoResize(length); } void TFile::ShrinkToFit() { Impl_->ShrinkToFit(); } void TFile::Flush() { Impl_->Flush(); } void TFile::FlushData() { Impl_->FlushData(); } TFile TFile::Duplicate() const { TFile res = Impl_->Duplicate(); res.Impl_->SetName(Impl_->GetName()); return res; } size_t TFile::Read(void* buf, size_t len) { return Impl_->Read(buf, len); } i32 TFile::RawRead(void* buf, size_t len) { return Impl_->RawRead(buf, len); } size_t TFile::ReadOrFail(void* buf, size_t len) { return Impl_->ReadOrFail(buf, len); } void TFile::Load(void* buf, size_t len) { Impl_->Load(buf, len); } void TFile::Write(const void* buf, size_t len) { Impl_->Write(buf, len); } size_t TFile::Pread(void* buf, size_t len, i64 offset) const { return Impl_->Pread(buf, len, offset); } i32 TFile::RawPread(void* buf, ui32 len, i64 offset) const { return Impl_->RawPread(buf, len, offset); } void TFile::Pload(void* buf, size_t len, i64 offset) const { Impl_->Pload(buf, len, offset); } void TFile::Pwrite(const void* buf, size_t len, i64 offset) const { Impl_->Pwrite(buf, len, offset); } void TFile::Flock(int op) { Impl_->Flock(op); } void TFile::SetDirect() { Impl_->SetDirect(); } void TFile::ResetDirect() { Impl_->ResetDirect(); } i64 TFile::CountCache(i64 offset, i64 length) const noexcept { return Impl_->CountCache(offset, length); } void TFile::PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { Impl_->PrefetchCache(offset, length, wait); } void TFile::EvictCache(i64 offset, i64 length) const noexcept { Impl_->EvictCache(offset, length); } void TFile::FlushCache(i64 offset, i64 length, bool wait) { Impl_->FlushCache(offset, length, wait); } void TFile::LinkTo(const TFile& f) const { if (!Impl_->GetHandle().LinkTo(f.Impl_->GetHandle())) { ythrow TFileError() << "can not link fd(" << GetName() << " -> " << f.GetName() << ")"; } } TFile TFile::Temporary(const TString& prefix) { //TODO - handle impossible case of name collision return TFile(prefix + ToString(MicroSeconds()) + "-" + ToString(RandomNumber<ui64>()), CreateNew | RdWr | Seq | Temp | Transient); } TFile TFile::ForAppend(const TString& path) { return TFile(path, OpenAlways | WrOnly | Seq | ::ForAppend); } TFile Duplicate(FILE* f) { return Duplicate(fileno(f)); } TFile Duplicate(int fd) { #if defined(_win_) /* There are two options of how to duplicate a file descriptor on Windows: * * 1: * - Call dup. * - Call _get_osfhandle on the result. * - Use returned handle. * - Call _close on file descriptor returned by dup. This will also close * the handle. * * 2: * - Call _get_osfhandle. * - Call DuplicateHandle on the result. * - Use returned handle. * - Call CloseHandle. * * TFileHandle calls CloseHandle when destroyed, leaving us with option #2. */ FHANDLE handle = reinterpret_cast<FHANDLE>(::_get_osfhandle(fd)); FHANDLE dupHandle; if (!::DuplicateHandle(GetCurrentProcess(), handle, GetCurrentProcess(), &dupHandle, 0, TRUE, DUPLICATE_SAME_ACCESS)) { ythrow TFileError() << "can not duplicate file descriptor " << LastSystemError() << Endl; } return TFile(dupHandle); #elif defined(_unix_) return TFile(::dup(fd)); #else #error unsupported platform #endif } bool PosixDisableReadAhead(FHANDLE fileHandle, void* addr) noexcept { int ret = -1; #if HAVE_POSIX_FADVISE #if defined(_linux_) Y_UNUSED(fileHandle); ret = madvise(addr, 0, MADV_RANDOM); // according to klamm@ posix_fadvise does not work under linux, madvise does work #else Y_UNUSED(addr); ret = ::posix_fadvise(fileHandle, 0, 0, POSIX_FADV_RANDOM); #endif #else Y_UNUSED(fileHandle); Y_UNUSED(addr); #endif return ret == 0; }