diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-04-02 00:51:46 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-04-02 00:51:46 +0000 |
commit | f6416f5c4da60c922a0e1f07ec342fda00837d54 (patch) | |
tree | fd9b88400f5fc74b30a0ffacba8d6c0905b2fc6a /library/cpp | |
parent | d30ab64d66bbeced779befb5033f1d81462dde2a (diff) | |
parent | 82a20ed13f9ce647673791b772685ee5997f256d (diff) | |
download | ydb-f6416f5c4da60c922a0e1f07ec342fda00837d54.tar.gz |
Merge branch 'rightlib' into merge-libs-250402-0050
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/logger/sync_page_cache_file.cpp | 84 | ||||
-rw-r--r-- | library/cpp/logger/sync_page_cache_file.h | 9 | ||||
-rw-r--r-- | library/cpp/tld/tlds-alpha-by-domain.txt | 2 | ||||
-rw-r--r-- | library/cpp/yt/error/error.cpp | 1 | ||||
-rw-r--r-- | library/cpp/yt/error/unittests/error_ut.cpp | 16 |
5 files changed, 95 insertions, 17 deletions
diff --git a/library/cpp/logger/sync_page_cache_file.cpp b/library/cpp/logger/sync_page_cache_file.cpp index 1424159af7..7f262c7ee7 100644 --- a/library/cpp/logger/sync_page_cache_file.cpp +++ b/library/cpp/logger/sync_page_cache_file.cpp @@ -1,26 +1,44 @@ #include "sync_page_cache_file.h" + #include "record.h" #include <util/generic/buffer.h> #include <util/generic/yexception.h> +#include <util/system/align.h> +#include <util/system/event.h> #include <util/system/file.h> #include <util/system/info.h> #include <util/system/mutex.h> -#include <util/system/align.h> +#include <util/system/thread.h> class TSyncPageCacheFileLogBackend::TImpl: public TNonCopyable { public: - TImpl(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize) + TImpl( + const TString& path, + size_t maxBufferSize, + size_t maxPendingCacheSize, + TMaybe<TDuration> bufferFlushPeriod + ) : File_{OpenFile(path)} , MaxBufferSize_{maxBufferSize} , MaxPendingCacheSize_{maxPendingCacheSize} , Buffer_{maxBufferSize} + , BufferFlushPeriod_{bufferFlushPeriod} { ResetPtrs(); + + if (BufferFlushPeriod_) { + BufferFlushThreadPtr_ = MakeHolder<TThread>([this] {RunBufferFlushThread();}); + BufferFlushThreadPtr_->Start(); + } } ~TImpl() noexcept { try { + if (BufferFlushThreadPtr_) { + BufferFlushThreadExitWaiter_.Signal(); + } + Write(); FlushSync(GuaranteedWrittenPtr_, WrittenPtr_); } catch (...) { @@ -32,17 +50,7 @@ public: Buffer_.Append(rec.Data, rec.Len); if (Buffer_.size() >= MaxBufferSize_) { - const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; - Write(); - - if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { - FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); - } - - const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; - if (minPendingCacheOffset > GuaranteedWrittenPtr_) { - FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); - } + WriteAndFlush(); } } @@ -101,6 +109,36 @@ private: GuaranteedWrittenPtr_ = to; } + void WriteAndFlush() { + const i64 prevAlignedEndPtr = PageAlignedWrittenPtr_; + Write(); + + if (prevAlignedEndPtr < PageAlignedWrittenPtr_) { + FlushAsync(prevAlignedEndPtr, PageAlignedWrittenPtr_); + } + + const i64 minPendingCacheOffset = PageAlignedWrittenPtr_ - MaxPendingCacheSize_; + if (minPendingCacheOffset > GuaranteedWrittenPtr_) { + FlushSync(GuaranteedWrittenPtr_, minPendingCacheOffset); + } + } + + void RunBufferFlushThread() { + Y_ENSURE(BufferFlushPeriod_); + TInstant deadline; + do { + deadline = TInstant::Now() + *BufferFlushPeriod_; + try { + TGuard guard{Lock_}; + if (!Buffer_.Empty()) { + WriteAndFlush(); + } + } catch (...) { + Cerr << "Failed to flush eventlog buffer: " << CurrentExceptionMessage() << Endl; + } + } while (!BufferFlushThreadExitWaiter_.WaitD(deadline)); + } + private: TMutex Lock_; TFile File_; @@ -112,10 +150,26 @@ private: i64 WrittenPtr_ = 0; i64 PageAlignedWrittenPtr_ = 0; i64 GuaranteedWrittenPtr_ = 0; + + const TMaybe<TDuration> BufferFlushPeriod_; + TManualEvent BufferFlushThreadExitWaiter_; + + // thread should be declared last to be destroyed before other props + THolder<TThread> BufferFlushThreadPtr_; }; -TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPengingCacheSize) - : Impl_(MakeHolder<TImpl>(path, maxBufferSize, maxPengingCacheSize)) +TSyncPageCacheFileLogBackend::TSyncPageCacheFileLogBackend( + const TString& path, + size_t maxBufferSize, + size_t maxPengingCacheSize, + TMaybe<TDuration> bufferFlushPeriod +) + : Impl_(MakeHolder<TImpl>( + path, + maxBufferSize, + maxPengingCacheSize, + bufferFlushPeriod + )) {} TSyncPageCacheFileLogBackend::~TSyncPageCacheFileLogBackend() { diff --git a/library/cpp/logger/sync_page_cache_file.h b/library/cpp/logger/sync_page_cache_file.h index a36340651c..e50aade08f 100644 --- a/library/cpp/logger/sync_page_cache_file.h +++ b/library/cpp/logger/sync_page_cache_file.h @@ -2,12 +2,19 @@ #include "backend.h" +#include <util/datetime/base.h> #include <util/generic/fwd.h> +#include <util/generic/maybe.h> #include <util/generic/ptr.h> class TSyncPageCacheFileLogBackend final: public TLogBackend { public: - TSyncPageCacheFileLogBackend(const TString& path, size_t maxBufferSize, size_t maxPendingCacheSize); + TSyncPageCacheFileLogBackend( + const TString& path, + size_t maxBufferSize, + size_t maxPendingCacheSize, + TMaybe<TDuration> bufferFlushPeriod = Nothing() + ); ~TSyncPageCacheFileLogBackend(); void WriteData(const TLogRecord& rec) override; diff --git a/library/cpp/tld/tlds-alpha-by-domain.txt b/library/cpp/tld/tlds-alpha-by-domain.txt index d55b476c70..a118629445 100644 --- a/library/cpp/tld/tlds-alpha-by-domain.txt +++ b/library/cpp/tld/tlds-alpha-by-domain.txt @@ -1,4 +1,4 @@ -# Version 2025033100, Last Updated Mon Mar 31 07:07:02 2025 UTC +# Version 2025033101, Last Updated Tue Apr 1 07:07:01 2025 UTC AAA AARP ABB diff --git a/library/cpp/yt/error/error.cpp b/library/cpp/yt/error/error.cpp index f097697cd6..ea71a07fee 100644 --- a/library/cpp/yt/error/error.cpp +++ b/library/cpp/yt/error/error.cpp @@ -275,6 +275,7 @@ TError::TErrorOr(const std::exception& ex) *this = errorEx->Error(); } else { *this = TError(NYT::EErrorCode::Generic, TRuntimeFormat{ex.what()}); + *this <<= TErrorAttribute("exception_type", TypeName(ex)); } YT_VERIFY(!IsOK()); Enrich(); diff --git a/library/cpp/yt/error/unittests/error_ut.cpp b/library/cpp/yt/error/unittests/error_ut.cpp index 198aa1ecd8..a5576fad58 100644 --- a/library/cpp/yt/error/unittests/error_ut.cpp +++ b/library/cpp/yt/error/unittests/error_ut.cpp @@ -397,6 +397,22 @@ TEST(TErrorTest, FormatCtor) EXPECT_EQ("Some error hello", TError("Some error %v", "hello").GetMessage()); } +TEST(TErrorTest, ExceptionCtor) +{ + { + auto error = TError(std::runtime_error("Some error")); + EXPECT_EQ(error.GetMessage(), "Some error"); + EXPECT_EQ(error.Attributes().Get<std::string>("exception_type"), "std::runtime_error"); + } + EXPECT_EQ(TError(std::runtime_error("Some bad char sequences: %v %Qv {}")).GetMessage(), + "Some bad char sequences: %v %Qv {}"); + + EXPECT_EQ(TError(TSimpleException("Some error")).GetMessage(), + "Some error"); + EXPECT_EQ(TError(TSimpleException("Some bad char sequences: %v %d {}")).GetMessage(), + "Some bad char sequences: %v %d {}"); +} + TEST(TErrorTest, FindRecursive) { auto inner = TError("Inner") |