diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/logger/sync_page_cache_file.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/logger/sync_page_cache_file.cpp')
-rw-r--r-- | library/cpp/logger/sync_page_cache_file.cpp | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/library/cpp/logger/sync_page_cache_file.cpp b/library/cpp/logger/sync_page_cache_file.cpp new file mode 100644 index 0000000000..a0e93a78d7 --- /dev/null +++ b/library/cpp/logger/sync_page_cache_file.cpp @@ -0,0 +1,125 @@ +#include "sync_page_cache_file.h" +#include "record.h" + +#include <util/generic/buffer.h> +#include <util/system/file.h> +#include <util/system/info.h> +#include <util/system/mutex.h> +#include <util/system/rwlock.h> +#include <util/system/align.h> + +class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable { +public: + TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize) + : File_{OpenFile(path)} + , MaxBufferSize_{maxBufferSize} + , MaxPendingCacheSize_{maxPendingCacheSize} + , Buffer_{maxBufferSize} + { + ResetPtrs(); + } + + ~TImpl() noexcept { + try { + Write(); + FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); + } catch (...) { + } + } + + void WriteData(const TLogRecord& rec) { + TGuard guard{Lock_}; + + Buffer_.Append(rec.Data, rec.Len); + if (Buffer_.size() >= MaxBufferSize_) { + const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; + Write(); + + if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { + FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); + } + + const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; + if (minPendingCacheOffset > GuaranteedWrittenPtr_) { + FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); + } + } + } + + void ReopenLog() { + TGuard guard{Lock_}; + + Write(); + FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); + + File_.LinkTo(OpenFile(File_.GetName())); + + ResetPtrs(); + } + +private: + void ResetPtrs() { + WrittenPtr_ = File_.GetLength(); + PageAlignedWrittenPtr_ = AlignDown(WrittenPtr_, GetPageSize()); + GuaranteedWrittenPtr_ = WrittenPtr_; + } + + static TFile OpenFile(const TString& path) { + return TFile{path, OpenAlways | WrOnly | ForAppend | Seq | NoReuse}; + } + + static i64 GetPageSize() { + static const i64 pageSize = NSystemInfo::GetPageSize(); + Y_ASSUME(IsPowerOf2(pageSize)); + return pageSize; + } + + void Write() { + File_.Write(Buffer_.Data(), Buffer_.Size()); + WrittenPtr_ += Buffer_.Size(); + PageAlignedWrittenPtr_ = AlignDown(WrittenPtr_, GetPageSize()); + Buffer_.Clear(); + } + + void FlushAsync(const i64 from, const i64 to) { + File_.FlushCache(from, to - from, /* wait = */ false); + } + + void FlushSync(const i64 from, const i64 to) { + const i64 begin = AlignDown(from, GetPageSize()); + const i64 end = AlignUp(to, GetPageSize()); + const i64 length = end - begin; + + File_.FlushCache(begin, length, /* wait = */ true); + File_.EvictCache(begin, length); + + GuaranteedWrittenPtr_ = to; + } + +private: + TMutex Lock_; + TFile File_; + + const size_t MaxBufferSize_ = 0; + const size_t MaxPendingCacheSize_ = 0; + + TBuffer Buffer_; + i64 WrittenPtr_ = 0; + i64 PageAlignedWrittenPtr_ = 0; + i64 GuaranteedWrittenPtr_ = 0; +}; + +TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize) + : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize)) +{} + +TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() { +} + +void TSyncPageCacheFileLogBackend::WriteData(const TLogRecord& rec) { + Impl_->WriteData(rec); +} + +void TSyncPageCacheFileLogBackend::ReopenLog() { + Impl_->ReopenLog(); +} |