diff options
Diffstat (limited to 'library/cpp/logger/sync_page_cache_file.cpp')
-rw-r--r-- | library/cpp/logger/sync_page_cache_file.cpp | 84 |
1 files changed, 69 insertions, 15 deletions
diff --git a/library/cpp/logger/sync_page_cache_file.cpp b/library/cpp/logger/sync_page_cache_file.cpp index 1424159af7..7f262c7ee7 100644 --- a/library/cpp/logger/sync_page_cache_file.cpp +++ b/library/cpp/logger/sync_page_cache_file.cpp @@ -1,26 +1,44 @@ #include "sync_page_cache_file.h" + #include "record.h" #include <util/generic/buffer.h> #include <util/generic/yexception.h> +#include <util/system/align.h> +#include <util/system/event.h> #include <util/system/file.h> #include <util/system/info.h> #include <util/system/mutex.h> -#include <util/system/align.h> +#include <util/system/thread.h> class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable { public: - TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize) + TImpl( + const TString& path, + size_t maxBufferSize, + size_t maxPendingCacheSize, + TMaybe<TDuration> bufferFlushPeriod + ) : File_{OpenFile(path)} , MaxBufferSize_{maxBufferSize} , MaxPendingCacheSize_{maxPendingCacheSize} , Buffer_{maxBufferSize} + , BufferFlushPeriod_{bufferFlushPeriod} { ResetPtrs(); + + if (BufferFlushPeriod_) { + BufferFlushThreadPtr_ = MakeHolder<TThread>([this] {RunBufferFlushThread();}); + BufferFlushThreadPtr_->Start(); + } } ~TImpl() noexcept { try { + if (BufferFlushThreadPtr_) { + BufferFlushThreadExitWaiter_.Signal(); + } + Write(); FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); } catch (...) { @@ -32,17 +50,7 @@ public: Buffer_.Append(rec.Data, rec.Len); if (Buffer_.size() >= MaxBufferSize_) { - const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; - Write(); - - if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { - FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); - } - - const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; - if (minPendingCacheOffset > GuaranteedWrittenPtr_) { - FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); - } + WriteAndFlush(); } } @@ -101,6 +109,36 @@ private: GuaranteedWrittenPtr_ = to; } + void WriteAndFlush() { + const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; + Write(); + + if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { + FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); + } + + const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; + if (minPendingCacheOffset > GuaranteedWrittenPtr_) { + FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); + } + } + + void RunBufferFlushThread() { + Y_ENSURE(BufferFlushPeriod_); + TInstant deadline; + do { + deadline = TInstant::Now() + *BufferFlushPeriod_; + try { + TGuard guard{Lock_}; + if (!Buffer_.Empty()) { + WriteAndFlush(); + } + } catch (...) { + Cerr << "Failed to flush eventlog buffer: " << CurrentExceptionMessage() << Endl; + } + } while (!BufferFlushThreadExitWaiter_.WaitD(deadline)); + } + private: TMutex Lock_; TFile File_; @@ -112,10 +150,26 @@ private: i64 WrittenPtr_ = 0; i64 PageAlignedWrittenPtr_ = 0; i64 GuaranteedWrittenPtr_ = 0; + + const TMaybe<TDuration> BufferFlushPeriod_; + TManualEvent BufferFlushThreadExitWaiter_; + + // thread should be declared last to be destroyed before other props + THolder<TThread> BufferFlushThreadPtr_; }; -TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize) - : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize)) +TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend( + const TString& path, + size_t maxBufferSize, + size_t maxPengingCacheSize, + TMaybe<TDuration> bufferFlushPeriod +) + : Impl_(MakeHolder<TImpl>( + path, + maxBufferSize, + maxPengingCacheSize, + bufferFlushPeriod + )) {} TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() { |