diff options
author | sskvor <sskvor@yandex-team.ru> | 2022-02-10 16:48:04 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:04 +0300 |
commit | 10ade5dcb952a8fae61f734485641a8409e1c545 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/logger/sync_page_cache_file.cpp | |
parent | 75abffb472365d28bd0a019db1a54cb32a6100dd (diff) | |
download | ydb-10ade5dcb952a8fae61f734485641a8409e1c545.tar.gz |
Restoring authorship annotation for <sskvor@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/logger/sync_page_cache_file.cpp')
-rw-r--r-- | library/cpp/logger/sync_page_cache_file.cpp | 250 |
1 files changed, 125 insertions, 125 deletions
diff --git a/library/cpp/logger/sync_page_cache_file.cpp b/library/cpp/logger/sync_page_cache_file.cpp index c767b8d75d..a0e93a78d7 100644 --- a/library/cpp/logger/sync_page_cache_file.cpp +++ b/library/cpp/logger/sync_page_cache_file.cpp @@ -1,125 +1,125 @@ -#include "sync_page_cache_file.h" -#include "record.h" - -#include <util/generic/buffer.h> -#include <util/system/file.h> -#include <util/system/info.h> -#include <util/system/mutex.h> -#include <util/system/rwlock.h> -#include <util/system/align.h> - -class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable { -public: - TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize) - : File_{OpenFile(path)} - , MaxBufferSize_{maxBufferSize} - , MaxPendingCacheSize_{maxPendingCacheSize} - , Buffer_{maxBufferSize} - { - ResetPtrs(); - } - - ~TImpl() noexcept { - try { - Write(); - FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); - } catch (...) { - } - } - - void WriteData(const TLogRecord& rec) { - TGuard guard{Lock_}; - - Buffer_.Append(rec.Data, rec.Len); - if (Buffer_.size() >= MaxBufferSize_) { - const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; - Write(); - - if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { - FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); - } - - const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; - if (minPendingCacheOffset > GuaranteedWrittenPtr_) { - FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); - } - } - } - - void ReopenLog() { - TGuard guard{Lock_}; - - Write(); - FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); - - File_.LinkTo(OpenFile(File_.GetName())); - - ResetPtrs(); - } - -private: - void ResetPtrs() { - WrittenPtr_ = File_.GetLength(); - PageAlignedWrittenPtr_ = AlignDown(WrittenPtr_, GetPageSize()); - GuaranteedWrittenPtr_ = WrittenPtr_; - } - - static TFile OpenFile(const TString& path) { - return TFile{path, OpenAlways | WrOnly | ForAppend | Seq | NoReuse}; - } - - static i64 GetPageSize() { - static const i64 pageSize = NSystemInfo::GetPageSize(); - Y_ASSUME(IsPowerOf2(pageSize)); - return pageSize; - } - - void Write() { - File_.Write(Buffer_.Data(), Buffer_.Size()); - WrittenPtr_ += Buffer_.Size(); - PageAlignedWrittenPtr_ = AlignDown(WrittenPtr_, GetPageSize()); - Buffer_.Clear(); - } - - void FlushAsync(const i64 from, const i64 to) { - File_.FlushCache(from, to - from, /* wait = */ false); - } - - void FlushSync(const i64 from, const i64 to) { - const i64 begin = AlignDown(from, GetPageSize()); - const i64 end = AlignUp(to, GetPageSize()); - const i64 length = end - begin; - - File_.FlushCache(begin, length, /* wait = */ true); - File_.EvictCache(begin, length); - - GuaranteedWrittenPtr_ = to; - } - -private: - TMutex Lock_; - TFile File_; - - const size_t MaxBufferSize_ = 0; - const size_t MaxPendingCacheSize_ = 0; - - TBuffer Buffer_; - i64 WrittenPtr_ = 0; - i64 PageAlignedWrittenPtr_ = 0; - i64 GuaranteedWrittenPtr_ = 0; -}; - -TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize) - : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize)) -{} - -TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() { -} - -void TSyncPageCacheFileLogBackend::WriteData(const TLogRecord& rec) { - Impl_->WriteData(rec); -} - -void TSyncPageCacheFileLogBackend::ReopenLog() { - Impl_->ReopenLog(); -} +#include "sync_page_cache_file.h" +#include "record.h" + +#include <util/generic/buffer.h> +#include <util/system/file.h> +#include <util/system/info.h> +#include <util/system/mutex.h> +#include <util/system/rwlock.h> +#include <util/system/align.h> + +class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable { +public: + TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize) + : File_{OpenFile(path)} + , MaxBufferSize_{maxBufferSize} + , MaxPendingCacheSize_{maxPendingCacheSize} + , Buffer_{maxBufferSize} + { + ResetPtrs(); + } + + ~TImpl() noexcept { + try { + Write(); + FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); + } catch (...) { + } + } + + void WriteData(const TLogRecord& rec) { + TGuard guard{Lock_}; + + Buffer_.Append(rec.Data, rec.Len); + if (Buffer_.size() >= MaxBufferSize_) { + const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; + Write(); + + if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { + FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); + } + + const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; + if (minPendingCacheOffset > GuaranteedWrittenPtr_) { + FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); + } + } + } + + void ReopenLog() { + TGuard guard{Lock_}; + + Write(); + FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); + + File_.LinkTo(OpenFile(File_.GetName())); + + ResetPtrs(); + } + +private: + void ResetPtrs() { + WrittenPtr_ = File_.GetLength(); + PageAlignedWrittenPtr_ = AlignDown(WrittenPtr_, GetPageSize()); + GuaranteedWrittenPtr_ = WrittenPtr_; + } + + static TFile OpenFile(const TString& path) { + return TFile{path, OpenAlways | WrOnly | ForAppend | Seq | NoReuse}; + } + + static i64 GetPageSize() { + static const i64 pageSize = NSystemInfo::GetPageSize(); + Y_ASSUME(IsPowerOf2(pageSize)); + return pageSize; + } + + void Write() { + File_.Write(Buffer_.Data(), Buffer_.Size()); + WrittenPtr_ += Buffer_.Size(); + PageAlignedWrittenPtr_ = AlignDown(WrittenPtr_, GetPageSize()); + Buffer_.Clear(); + } + + void FlushAsync(const i64 from, const i64 to) { + File_.FlushCache(from, to - from, /* wait = */ false); + } + + void FlushSync(const i64 from, const i64 to) { + const i64 begin = AlignDown(from, GetPageSize()); + const i64 end = AlignUp(to, GetPageSize()); + const i64 length = end - begin; + + File_.FlushCache(begin, length, /* wait = */ true); + File_.EvictCache(begin, length); + + GuaranteedWrittenPtr_ = to; + } + +private: + TMutex Lock_; + TFile File_; + + const size_t MaxBufferSize_ = 0; + const size_t MaxPendingCacheSize_ = 0; + + TBuffer Buffer_; + i64 WrittenPtr_ = 0; + i64 PageAlignedWrittenPtr_ = 0; + i64 GuaranteedWrittenPtr_ = 0; +}; + +TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize) + : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize)) +{} + +TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() { +} + +void TSyncPageCacheFileLogBackend::WriteData(const TLogRecord& rec) { + Impl_->WriteData(rec); +} + +void TSyncPageCacheFileLogBackend::ReopenLog() { + Impl_->ReopenLog(); +} |