aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/logger/sync_page_cache_file.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'library/cpp/logger/sync_page_cache_file.cpp')
-rw-r--r--library/cpp/logger/sync_page_cache_file.cpp84
1 files changed, 69 insertions, 15 deletions
diff --git a/library/cpp/logger/sync_page_cache_file.cpp b/library/cpp/logger/sync_page_cache_file.cpp
index 1424159af7..7f262c7ee7 100644
--- a/library/cpp/logger/sync_page_cache_file.cpp
+++ b/library/cpp/logger/sync_page_cache_file.cpp
@@ -1,26 +1,44 @@
#include "sync_page_cache_file.h"
+
#include "record.h"
#include <util/generic/buffer.h>
#include <util/generic/yexception.h>
+#include <util/system/align.h>
+#include <util/system/event.h>
#include <util/system/file.h>
#include <util/system/info.h>
#include <util/system/mutex.h>
-#include <util/system/align.h>
+#include <util/system/thread.h>
class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable {
public:
- TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize)
+ TImpl(
+ const TString& path,
+ size_t maxBufferSize,
+ size_t maxPendingCacheSize,
+ TMaybe<TDuration> bufferFlushPeriod
+ )
: File_{OpenFile(path)}
, MaxBufferSize_{maxBufferSize}
, MaxPendingCacheSize_{maxPendingCacheSize}
, Buffer_{maxBufferSize}
+ , BufferFlushPeriod_{bufferFlushPeriod}
{
ResetPtrs();
+
+ if (BufferFlushPeriod_) {
+ BufferFlushThreadPtr_ = MakeHolder<TThread>([this] {RunBufferFlushThread();});
+ BufferFlushThreadPtr_->Start();
+ }
}
~TImpl() noexcept {
try {
+ if (BufferFlushThreadPtr_) {
+ BufferFlushThreadExitWaiter_.Signal();
+ }
+
Write();
FlushSync(GuaranteedWrittenPtr_, WrittenPtr_);
} catch (...) {
@@ -32,17 +50,7 @@ public:
Buffer_.Append(rec.Data, rec.Len);
if (Buffer_.size() >= MaxBufferSize_) {
- const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_;
- Write();
-
- if (prevAlignedEndPtr < PageAlignedWrittenPtr_) {
- FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_);
- }
-
- const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_;
- if (minPendingCacheOffset > GuaranteedWrittenPtr_) {
- FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset);
- }
+ WriteAndFlush();
}
}
@@ -101,6 +109,36 @@ private:
GuaranteedWrittenPtr_ = to;
}
+ void WriteAndFlush() {
+ const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_;
+ Write();
+
+ if (prevAlignedEndPtr < PageAlignedWrittenPtr_) {
+ FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_);
+ }
+
+ const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_;
+ if (minPendingCacheOffset > GuaranteedWrittenPtr_) {
+ FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset);
+ }
+ }
+
+ void RunBufferFlushThread() {
+ Y_ENSURE(BufferFlushPeriod_);
+ TInstant deadline;
+ do {
+ deadline = TInstant::Now() + *BufferFlushPeriod_;
+ try {
+ TGuard guard{Lock_};
+ if (!Buffer_.Empty()) {
+ WriteAndFlush();
+ }
+ } catch (...) {
+ Cerr << "Failed to flush eventlog buffer: " << CurrentExceptionMessage() << Endl;
+ }
+ } while (!BufferFlushThreadExitWaiter_.WaitD(deadline));
+ }
+
private:
TMutex Lock_;
TFile File_;
@@ -112,10 +150,26 @@ private:
i64 WrittenPtr_ = 0;
i64 PageAlignedWrittenPtr_ = 0;
i64 GuaranteedWrittenPtr_ = 0;
+
+ const TMaybe<TDuration> BufferFlushPeriod_;
+ TManualEvent BufferFlushThreadExitWaiter_;
+
+ // thread should be declared last to be destroyed before other props
+ THolder<TThread> BufferFlushThreadPtr_;
};
-TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize)
- : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize))
+TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(
+ const TString& path,
+ size_t maxBufferSize,
+ size_t maxPengingCacheSize,
+ TMaybe<TDuration> bufferFlushPeriod
+)
+ : Impl_(MakeHolder<TImpl>(
+ path,
+ maxBufferSize,
+ maxPengingCacheSize,
+ bufferFlushPeriod
+ ))
{}
TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() {