diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-04-02 00:51:46 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-04-02 00:51:46 +0000 |
commit | f6416f5c4da60c922a0e1f07ec342fda00837d54 (patch) | |
tree | fd9b88400f5fc74b30a0ffacba8d6c0905b2fc6a /library/cpp/logger/sync_page_cache_file.cpp | |
parent | d30ab64d66bbeced779befb5033f1d81462dde2a (diff) | |
parent | 82a20ed13f9ce647673791b772685ee5997f256d (diff) | |
download | ydb-f6416f5c4da60c922a0e1f07ec342fda00837d54.tar.gz |
Merge branch 'rightlib' into merge-libs-250402-0050
Diffstat (limited to 'library/cpp/logger/sync_page_cache_file.cpp')
-rw-r--r-- | library/cpp/logger/sync_page_cache_file.cpp | 84 |
1 files changed, 69 insertions, 15 deletions
diff --git a/library/cpp/logger/sync_page_cache_file.cpp b/library/cpp/logger/sync_page_cache_file.cpp index 1424159af7..7f262c7ee7 100644 --- a/library/cpp/logger/sync_page_cache_file.cpp +++ b/library/cpp/logger/sync_page_cache_file.cpp @@ -1,26 +1,44 @@ #include "sync_page_cache_file.h" + #include "record.h" #include <util/generic/buffer.h> #include <util/generic/yexception.h> +#include <util/system/align.h> +#include <util/system/event.h> #include <util/system/file.h> #include <util/system/info.h> #include <util/system/mutex.h> -#include <util/system/align.h> +#include <util/system/thread.h> class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable { public: - TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize) + TImpl( + const TString& path, + size_t maxBufferSize, + size_t maxPendingCacheSize, + TMaybe<TDuration> bufferFlushPeriod + ) : File_{OpenFile(path)} , MaxBufferSize_{maxBufferSize} , MaxPendingCacheSize_{maxPendingCacheSize} , Buffer_{maxBufferSize} + , BufferFlushPeriod_{bufferFlushPeriod} { ResetPtrs(); + + if (BufferFlushPeriod_) { + BufferFlushThreadPtr_ = MakeHolder<TThread>([this] {RunBufferFlushThread();}); + BufferFlushThreadPtr_->Start(); + } } ~TImpl() noexcept { try { + if (BufferFlushThreadPtr_) { + BufferFlushThreadExitWaiter_.Signal(); + } + Write(); FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); } catch (...) { @@ -32,17 +50,7 @@ public: Buffer_.Append(rec.Data, rec.Len); if (Buffer_.size() >= MaxBufferSize_) { - const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; - Write(); - - if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { - FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); - } - - const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; - if (minPendingCacheOffset > GuaranteedWrittenPtr_) { - FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); - } + WriteAndFlush(); } } @@ -101,6 +109,36 @@ private: GuaranteedWrittenPtr_ = to; } + void WriteAndFlush() { + const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; + Write(); + + if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { + FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); + } + + const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; + if (minPendingCacheOffset > GuaranteedWrittenPtr_) { + FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); + } + } + + void RunBufferFlushThread() { + Y_ENSURE(BufferFlushPeriod_); + TInstant deadline; + do { + deadline = TInstant::Now() + *BufferFlushPeriod_; + try { + TGuard guard{Lock_}; + if (!Buffer_.Empty()) { + WriteAndFlush(); + } + } catch (...) { + Cerr << "Failed to flush eventlog buffer: " << CurrentExceptionMessage() << Endl; + } + } while (!BufferFlushThreadExitWaiter_.WaitD(deadline)); + } + private: TMutex Lock_; TFile File_; @@ -112,10 +150,26 @@ private: i64 WrittenPtr_ = 0; i64 PageAlignedWrittenPtr_ = 0; i64 GuaranteedWrittenPtr_ = 0; + + const TMaybe<TDuration> BufferFlushPeriod_; + TManualEvent BufferFlushThreadExitWaiter_; + + // thread should be declared last to be destroyed before other props + THolder<TThread> BufferFlushThreadPtr_; }; -TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize) - : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize)) +TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend( + const TString& path, + size_t maxBufferSize, + size_t maxPengingCacheSize, + TMaybe<TDuration> bufferFlushPeriod +) + : Impl_(MakeHolder<TImpl>( + path, + maxBufferSize, + maxPengingCacheSize, + bufferFlushPeriod + )) {} TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() { |