aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-04-02 00:51:46 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-04-02 00:51:46 +0000
commitf6416f5c4da60c922a0e1f07ec342fda00837d54 (patch)
treefd9b88400f5fc74b30a0ffacba8d6c0905b2fc6a /library/cpp
parentd30ab64d66bbeced779befb5033f1d81462dde2a (diff)
parent82a20ed13f9ce647673791b772685ee5997f256d (diff)
downloadydb-f6416f5c4da60c922a0e1f07ec342fda00837d54.tar.gz
Merge branch 'rightlib' into merge-libs-250402-0050
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/logger/sync_page_cache_file.cpp84
-rw-r--r--library/cpp/logger/sync_page_cache_file.h9
-rw-r--r--library/cpp/tld/tlds-alpha-by-domain.txt2
-rw-r--r--library/cpp/yt/error/error.cpp1
-rw-r--r--library/cpp/yt/error/unittests/error_ut.cpp16
5 files changed, 95 insertions, 17 deletions
diff --git a/library/cpp/logger/sync_page_cache_file.cpp b/library/cpp/logger/sync_page_cache_file.cpp
index 1424159af7..7f262c7ee7 100644
--- a/library/cpp/logger/sync_page_cache_file.cpp
+++ b/library/cpp/logger/sync_page_cache_file.cpp
@@ -1,26 +1,44 @@
#include "sync_page_cache_file.h"
+
#include "record.h"
#include <util/generic/buffer.h>
#include <util/generic/yexception.h>
+#include <util/system/align.h>
+#include <util/system/event.h>
#include <util/system/file.h>
#include <util/system/info.h>
#include <util/system/mutex.h>
-#include <util/system/align.h>
+#include <util/system/thread.h>
class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable {
public:
- TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize)
+ TImpl(
+ const TString& path,
+ size_t maxBufferSize,
+ size_t maxPendingCacheSize,
+ TMaybe<TDuration> bufferFlushPeriod
+ )
: File_{OpenFile(path)}
, MaxBufferSize_{maxBufferSize}
, MaxPendingCacheSize_{maxPendingCacheSize}
, Buffer_{maxBufferSize}
+ , BufferFlushPeriod_{bufferFlushPeriod}
{
ResetPtrs();
+
+ if (BufferFlushPeriod_) {
+ BufferFlushThreadPtr_ = MakeHolder<TThread>([this] {RunBufferFlushThread();});
+ BufferFlushThreadPtr_->Start();
+ }
}
~TImpl() noexcept {
try {
+ if (BufferFlushThreadPtr_) {
+ BufferFlushThreadExitWaiter_.Signal();
+ }
+
Write();
FlushSync(GuaranteedWrittenPtr_, WrittenPtr_);
} catch (...) {
@@ -32,17 +50,7 @@ public:
Buffer_.Append(rec.Data, rec.Len);
if (Buffer_.size() >= MaxBufferSize_) {
- const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_;
- Write();
-
- if (prevAlignedEndPtr < PageAlignedWrittenPtr_) {
- FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_);
- }
-
- const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_;
- if (minPendingCacheOffset > GuaranteedWrittenPtr_) {
- FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset);
- }
+ WriteAndFlush();
}
}
@@ -101,6 +109,36 @@ private:
GuaranteedWrittenPtr_ = to;
}
+ void WriteAndFlush() {
+ const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_;
+ Write();
+
+ if (prevAlignedEndPtr < PageAlignedWrittenPtr_) {
+ FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_);
+ }
+
+ const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_;
+ if (minPendingCacheOffset > GuaranteedWrittenPtr_) {
+ FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset);
+ }
+ }
+
+ void RunBufferFlushThread() {
+ Y_ENSURE(BufferFlushPeriod_);
+ TInstant deadline;
+ do {
+ deadline = TInstant::Now() + *BufferFlushPeriod_;
+ try {
+ TGuard guard{Lock_};
+ if (!Buffer_.Empty()) {
+ WriteAndFlush();
+ }
+ } catch (...) {
+ Cerr << "Failed to flush eventlog buffer: " << CurrentExceptionMessage() << Endl;
+ }
+ } while (!BufferFlushThreadExitWaiter_.WaitD(deadline));
+ }
+
private:
TMutex Lock_;
TFile File_;
@@ -112,10 +150,26 @@ private:
i64 WrittenPtr_ = 0;
i64 PageAlignedWrittenPtr_ = 0;
i64 GuaranteedWrittenPtr_ = 0;
+
+ const TMaybe<TDuration> BufferFlushPeriod_;
+ TManualEvent BufferFlushThreadExitWaiter_;
+
+ // thread should be declared last to be destroyed before other props
+ THolder<TThread> BufferFlushThreadPtr_;
};
-TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize)
- : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize))
+TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(
+ const TString& path,
+ size_t maxBufferSize,
+ size_t maxPengingCacheSize,
+ TMaybe<TDuration> bufferFlushPeriod
+)
+ : Impl_(MakeHolder<TImpl>(
+ path,
+ maxBufferSize,
+ maxPengingCacheSize,
+ bufferFlushPeriod
+ ))
{}
TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() {
diff --git a/library/cpp/logger/sync_page_cache_file.h b/library/cpp/logger/sync_page_cache_file.h
index a36340651c..e50aade08f 100644
--- a/library/cpp/logger/sync_page_cache_file.h
+++ b/library/cpp/logger/sync_page_cache_file.h
@@ -2,12 +2,19 @@
#include "backend.h"
+#include <util/datetime/base.h>
#include <util/generic/fwd.h>
+#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
class TSyncPageCacheFileLogBackend final: public TLogBackend {
public:
- TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize);
+ TSyncPageCacheFileLogBackend(
+ const TString& path,
+ size_t maxBufferSize,
+ size_t maxPendingCacheSize,
+ TMaybe<TDuration> bufferFlushPeriod = Nothing()
+ );
~TSyncPageCacheFileLogBackend();
void WriteData(const TLogRecord& rec) override;
diff --git a/library/cpp/tld/tlds-alpha-by-domain.txt b/library/cpp/tld/tlds-alpha-by-domain.txt
index d55b476c70..a118629445 100644
--- a/library/cpp/tld/tlds-alpha-by-domain.txt
+++ b/library/cpp/tld/tlds-alpha-by-domain.txt
@@ -1,4 +1,4 @@
-# Version 2025033100, Last Updated Mon Mar 31 07:07:02 2025 UTC
+# Version 2025033101, Last Updated Tue Apr 1 07:07:01 2025 UTC
AAA
AARP
ABB
diff --git a/library/cpp/yt/error/error.cpp b/library/cpp/yt/error/error.cpp
index f097697cd6..ea71a07fee 100644
--- a/library/cpp/yt/error/error.cpp
+++ b/library/cpp/yt/error/error.cpp
@@ -275,6 +275,7 @@ TError::TErrorOr(const std::exception& ex)
*this = errorEx->Error();
} else {
*this = TError(NYT::EErrorCode::Generic, TRuntimeFormat{ex.what()});
+ *this <<= TErrorAttribute("exception_type", TypeName(ex));
}
YT_VERIFY(!IsOK());
Enrich();
diff --git a/library/cpp/yt/error/unittests/error_ut.cpp b/library/cpp/yt/error/unittests/error_ut.cpp
index 198aa1ecd8..a5576fad58 100644
--- a/library/cpp/yt/error/unittests/error_ut.cpp
+++ b/library/cpp/yt/error/unittests/error_ut.cpp
@@ -397,6 +397,22 @@ TEST(TErrorTest, FormatCtor)
EXPECT_EQ("Some error hello", TError("Some error %v", "hello").GetMessage());
}
+TEST(TErrorTest, ExceptionCtor)
+{
+ {
+ auto error = TError(std::runtime_error("Some error"));
+ EXPECT_EQ(error.GetMessage(), "Some error");
+ EXPECT_EQ(error.Attributes().Get<std::string>("exception_type"), "std::runtime_error");
+ }
+ EXPECT_EQ(TError(std::runtime_error("Some bad char sequences: %v %Qv {}")).GetMessage(),
+ "Some bad char sequences: %v %Qv {}");
+
+ EXPECT_EQ(TError(TSimpleException("Some error")).GetMessage(),
+ "Some error");
+ EXPECT_EQ(TError(TSimpleException("Some bad char sequences: %v %d {}")).GetMessage(),
+ "Some bad char sequences: %v %d {}");
+}
+
TEST(TErrorTest, FindRecursive)
{
auto inner = TError("Inner")