aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorartin-phares <artin-phares@yandex-team.com>2025-04-01 16:46:05 +0300
committerartin-phares <artin-phares@yandex-team.com>2025-04-01 17:24:02 +0300
commit8b59ff7ff2d64cbb994b04bd6997191319d25eae (patch)
treeead95085b3bb2a79e30a401fb5f3e7fbca187f6b /library/cpp
parente47faab44f36e1fe6204a677ad72b3c0c506e36a (diff)
downloadydb-8b59ff7ff2d64cbb994b04bd6997191319d25eae.tar.gz
Add BufferFlushPeriod option for TSyncPageCacheFileLogBackend
commit_hash:c1554e872fcaa2b74aef0c46372defab03ba3859
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/logger/sync_page_cache_file.cpp84
-rw-r--r--library/cpp/logger/sync_page_cache_file.h9
2 files changed, 77 insertions, 16 deletions
diff --git a/library/cpp/logger/sync_page_cache_file.cpp b/library/cpp/logger/sync_page_cache_file.cpp
index 1424159af74..7f262c7ee7b 100644
--- a/library/cpp/logger/sync_page_cache_file.cpp
+++ b/library/cpp/logger/sync_page_cache_file.cpp
@@ -1,26 +1,44 @@
#include "sync_page_cache_file.h"
+
#include "record.h"
#include <util/generic/buffer.h>
#include <util/generic/yexception.h>
+#include <util/system/align.h>
+#include <util/system/event.h>
#include <util/system/file.h>
#include <util/system/info.h>
#include <util/system/mutex.h>
-#include <util/system/align.h>
+#include <util/system/thread.h>
class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable {
public:
- TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize)
+ TImpl(
+ const TString& path,
+ size_t maxBufferSize,
+ size_t maxPendingCacheSize,
+ TMaybe<TDuration> bufferFlushPeriod
+ )
: File_{OpenFile(path)}
, MaxBufferSize_{maxBufferSize}
, MaxPendingCacheSize_{maxPendingCacheSize}
, Buffer_{maxBufferSize}
+ , BufferFlushPeriod_{bufferFlushPeriod}
{
ResetPtrs();
+
+ if (BufferFlushPeriod_) {
+ BufferFlushThreadPtr_ = MakeHolder<TThread>([this] {RunBufferFlushThread();});
+ BufferFlushThreadPtr_->Start();
+ }
}
~TImpl() noexcept {
try {
+ if (BufferFlushThreadPtr_) {
+ BufferFlushThreadExitWaiter_.Signal();
+ }
+
Write();
FlushSync(GuaranteedWrittenPtr_, WrittenPtr_);
} catch (...) {
@@ -32,17 +50,7 @@ public:
Buffer_.Append(rec.Data, rec.Len);
if (Buffer_.size() >= MaxBufferSize_) {
- const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_;
- Write();
-
- if (prevAlignedEndPtr < PageAlignedWrittenPtr_) {
- FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_);
- }
-
- const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_;
- if (minPendingCacheOffset > GuaranteedWrittenPtr_) {
- FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset);
- }
+ WriteAndFlush();
}
}
@@ -101,6 +109,36 @@ private:
GuaranteedWrittenPtr_ = to;
}
+ void WriteAndFlush() {
+ const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_;
+ Write();
+
+ if (prevAlignedEndPtr < PageAlignedWrittenPtr_) {
+ FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_);
+ }
+
+ const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_;
+ if (minPendingCacheOffset > GuaranteedWrittenPtr_) {
+ FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset);
+ }
+ }
+
+ void RunBufferFlushThread() {
+ Y_ENSURE(BufferFlushPeriod_);
+ TInstant deadline;
+ do {
+ deadline = TInstant::Now() + *BufferFlushPeriod_;
+ try {
+ TGuard guard{Lock_};
+ if (!Buffer_.Empty()) {
+ WriteAndFlush();
+ }
+ } catch (...) {
+ Cerr << "Failed to flush eventlog buffer: " << CurrentExceptionMessage() << Endl;
+ }
+ } while (!BufferFlushThreadExitWaiter_.WaitD(deadline));
+ }
+
private:
TMutex Lock_;
TFile File_;
@@ -112,10 +150,26 @@ private:
i64 WrittenPtr_ = 0;
i64 PageAlignedWrittenPtr_ = 0;
i64 GuaranteedWrittenPtr_ = 0;
+
+ const TMaybe<TDuration> BufferFlushPeriod_;
+ TManualEvent BufferFlushThreadExitWaiter_;
+
+ // thread should be declared last to be destroyed before other props
+ THolder<TThread> BufferFlushThreadPtr_;
};
-TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize)
- : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize))
+TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(
+ const TString& path,
+ size_t maxBufferSize,
+ size_t maxPengingCacheSize,
+ TMaybe<TDuration> bufferFlushPeriod
+)
+ : Impl_(MakeHolder<TImpl>(
+ path,
+ maxBufferSize,
+ maxPengingCacheSize,
+ bufferFlushPeriod
+ ))
{}
TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() {
diff --git a/library/cpp/logger/sync_page_cache_file.h b/library/cpp/logger/sync_page_cache_file.h
index a36340651c7..e50aade08f9 100644
--- a/library/cpp/logger/sync_page_cache_file.h
+++ b/library/cpp/logger/sync_page_cache_file.h
@@ -2,12 +2,19 @@
#include "backend.h"
+#include <util/datetime/base.h>
#include <util/generic/fwd.h>
+#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
class TSyncPageCacheFileLogBackend final: public TLogBackend {
public:
- TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize);
+ TSyncPageCacheFileLogBackend(
+ const TString& path,
+ size_t maxBufferSize,
+ size_t maxPendingCacheSize,
+ TMaybe<TDuration> bufferFlushPeriod = Nothing()
+ );
~TSyncPageCacheFileLogBackend();
void WriteData(const TLogRecord& rec) override;