diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/system/file.cpp |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/system/file.cpp')
-rw-r--r-- | util/system/file.cpp | 1302 |
1 files changed, 1302 insertions, 0 deletions
diff --git a/util/system/file.cpp b/util/system/file.cpp new file mode 100644 index 00000000000..4a261d020cb --- /dev/null +++ b/util/system/file.cpp @@ -0,0 +1,1302 @@ +#include "file.h" +#include "flock.h" +#include "fstat.h" +#include "sysstat.h" +#include "align.h" +#include "info.h" + +#include <array> + +#include <util/string/util.h> +#include <util/string/cast.h> +#include <util/string/builder.h> + +#include <util/stream/hex.h> +#include <util/stream/format.h> + +#include <util/random/random.h> + +#include <util/generic/size_literals.h> +#include <util/generic/string.h> +#include <util/generic/ylimits.h> +#include <util/generic/yexception.h> + +#include <util/datetime/base.h> + +#include <errno.h> + +#if defined(_unix_) + #include <fcntl.h> + + #if defined(_linux_) && (!defined(_android_) || __ANDROID_API__ >= 21) && !defined(FALLOC_FL_KEEP_SIZE) + #include <linux/falloc.h> + #endif + + #include <stdlib.h> + #include <unistd.h> + #include <sys/mman.h> +#elif defined(_win_) + #include "winint.h" + #include "fs_win.h" + #include <io.h> +#endif + +#if defined(_bionic_) + #include <sys/sendfile.h> + #define HAVE_POSIX_FADVISE 0 + #define HAVE_SYNC_FILE_RANGE 0 +#elif defined(_linux_) + #include <sys/sendfile.h> + #define HAVE_POSIX_FADVISE 1 + #define HAVE_SYNC_FILE_RANGE 1 +#elif defined(__FreeBSD__) && !defined(WITH_VALGRIND) + #include <sys/param.h> + #define HAVE_POSIX_FADVISE (__FreeBSD_version >= 900501) + #define HAVE_SYNC_FILE_RANGE 0 +#else + #define HAVE_POSIX_FADVISE 0 + #define HAVE_SYNC_FILE_RANGE 0 +#endif + +static bool IsStupidFlagCombination(EOpenMode oMode) { + // ForAppend will actually not be applied in the following combinations: + return (oMode & (CreateAlways | ForAppend)) == (CreateAlways | ForAppend) || (oMode & (TruncExisting | ForAppend)) == (TruncExisting | ForAppend) || (oMode & (CreateNew | ForAppend)) == (CreateNew | ForAppend); +} + +TFileHandle::TFileHandle(const TString& fName, EOpenMode oMode) noexcept { + ui32 fcMode = 0; + EOpenMode createMode = oMode & MaskCreation; + Y_VERIFY(!IsStupidFlagCombination(oMode), "oMode %d makes no sense", static_cast<int>(oMode)); + if (!(oMode & MaskRW)) { + oMode |= RdWr; + } + if (!(oMode & AMask)) { + oMode |= ARW; + } + +#ifdef _win_ + + switch (createMode) { + case OpenExisting: + fcMode = OPEN_EXISTING; + break; + case TruncExisting: + fcMode = TRUNCATE_EXISTING; + break; + case OpenAlways: + fcMode = OPEN_ALWAYS; + break; + case CreateNew: + fcMode = CREATE_NEW; + break; + case CreateAlways: + fcMode = CREATE_ALWAYS; + break; + default: + abort(); + break; + } + + ui32 faMode = 0; + if (oMode & RdOnly) { + faMode |= GENERIC_READ; + } + if (oMode & WrOnly) { + // WrOnly or RdWr + faMode |= GENERIC_WRITE; + } + if (oMode & ::ForAppend) { + faMode |= GENERIC_WRITE; + faMode |= FILE_APPEND_DATA; + faMode &= ~FILE_WRITE_DATA; + } + + bool inheritHandle = !(oMode & CloseOnExec); + + ui32 shMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; + + ui32 attrMode = FILE_ATTRIBUTE_NORMAL; + if ((createMode == OpenExisting || createMode == OpenAlways) && ((oMode & AMask) == (oMode & AR))) { + attrMode |= FILE_ATTRIBUTE_READONLY; + } + if (oMode & Seq) { + attrMode |= FILE_FLAG_SEQUENTIAL_SCAN; + } + if (oMode & Temp) { + // we use TTempFile instead of FILE_FLAG_DELETE_ON_CLOSE + attrMode |= FILE_ATTRIBUTE_TEMPORARY; + } + if (oMode & Transient) { + attrMode |= FILE_FLAG_DELETE_ON_CLOSE; + } + if ((oMode & (Direct | DirectAligned)) && (oMode & WrOnly)) { + // WrOnly or RdWr + attrMode |= /*FILE_FLAG_NO_BUFFERING |*/ FILE_FLAG_WRITE_THROUGH; + } + + Fd_ = NFsPrivate::CreateFileWithUtf8Name(fName, faMode, shMode, fcMode, attrMode, inheritHandle); + + if ((oMode & ::ForAppend) && (Fd_ != INVALID_FHANDLE)) { + ::SetFilePointer(Fd_, 0, 0, FILE_END); + } + +#elif defined(_unix_) + + switch (createMode) { + case OpenExisting: + fcMode = 0; + break; + case TruncExisting: + fcMode = O_TRUNC; + break; + case OpenAlways: + fcMode = O_CREAT; + break; + case CreateNew: + fcMode = O_CREAT | O_EXCL; + break; + case CreateAlways: + fcMode = O_CREAT | O_TRUNC; + break; + default: + abort(); + break; + } + + if ((oMode & RdOnly) && (oMode & WrOnly)) { + fcMode |= O_RDWR; + } else if (oMode & RdOnly) { + fcMode |= O_RDONLY; + } else if (oMode & WrOnly) { + fcMode |= O_WRONLY; + } + + if (oMode & ::ForAppend) { + fcMode |= O_APPEND; + } + + if (oMode & CloseOnExec) { + fcMode |= O_CLOEXEC; + } + + /* I don't now about this for unix... + if (oMode & Temp) { + } + */ + #if defined(_freebsd_) + if (oMode & (Direct | DirectAligned)) { + fcMode |= O_DIRECT; + } + + if (oMode & Sync) { + fcMode |= O_SYNC; + } + #elif defined(_linux_) + if (oMode & DirectAligned) { + /* + * O_DIRECT in Linux requires aligning request size and buffer address + * to size of hardware sector (see hw_sector_size or ioctl BLKSSZGET). + * Usually 512 bytes, but modern hardware works better with 4096 bytes. + */ + fcMode |= O_DIRECT; + } + if (oMode & Sync) { + fcMode |= O_SYNC; + } + #endif + + #if defined(_linux_) + fcMode |= O_LARGEFILE; + #endif + + ui32 permMode = 0; + if (oMode & AXOther) { + permMode |= S_IXOTH; + } + if (oMode & AWOther) { + permMode |= S_IWOTH; + } + if (oMode & AROther) { + permMode |= S_IROTH; + } + if (oMode & AXGroup) { + permMode |= S_IXGRP; + } + if (oMode & AWGroup) { + permMode |= S_IWGRP; + } + if (oMode & ARGroup) { + permMode |= S_IRGRP; + } + if (oMode & AXUser) { + permMode |= S_IXUSR; + } + if (oMode & AWUser) { + permMode |= S_IWUSR; + } + if (oMode & ARUser) { + permMode |= S_IRUSR; + } + + do { + Fd_ = ::open(fName.data(), fcMode, permMode); + } while (Fd_ == -1 && errno == EINTR); + + #if HAVE_POSIX_FADVISE + if (Fd_ >= 0) { + if (oMode & NoReuse) { + ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_NOREUSE); + } + + if (oMode & Seq) { + ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_SEQUENTIAL); + } + + if (oMode & NoReadAhead) { + ::posix_fadvise(Fd_, 0, 0, POSIX_FADV_RANDOM); + } + } + #endif + + //temp file + if (Fd_ >= 0 && (oMode & Transient)) { + unlink(fName.data()); + } +#else + #error unsupported platform +#endif +} + +bool TFileHandle::Close() noexcept { + bool isOk = true; +#ifdef _win_ + if (Fd_ != INVALID_FHANDLE) { + isOk = (::CloseHandle(Fd_) != 0); + } + if (!isOk) { + Y_VERIFY(GetLastError() != ERROR_INVALID_HANDLE, + "must not quietly close invalid handle"); + } +#elif defined(_unix_) + if (Fd_ != INVALID_FHANDLE) { + isOk = (::close(Fd_) == 0 || errno == EINTR); + } + if (!isOk) { + // Do not quietly close bad descriptor, + // because often it means double close + // that is disasterous + Y_VERIFY(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_)); + } +#else + #error unsupported platform +#endif + Fd_ = INVALID_FHANDLE; + return isOk; +} + +static inline i64 DoSeek(FHANDLE h, i64 offset, SeekDir origin) noexcept { + if (h == INVALID_FHANDLE) { + return -1L; + } +#if defined(_win_) + static ui32 dir[] = {FILE_BEGIN, FILE_CURRENT, FILE_END}; + LARGE_INTEGER pos; + pos.QuadPart = offset; + pos.LowPart = ::SetFilePointer(h, pos.LowPart, &pos.HighPart, dir[origin]); + if (pos.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) { + pos.QuadPart = -1; + } + return pos.QuadPart; +#elif defined(_unix_) + static int dir[] = {SEEK_SET, SEEK_CUR, SEEK_END}; + #if defined(_sun_) + return ::llseek(h, (offset_t)offset, dir[origin]); + #else + return ::lseek(h, (off_t)offset, dir[origin]); + #endif +#else + #error unsupported platform +#endif +} + +i64 TFileHandle::GetPosition() const noexcept { + return DoSeek(Fd_, 0, sCur); +} + +i64 TFileHandle::Seek(i64 offset, SeekDir origin) noexcept { + return DoSeek(Fd_, offset, origin); +} + +i64 TFileHandle::GetLength() const noexcept { + // XXX: returns error code, but does not set errno + if (!IsOpen()) { + return -1L; + } + return GetFileLength(Fd_); +} + +bool TFileHandle::Resize(i64 length) noexcept { + if (!IsOpen()) { + return false; + } + i64 currentLength = GetLength(); + if (length == currentLength) { + return true; + } +#if defined(_win_) + i64 currentPosition = GetPosition(); + if (currentPosition == -1L) { + return false; + } + Seek(length, sSet); + if (!::SetEndOfFile(Fd_)) { + return false; + } + if (currentPosition < length) { + Seek(currentPosition, sSet); + } + return true; +#elif defined(_unix_) + return (0 == ftruncate(Fd_, (off_t)length)); +#else + #error unsupported platform +#endif +} + +bool TFileHandle::Reserve(i64 length) noexcept { + // FIXME this should reserve disk space with fallocate + if (!IsOpen()) { + return false; + } + i64 currentLength = GetLength(); + if (length <= currentLength) { + return true; + } + if (!Resize(length)) { + return false; + } +#if defined(_win_) + if (!::SetFileValidData(Fd_, length)) { + Resize(currentLength); + return false; + } +#elif defined(_unix_) +// No way to implement this under FreeBSD. Just do nothing +#else + #error unsupported platform +#endif + return true; +} + +bool TFileHandle::FallocateNoResize(i64 length) noexcept { + if (!IsOpen()) { + return false; + } +#if defined(_linux_) && (!defined(_android_) || __ANDROID_API__ >= 21) + return !fallocate(Fd_, FALLOC_FL_KEEP_SIZE, 0, length); +#else + Y_UNUSED(length); + return true; +#endif +} + +// Pair for FallocateNoResize +bool TFileHandle::ShrinkToFit() noexcept { + if (!IsOpen()) { + return false; + } +#if defined(_linux_) && (!defined(_android_) || __ANDROID_API__ >= 21) + return !ftruncate(Fd_, (off_t)GetLength()); +#else + return true; +#endif +} + +bool TFileHandle::Flush() noexcept { + if (!IsOpen()) { + return false; + } +#if defined(_win_) + bool ok = ::FlushFileBuffers(Fd_) != 0; + /* + * FlushFileBuffers fails if hFile is a handle to the console output. + * That is because the console output is not buffered. + * The function returns FALSE, and GetLastError returns ERROR_INVALID_HANDLE. + */ + return ok || GetLastError() == ERROR_INVALID_HANDLE; +#elif defined(_unix_) + int ret = ::fsync(Fd_); + + /* + * Ignore EROFS, EINVAL - fd is bound to a special file + * (PIPE, FIFO, or socket) which does not support synchronization. + * Fail in case of EIO, ENOSPC, EDQUOT - data might be lost. + */ + return ret == 0 || errno == EROFS || errno == EINVAL + #if defined(_darwin_) + // ENOTSUP fd does not refer to a vnode + || errno == ENOTSUP + #endif + ; +#else + #error unsupported platform +#endif +} + +bool TFileHandle::FlushData() noexcept { +#if defined(_linux_) + if (!IsOpen()) { + return false; + } + + int ret = ::fdatasync(Fd_); + + // Same loginc in error handling as for fsync above. + return ret == 0 || errno == EROFS || errno == EINVAL; +#else + return Flush(); +#endif +} + +i32 TFileHandle::Read(void* buffer, ui32 byteCount) noexcept { + // FIXME size and return must be 64-bit + if (!IsOpen()) { + return -1; + } +#if defined(_win_) + DWORD bytesRead = 0; + if (::ReadFile(Fd_, buffer, byteCount, &bytesRead, nullptr)) { + return bytesRead; + } + return -1; +#elif defined(_unix_) + i32 ret; + do { + ret = ::read(Fd_, buffer, byteCount); + } while (ret == -1 && errno == EINTR); + return ret; +#else + #error unsupported platform +#endif +} + +i32 TFileHandle::Write(const void* buffer, ui32 byteCount) noexcept { + if (!IsOpen()) { + return -1; + } +#if defined(_win_) + DWORD bytesWritten = 0; + if (::WriteFile(Fd_, buffer, byteCount, &bytesWritten, nullptr)) { + return bytesWritten; + } + return -1; +#elif defined(_unix_) + i32 ret; + do { + ret = ::write(Fd_, buffer, byteCount); + } while (ret == -1 && errno == EINTR); + return ret; +#else + #error unsupported platform +#endif +} + +i32 TFileHandle::Pread(void* buffer, ui32 byteCount, i64 offset) const noexcept { +#if defined(_win_) + OVERLAPPED io; + Zero(io); + DWORD bytesRead = 0; + io.Offset = (ui32)offset; + io.OffsetHigh = (ui32)(offset >> 32); + if (::ReadFile(Fd_, buffer, byteCount, &bytesRead, &io)) { + return bytesRead; + } + if (::GetLastError() == ERROR_HANDLE_EOF) { + return 0; + } + return -1; +#elif defined(_unix_) + i32 ret; + do { + ret = ::pread(Fd_, buffer, byteCount, offset); + } while (ret == -1 && errno == EINTR); + return ret; +#else + #error unsupported platform +#endif +} + +i32 TFileHandle::Pwrite(const void* buffer, ui32 byteCount, i64 offset) const noexcept { +#if defined(_win_) + OVERLAPPED io; + Zero(io); + DWORD bytesWritten = 0; + io.Offset = (ui32)offset; + io.OffsetHigh = (ui32)(offset >> 32); + if (::WriteFile(Fd_, buffer, byteCount, &bytesWritten, &io)) { + return bytesWritten; + } + return -1; +#elif defined(_unix_) + i32 ret; + do { + ret = ::pwrite(Fd_, buffer, byteCount, offset); + } while (ret == -1 && errno == EINTR); + return ret; +#else + #error unsupported platform +#endif +} + +FHANDLE TFileHandle::Duplicate() const noexcept { + if (!IsOpen()) { + return INVALID_FHANDLE; + } +#if defined(_win_) + FHANDLE dupHandle; + if (!::DuplicateHandle(GetCurrentProcess(), Fd_, GetCurrentProcess(), &dupHandle, 0, TRUE, DUPLICATE_SAME_ACCESS)) { + return INVALID_FHANDLE; + } + return dupHandle; +#elif defined(_unix_) + return ::dup(Fd_); +#else + #error unsupported platform +#endif +} + +int TFileHandle::Duplicate2Posix(int dstHandle) const noexcept { + if (!IsOpen()) { + return -1; + } +#if defined(_win_) + FHANDLE dupHandle = Duplicate(); + if (dupHandle == INVALID_FHANDLE) { + _set_errno(EMFILE); + return -1; + } + int posixHandle = _open_osfhandle((intptr_t)dupHandle, 0); + if (posixHandle == -1) { + CloseHandle(dupHandle); + return -1; + } + if (dup2(posixHandle, dstHandle) == -1) { + dstHandle = -1; + } + _close(posixHandle); + return dstHandle; +#elif defined(_unix_) + while (dup2(Fd_, dstHandle) == -1) { + if (errno != EINTR) { + return -1; + } + } + return dstHandle; +#else + #error unsupported platform +#endif +} + +bool TFileHandle::LinkTo(const TFileHandle& fh) const noexcept { +#if defined(_unix_) + while (dup2(fh.Fd_, Fd_) == -1) { + if (errno != EINTR) { + return false; + } + } + return true; +#elif defined(_win_) + TFileHandle nh(fh.Duplicate()); + + if (!nh.IsOpen()) { + return false; + } + + //not thread-safe + nh.Swap(*const_cast<TFileHandle*>(this)); + + return true; +#else + #error unsupported +#endif +} + +int TFileHandle::Flock(int op) noexcept { + return ::Flock(Fd_, op); +} + +bool TFileHandle::SetDirect() { +#ifdef _linux_ + const long flags = fcntl(Fd_, F_GETFL); + const int r = fcntl(Fd_, F_SETFL, flags | O_DIRECT); + + return !r; +#endif + + return false; +} + +void TFileHandle::ResetDirect() { +#ifdef _linux_ + long flags = fcntl(Fd_, F_GETFL); + fcntl(Fd_, F_SETFL, flags & ~O_DIRECT); +#endif +} + +i64 TFileHandle::CountCache(i64 offset, i64 length) const noexcept { +#ifdef _linux_ + const i64 pageSize = NSystemInfo::GetPageSize(); + constexpr size_t vecSize = 512; // Fetch up to 2MiB at once + const i64 batchSize = vecSize * pageSize; + std::array<ui8, vecSize> vec; + void* ptr = nullptr; + i64 res = 0; + + if (!IsOpen()) { + return -1; + } + + if (!length) { + length = GetLength(); + length -= Min(length, offset); + } + + if (!length) { + return 0; + } + + const i64 begin = AlignDown(offset, pageSize); + const i64 end = AlignUp(offset + length, pageSize); + const i64 size = end - begin; + + /* + * Since fincode is not implemented yet use mmap and mincore. + * This is not so effective and scalable for frequent usage. + */ + ptr = ::mmap( + (caddr_t) nullptr, + size, + PROT_READ, + MAP_SHARED | MAP_NORESERVE, + Fd_, + begin); + if (MAP_FAILED == ptr) { + return -1; + } + + for (i64 base = begin; base < end; base += batchSize) { + const size_t batch = Min(vecSize, size_t((end - base) / pageSize)); + void* batchPtr = static_cast<caddr_t>(ptr) + (base - begin); + + if (::mincore(batchPtr, batch * pageSize, vec.data())) { + res = -1; + break; + } + + for (size_t i = 0; i < batch; i++) { + // count uptodate complete pages in cache + if (vec[i] & 1) { + res += pageSize; + } + } + + if (base == begin && (vec[0] & 1)) { + // cut head of first page + res -= offset - begin; + } + + if ((end - base) <= batchSize && (vec[batch - 1] & 1)) { + // cut tail of last page + res -= size - (offset - begin) - length; + } + } + + ::munmap(ptr, size); + + return res; +#else + Y_UNUSED(offset); + Y_UNUSED(length); + return -1; +#endif +} + +void TFileHandle::PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { +#ifdef _linux_ + #if HAVE_POSIX_FADVISE + // POSIX_FADV_WILLNEED starts reading upto read_ahead_kb in background + ::posix_fadvise(Fd_, offset, length, POSIX_FADV_WILLNEED); + #endif + + if (wait) { + TFileHandle devnull("/dev/null", OpenExisting | WrOnly | CloseOnExec); + off_t end = length ? (offset + length) : GetLength(); + off_t pos = offset; + ssize_t ret; + + do { + ret = ::sendfile((FHANDLE)devnull, Fd_, &pos, end - pos); + } while (pos < end && (ret > 0 || errno == EINTR)); + } +#else + Y_UNUSED(offset); + Y_UNUSED(length); + Y_UNUSED(wait); +#endif +} + +void TFileHandle::EvictCache(i64 offset, i64 length) const noexcept { +#if HAVE_POSIX_FADVISE + /* + * This tries to evicts only unmaped, clean, complete pages. + */ + ::posix_fadvise(Fd_, offset, length, POSIX_FADV_DONTNEED); +#else + Y_UNUSED(offset); + Y_UNUSED(length); +#endif +} + +bool TFileHandle::FlushCache(i64 offset, i64 length, bool wait) noexcept { +#if HAVE_SYNC_FILE_RANGE + int flags = SYNC_FILE_RANGE_WRITE; + if (wait) { + flags |= SYNC_FILE_RANGE_WAIT_AFTER; + } + int ret = ::sync_file_range(Fd_, offset, length, flags); + return ret == 0 || errno == EROFS; +#else + Y_UNUSED(offset); + Y_UNUSED(length); + if (wait) { + return FlushData(); + } + return true; +#endif +} + +TString DecodeOpenMode(ui32 mode0) { + ui32 mode = mode0; + + TStringBuilder r; + +#define F(flag) \ + if ((mode & flag) == flag) { \ + mode &= ~flag; \ + if (r) { \ + r << TStringBuf("|"); \ + } \ + r << TStringBuf(#flag); \ + } + + F(RdWr) + F(RdOnly) + F(WrOnly) + + F(CreateAlways) + F(CreateNew) + F(OpenAlways) + F(TruncExisting) + F(ForAppend) + F(Transient) + F(CloseOnExec) + + F(Temp) + F(Sync) + F(Direct) + F(DirectAligned) + F(Seq) + F(NoReuse) + F(NoReadAhead) + + F(AX) + F(AR) + F(AW) + F(ARW) + + F(AXOther) + F(AWOther) + F(AROther) + F(AXGroup) + F(AWGroup) + F(ARGroup) + F(AXUser) + F(AWUser) + F(ARUser) + +#undef F + + if (mode != 0) { + if (r) { + r << TStringBuf("|"); + } + + r << Hex(mode); + } + + if (!r) { + return "0"; + } + + return r; +} + +class TFile::TImpl: public TAtomicRefCount<TImpl> { +public: + inline TImpl(FHANDLE fd, const TString& fname = TString()) + : Handle_(fd) + , FileName_(fname) + { + } + + inline TImpl(const TString& fName, EOpenMode oMode) + : Handle_(fName, oMode) + , FileName_(fName) + { + if (!Handle_.IsOpen()) { + ythrow TFileError() << "can't open " << fName.Quote() << " with mode " << DecodeOpenMode(oMode) << " (" << Hex(oMode.ToBaseType()) << ")"; + } + } + + inline ~TImpl() = default; + + inline void Close() { + if (!Handle_.Close()) { + ythrow TFileError() << "can't close " << FileName_.Quote(); + } + } + + const TString& GetName() const noexcept { + return FileName_; + } + + void SetName(const TString& newName) { + FileName_ = newName; + } + + const TFileHandle& GetHandle() const noexcept { + return Handle_; + } + + i64 Seek(i64 offset, SeekDir origin) { + i64 pos = Handle_.Seek(offset, origin); + if (pos == -1L) { + ythrow TFileError() << "can't seek " << offset << " bytes in " << FileName_.Quote(); + } + return pos; + } + + void Resize(i64 length) { + if (!Handle_.Resize(length)) { + ythrow TFileError() << "can't resize " << FileName_.Quote() << " to size " << length; + } + } + + void Reserve(i64 length) { + if (!Handle_.Reserve(length)) { + ythrow TFileError() << "can't reserve " << length << " for file " << FileName_.Quote(); + } + } + + void FallocateNoResize(i64 length) { + if (!Handle_.FallocateNoResize(length)) { + ythrow TFileError() << "can't allocate " << length << "bytes of space for file " << FileName_.Quote(); + } + } + + void ShrinkToFit() { + if (!Handle_.ShrinkToFit()) { + ythrow TFileError() << "can't shrink " << FileName_.Quote() << " to logical size"; + } + } + + void Flush() { + if (!Handle_.Flush()) { + ythrow TFileError() << "can't flush " << FileName_.Quote(); + } + } + + void FlushData() { + if (!Handle_.FlushData()) { + ythrow TFileError() << "can't flush data " << FileName_.Quote(); + } + } + + TFile Duplicate() const { + TFileHandle dupH(Handle_.Duplicate()); + if (!dupH.IsOpen()) { + ythrow TFileError() << "can't duplicate the handle of " << FileName_.Quote(); + } + TFile res(dupH); + dupH.Release(); + return res; + } + + // Maximum amount of bytes to be read via single system call. + // Some libraries fail when it is greater than max int. + // Syscalls can cause contention if they operate on very large data blocks. + static constexpr size_t MaxReadPortion = 1_GB; + + i32 RawRead(void* bufferIn, size_t numBytes) { + const size_t toRead = Min(MaxReadPortion, numBytes); + return Handle_.Read(bufferIn, toRead); + } + + size_t ReadOrFail(void* buf, size_t numBytes) { + const i32 reallyRead = RawRead(buf, numBytes); + + if (reallyRead < 0) { + ythrow TFileError() << "can not read data from " << FileName_.Quote(); + } + + return reallyRead; + } + + size_t Read(void* bufferIn, size_t numBytes) { + ui8* buf = (ui8*)bufferIn; + + while (numBytes) { + const size_t reallyRead = ReadOrFail(buf, numBytes); + + if (reallyRead == 0) { + // file exhausted + break; + } + + buf += reallyRead; + numBytes -= reallyRead; + } + + return buf - (ui8*)bufferIn; + } + + void Load(void* buf, size_t len) { + if (Read(buf, len) != len) { + ythrow TFileError() << "can't read " << len << " bytes from " << FileName_.Quote(); + } + } + + // Maximum amount of bytes to be written via single system call. + // Some libraries fail when it is greater than max int. + // Syscalls can cause contention if they operate on very large data blocks. + static constexpr size_t MaxWritePortion = 1_GB; + + void Write(const void* buffer, size_t numBytes) { + const ui8* buf = (const ui8*)buffer; + + while (numBytes) { + const i32 toWrite = (i32)Min(MaxWritePortion, numBytes); + const i32 reallyWritten = Handle_.Write(buf, toWrite); + + if (reallyWritten < 0) { + ythrow TFileError() << "can't write " << toWrite << " bytes to " << FileName_.Quote(); + } + + buf += reallyWritten; + numBytes -= reallyWritten; + } + } + + size_t Pread(void* bufferIn, size_t numBytes, i64 offset) const { + ui8* buf = (ui8*)bufferIn; + + while (numBytes) { + const i32 toRead = (i32)Min(MaxReadPortion, numBytes); + const i32 reallyRead = RawPread(buf, toRead, offset); + + if (reallyRead < 0) { + ythrow TFileError() << "can not read data from " << FileName_.Quote(); + } + + if (reallyRead == 0) { + // file exausted + break; + } + + buf += reallyRead; + offset += reallyRead; + numBytes -= reallyRead; + } + + return buf - (ui8*)bufferIn; + } + + i32 RawPread(void* buf, ui32 len, i64 offset) const { + return Handle_.Pread(buf, len, offset); + } + + void Pload(void* buf, size_t len, i64 offset) const { + if (Pread(buf, len, offset) != len) { + ythrow TFileError() << "can't read " << len << " bytes at offset " << offset << " from " << FileName_.Quote(); + } + } + + void Pwrite(const void* buffer, size_t numBytes, i64 offset) const { + const ui8* buf = (const ui8*)buffer; + + while (numBytes) { + const i32 toWrite = (i32)Min(MaxWritePortion, numBytes); + const i32 reallyWritten = Handle_.Pwrite(buf, toWrite, offset); + + if (reallyWritten < 0) { + ythrow TFileError() << "can't write " << toWrite << " bytes to " << FileName_.Quote(); + } + + buf += reallyWritten; + offset += reallyWritten; + numBytes -= reallyWritten; + } + } + + void Flock(int op) { + if (0 != Handle_.Flock(op)) { + ythrow TFileError() << "can't flock " << FileName_.Quote(); + } + } + + void SetDirect() { + if (!Handle_.SetDirect()) { + ythrow TFileError() << "can't set direct mode for " << FileName_.Quote(); + } + } + + void ResetDirect() { + Handle_.ResetDirect(); + } + + i64 CountCache(i64 offset, i64 length) const noexcept { + return Handle_.CountCache(offset, length); + } + + void PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { + Handle_.PrefetchCache(offset, length, wait); + } + + void EvictCache(i64 offset, i64 length) const noexcept { + Handle_.EvictCache(offset, length); + } + + void FlushCache(i64 offset, i64 length, bool wait) { + if (!Handle_.FlushCache(offset, length, wait)) { + ythrow TFileError() << "can't flush data " << FileName_.Quote(); + } + } + +private: + TFileHandle Handle_; + TString FileName_; +}; + +TFile::TFile() + : Impl_(new TImpl(INVALID_FHANDLE)) +{ +} + +TFile::TFile(FHANDLE fd) + : Impl_(new TImpl(fd)) +{ +} + +TFile::TFile(FHANDLE fd, const TString& name) + : Impl_(new TImpl(fd, name)) +{ +} + +TFile::TFile(const TString& fName, EOpenMode oMode) + : Impl_(new TImpl(fName, oMode)) +{ +} + +TFile::~TFile() = default; + +void TFile::Close() { + Impl_->Close(); +} + +const TString& TFile::GetName() const noexcept { + return Impl_->GetName(); +} + +i64 TFile::GetPosition() const noexcept { + return Impl_->GetHandle().GetPosition(); +} + +i64 TFile::GetLength() const noexcept { + return Impl_->GetHandle().GetLength(); +} + +bool TFile::IsOpen() const noexcept { + return Impl_->GetHandle().IsOpen(); +} + +FHANDLE TFile::GetHandle() const noexcept { + return Impl_->GetHandle(); +} + +i64 TFile::Seek(i64 offset, SeekDir origin) { + return Impl_->Seek(offset, origin); +} + +void TFile::Resize(i64 length) { + Impl_->Resize(length); +} + +void TFile::Reserve(i64 length) { + Impl_->Reserve(length); +} + +void TFile::FallocateNoResize(i64 length) { + Impl_->FallocateNoResize(length); +} + +void TFile::ShrinkToFit() { + Impl_->ShrinkToFit(); +} + +void TFile::Flush() { + Impl_->Flush(); +} + +void TFile::FlushData() { + Impl_->FlushData(); +} + +TFile TFile::Duplicate() const { + TFile res = Impl_->Duplicate(); + res.Impl_->SetName(Impl_->GetName()); + return res; +} + +size_t TFile::Read(void* buf, size_t len) { + return Impl_->Read(buf, len); +} + +i32 TFile::RawRead(void* buf, size_t len) { + return Impl_->RawRead(buf, len); +} + +size_t TFile::ReadOrFail(void* buf, size_t len) { + return Impl_->ReadOrFail(buf, len); +} + +void TFile::Load(void* buf, size_t len) { + Impl_->Load(buf, len); +} + +void TFile::Write(const void* buf, size_t len) { + Impl_->Write(buf, len); +} + +size_t TFile::Pread(void* buf, size_t len, i64 offset) const { + return Impl_->Pread(buf, len, offset); +} + +i32 TFile::RawPread(void* buf, ui32 len, i64 offset) const { + return Impl_->RawPread(buf, len, offset); +} + +void TFile::Pload(void* buf, size_t len, i64 offset) const { + Impl_->Pload(buf, len, offset); +} + +void TFile::Pwrite(const void* buf, size_t len, i64 offset) const { + Impl_->Pwrite(buf, len, offset); +} + +void TFile::Flock(int op) { + Impl_->Flock(op); +} + +void TFile::SetDirect() { + Impl_->SetDirect(); +} + +void TFile::ResetDirect() { + Impl_->ResetDirect(); +} + +i64 TFile::CountCache(i64 offset, i64 length) const noexcept { + return Impl_->CountCache(offset, length); +} + +void TFile::PrefetchCache(i64 offset, i64 length, bool wait) const noexcept { + Impl_->PrefetchCache(offset, length, wait); +} + +void TFile::EvictCache(i64 offset, i64 length) const noexcept { + Impl_->EvictCache(offset, length); +} + +void TFile::FlushCache(i64 offset, i64 length, bool wait) { + Impl_->FlushCache(offset, length, wait); +} + +void TFile::LinkTo(const TFile& f) const { + if (!Impl_->GetHandle().LinkTo(f.Impl_->GetHandle())) { + ythrow TFileError() << "can not link fd(" << GetName() << " -> " << f.GetName() << ")"; + } +} + +TFile TFile::Temporary(const TString& prefix) { + //TODO - handle impossible case of name collision + return TFile(prefix + ToString(MicroSeconds()) + "-" + ToString(RandomNumber<ui64>()), CreateNew | RdWr | Seq | Temp | Transient); +} + +TFile TFile::ForAppend(const TString& path) { + return TFile(path, OpenAlways | WrOnly | Seq | ::ForAppend); +} + +TFile Duplicate(FILE* f) { + return Duplicate(fileno(f)); +} + +TFile Duplicate(int fd) { +#if defined(_win_) + /* There are two options of how to duplicate a file descriptor on Windows: + * + * 1: + * - Call dup. + * - Call _get_osfhandle on the result. + * - Use returned handle. + * - Call _close on file descriptor returned by dup. This will also close + * the handle. + * + * 2: + * - Call _get_osfhandle. + * - Call DuplicateHandle on the result. + * - Use returned handle. + * - Call CloseHandle. + * + * TFileHandle calls CloseHandle when destroyed, leaving us with option #2. */ + FHANDLE handle = reinterpret_cast<FHANDLE>(::_get_osfhandle(fd)); + + FHANDLE dupHandle; + if (!::DuplicateHandle(GetCurrentProcess(), handle, GetCurrentProcess(), &dupHandle, 0, TRUE, DUPLICATE_SAME_ACCESS)) { + ythrow TFileError() << "can not duplicate file descriptor " << LastSystemError() << Endl; + } + + return TFile(dupHandle); +#elif defined(_unix_) + return TFile(::dup(fd)); +#else + #error unsupported platform +#endif +} + +bool PosixDisableReadAhead(FHANDLE fileHandle, void* addr) noexcept { + int ret = -1; + +#if HAVE_POSIX_FADVISE + #if defined(_linux_) + Y_UNUSED(fileHandle); + ret = madvise(addr, 0, MADV_RANDOM); // according to klamm@ posix_fadvise does not work under linux, madvise does work + #else + Y_UNUSED(addr); + ret = ::posix_fadvise(fileHandle, 0, 0, POSIX_FADV_RANDOM); + #endif +#else + Y_UNUSED(fileHandle); + Y_UNUSED(addr); +#endif + return ret == 0; +} |