diff options
author | andrew-rykov <arykov@ydb.tech> | 2022-09-15 16:44:36 +0300 |
---|---|---|
committer | andrew-rykov <arykov@ydb.tech> | 2022-09-15 16:44:36 +0300 |
commit | 67ddc7744f2c537a293745383c33e324fbf08928 (patch) | |
tree | 596c2c382226b2cdaf9913e32a4fe41e7f0f1dda /library/cpp | |
parent | 343f348b324943ca4364be11cad7c9e69fec348c (diff) | |
download | ydb-67ddc7744f2c537a293745383c33e324fbf08928.tar.gz |
add log buffer 3
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/log.cpp | 99 | ||||
-rw-r--r-- | library/cpp/actors/core/log.h | 25 | ||||
-rw-r--r-- | library/cpp/actors/core/log_buffer.cpp | 140 | ||||
-rw-r--r-- | library/cpp/actors/core/log_buffer.h | 47 | ||||
-rw-r--r-- | library/cpp/actors/core/log_iface.h | 9 | ||||
-rw-r--r-- | library/cpp/actors/core/log_ut.cpp | 22 |
6 files changed, 164 insertions, 178 deletions
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 24fdcf1d92a..f3b3c5d19ae 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -37,7 +37,7 @@ namespace NActors { , Settings(settings) , LogBackend(logBackend.Release()) , Metrics(std::make_unique<TLoggerCounters>(counters)) - , LogBuffer(Metrics.get()) + , LogBuffer(*Metrics, *Settings) { } @@ -48,7 +48,7 @@ namespace NActors { , Settings(settings) , LogBackend(logBackend) , Metrics(std::make_unique<TLoggerCounters>(counters)) - , LogBuffer(Metrics.get()) + , LogBuffer(*Metrics, *Settings) { } @@ -59,7 +59,7 @@ namespace NActors { , Settings(settings) , LogBackend(logBackend.Release()) , Metrics(std::make_unique<TLoggerMetrics>(metrics)) - , LogBuffer(Metrics.get()) + , LogBuffer(*Metrics, *Settings) { } @@ -70,7 +70,7 @@ namespace NActors { , Settings(settings) , LogBackend(logBackend) , Metrics(std::make_unique<TLoggerMetrics>(metrics)) - , LogBuffer(Metrics.get()) + , LogBuffer(*Metrics, *Settings) { } @@ -97,41 +97,35 @@ namespace NActors { Y_UNUSED(settings); } - void TLoggerActor::LogIgnoredCount(TInstant now) { - TString message = Sprintf("Ignored IgnoredCount# %" PRIu64 " log records due to logger overflow!", IgnoredCount); - if (!OutputRecord(now, NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) { - BecomeDefunct(); + void TLoggerActor::FlushLogBufferMessage() { + if (!LogBuffer.IsEmpty()) { + NLog::TEvLog *log = LogBuffer.Pop(); + if (!OutputRecord(log)) { + BecomeDefunct(); + } + delete log; } } - void TLoggerActor::HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx) { + void TLoggerActor::FlushLogBufferMessageEvent(TFlushLogBuffer::TPtr& ev, const NActors::TActorContext& ctx) { Y_UNUSED(ev); - LogIgnoredCount(ctx.Now()); - IgnoredCount = 0; - PassedCount = 0; - } - - void TLoggerActor::ReleaseLogBufferMessage() { - if (LogBuffer.GetLogsNumber() > 0) { - auto message = LogBuffer.GetMessage(); - if (!OutputRecord(message.Time, message.Priority, message.Component, message.Formatted)) { + FlushLogBufferMessage(); + + ui64 ignoredCount = LogBuffer.GetIgnoredCount(); + if (ignoredCount > 0) { + NLog::EPrio prio = LogBuffer.GetIgnoredHighestPrio(); + TString message = Sprintf("Logger overflow! Ignored %" PRIu64 " log records with priority [%s] or lower!", ignoredCount, PriorityToString(prio)); + if (!OutputRecord(ctx.Now(), NActors::NLog::EPrio::Error, Settings->LoggerComponent, message)) { BecomeDefunct(); } + LogBuffer.ClearIgnoredCount(); } - } - void TLoggerActor::ReleaseLogBufferMessageEvent(TReleaseLogBuffer::TPtr& ev, const NActors::TActorContext& ctx) { - Y_UNUSED(ev); - ReleaseLogBufferMessage(); - if (LogBuffer.GetLogsNumber() > 0) { - ctx.Send(ctx.SelfID, new TReleaseLogBuffer()); + if (!LogBuffer.IsEmpty()) { + ctx.Send(ctx.SelfID, ev->Release().Release()); } } - void TLoggerActor::HandleIgnoredEventDrop() { - // logger backend is unavailable, just ignore - } - void TLoggerActor::WriteMessageStat(const NLog::TEvLog& ev) { Metrics->IncActorMsgs(); @@ -150,50 +144,27 @@ namespace NActors { } - bool TLoggerActor::TryAddLogBuffer(TLogBufferMessage& lbm) { - ui64 messageSize = sizeof(lbm); - if (LogBuffer.GetSizeBytes() + messageSize < Settings->BufferSizeLimitBytes) - return LogBuffer.TryAddMessage(lbm); - - return false; - } - - bool TLoggerActor::TryKeepLog(TLogBufferMessage& lbm, const NActors::TActorContext& ctx) { - bool wasEmtpyBuffer = LogBuffer.IsEmpty(); - - if (TryAddLogBuffer(lbm)) { - } else if (!LogBuffer.TryReduceLogsNumber()) { - return false; - } else if (!TryAddLogBuffer(lbm)) { - return false; - } - - if (wasEmtpyBuffer) { - ctx.Send(ctx.SelfID, new TReleaseLogBuffer()); - } - return true; - } - void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) { i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds(); WriteMessageStat(*ev->Get()); if (Settings->AllowDrop) { - if (PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs || IgnoredCount > 0 || !LogBuffer.IsEmpty()) { - TLogBufferMessage lbm(ev); - if (!TryKeepLog(lbm, ctx)) { - Metrics->IncIgnoredMsgs(); - if (IgnoredCount == 0) { - ctx.Send(ctx.SelfID, new TLogIgnored()); - } - ++IgnoredCount; - PassedCount = 0; + if (PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs || !LogBuffer.IsEmpty() || LogBuffer.CheckLogIgnoring()) { + if (LogBuffer.IsEmpty() && !LogBuffer.CheckLogIgnoring()) { + ctx.Send(ctx.SelfID, new TFlushLogBuffer()); + } + LogBuffer.AddLog(ev->Release().Release()); + PassedCount = 0; + + if (delayMillisec < (i64)Settings->TimeThresholdMs && !LogBuffer.CheckLogIgnoring()) { + FlushLogBufferMessage(); } return; } + PassedCount++; } - if (!OutputRecord(ev->Get()->Stamp, ev->Get()->Level.ToPrio(), ev->Get()->Component, ev->Get()->Line)) { + if (!OutputRecord(ev->Get())) { BecomeDefunct(); } } @@ -475,6 +446,10 @@ namespace NActors { constexpr size_t TimeBufSize = 512; + bool TLoggerActor::OutputRecord(NLog::TEvLog *evLog) noexcept { + return OutputRecord(evLog->Stamp, evLog->Level.ToPrio(), evLog->Component, evLog->Line); + } + bool TLoggerActor::OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept try { const auto logPrio = ::ELogPriority(ui16(priority)); diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index 947f8a35b5a..60469f5193a 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -166,15 +166,9 @@ namespace NActors { TString Explanation; }; - class TLogIgnored: public TEventLocal<TLogIgnored, int(NLog::EEv::Ignored)> { + class TFlushLogBuffer: public TEventLocal<TFlushLogBuffer, int(NLog::EEv::Buffer)> { public: - TLogIgnored() { - } - }; - - class TReleaseLogBuffer: public TEventLocal<TReleaseLogBuffer, int(NLog::EEv::Buffer)> { - public: - TReleaseLogBuffer() { + TFlushLogBuffer() { } }; @@ -203,8 +197,7 @@ namespace NActors { void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { - HFunc(TLogIgnored, HandleIgnoredEvent); - HFunc(TReleaseLogBuffer, ReleaseLogBufferMessageEvent); + HFunc(TFlushLogBuffer, FlushLogBufferMessageEvent); HFunc(NLog::TEvLog, HandleLogEvent); HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest); HFunc(NMon::TEvHttpInfo, HandleMonInfo); @@ -213,7 +206,6 @@ namespace NActors { STFUNC(StateDefunct) { switch (ev->GetTypeRewrite()) { - cFunc(TLogIgnored::EventType, HandleIgnoredEventDrop); hFunc(NLog::TEvLog, HandleLogEventDrop); HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest); HFunc(NMon::TEvHttpInfo, HandleMonInfo); @@ -229,27 +221,22 @@ namespace NActors { private: TIntrusivePtr<NLog::TSettings> Settings; std::shared_ptr<TLogBackend> LogBackend; - ui64 IgnoredCount = 0; ui64 PassedCount = 0; TDuration WakeupInterval{TDuration::Seconds(5)}; std::unique_ptr<ILoggerMetrics> Metrics; TLogBuffer LogBuffer; void BecomeDefunct(); - void ReleaseLogBufferMessageEvent(TReleaseLogBuffer::TPtr& ev, const NActors::TActorContext& ctx); - void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx); - void HandleIgnoredEventDrop(); - bool TryAddLogBuffer(TLogBufferMessage& ev); - bool TryKeepLog(TLogBufferMessage& ev, const TActorContext& ctx); + void FlushLogBufferMessageEvent(TFlushLogBuffer::TPtr& ev, const NActors::TActorContext& ctx); void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx); void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev); void HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const TActorContext& ctx); void HandleMonInfo(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx); void HandleWakeup(); + [[nodiscard]] bool OutputRecord(NLog::TEvLog *evLog) noexcept; [[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept; void RenderComponentPriorities(IOutputStream& str); - void LogIgnoredCount(TInstant now); - void ReleaseLogBufferMessage(); + void FlushLogBufferMessage(); void WriteMessageStat(const NLog::TEvLog& ev); static const char* FormatLocalTimestamp(TInstant time, char* buf); }; diff --git a/library/cpp/actors/core/log_buffer.cpp b/library/cpp/actors/core/log_buffer.cpp index 4a05f9a8c25..8c80f1d054b 100644 --- a/library/cpp/actors/core/log_buffer.cpp +++ b/library/cpp/actors/core/log_buffer.cpp @@ -6,93 +6,103 @@ using namespace NActors::NLog; namespace NActors { -TLogBufferMessage::TLogBufferMessage(NLog::TEvLog::TPtr& ev) - : Formatted(ev->Get()->Line) - , Time(ev->Get()->Stamp) - , Component(ev->Get()->Component) - , Priority(ev->Get()->Level.ToPrio()) -{} - -TLogBuffer::TLogBuffer(ILoggerMetrics *metrics) +TLogBuffer::TLogBuffer(ILoggerMetrics &metrics, const NLog::TSettings &settings) : Metrics(metrics) + , Settings(settings) {} -bool TLogBuffer::TryAddMessage(TLogBufferMessage message) { - Buffer.push_back(std::move(message)); - BufferSize += sizeof(message); - PrioStats[message.Priority]++; +size_t TLogBuffer::GetLogCostInBytes(NLog::TEvLog *log) const { + return LOG_STRUCTURE_BYTES + log->Line.length(); +} - return true; +ui16 TLogBuffer::GetPrioIndex(NLog::EPrio prio) { + return Min(ui16(prio), ui16(LOG_PRIORITIES_NUMBER - 1)); } -TLogBufferMessage TLogBuffer::GetMessage() { - auto message = Buffer.front(); +TIntrusiveList<NLog::TEvLog, NLog::TEvLogBufferLevelListTag> &TLogBuffer::GetPrioLogs(NLog::EPrio prio) { + return PrioLogsList[GetPrioIndex(prio)]; +} - ui64 messageSize = sizeof(message); - BufferSize -= messageSize; - Buffer.pop_front(); - PrioStats[message.Priority]--; +void TLogBuffer::AddLog(NLog::TEvLog *log) { + NLog::EPrio prio = log->Level.ToPrio(); + if (!CheckSize(log) && prio > NLog::EPrio::Emerg) { // always keep logs with prio Emerg = 0 + HandleIgnoredLog(log); + return; + } - return message; + SizeBytes += GetLogCostInBytes(log); + Logs.PushBack(log); + GetPrioLogs(prio).PushBack(log); } -bool TLogBuffer::IsEmpty() const { - return Buffer.empty(); +NLog::TEvLog* TLogBuffer::Pop() { + NLog::TEvLog* log = Logs.PopFront(); + static_cast<TIntrusiveListItem<TEvLog, TEvLogBufferLevelListTag>&>(*log).Unlink(); + + SizeBytes -= GetLogCostInBytes(log); + + return log; } -size_t TLogBuffer::GetLogsNumber() const { - return Buffer.size(); +bool TLogBuffer::IsEmpty() const { + return Logs.Empty(); } -ui64 TLogBuffer::GetSizeBytes() const { - return BufferSize; +bool TLogBuffer::CheckLogIgnoring() const { + return IgnoredCount > 0; } -void TLogBuffer::FilterByLogPriority(NLog::EPrio prio) { - bool isFirstRemoving = true; - auto it = Buffer.begin(); - while (it != Buffer.end()) - { - if (it->Priority >= prio) { - ui64 messageSize = sizeof(*it); - BufferSize -= messageSize; - Metrics->IncIgnoredMsgs(); - PrioStats[it->Priority]--; - - if (isFirstRemoving && prio > NLog::EPrio::Error) { - it->Priority = NLog::EPrio::Error; - it->Formatted = Sprintf("Ignored log records due to log buffer overflow! IgnoredCount# %" PRIu32 " ", PrioStats[prio]); - - PrioStats[NLog::EPrio::Error]++; - BufferSize += sizeof(*it); - it++; - isFirstRemoving = false; - } - else { - it = Buffer.erase(it); +bool TLogBuffer::CheckSize(NLog::TEvLog *log) { + size_t startSizeBytes = SizeBytes; + + size_t logSize = GetLogCostInBytes(log); + if (SizeBytes + logSize <= Settings.BufferSizeLimitBytes) { + return true; + } + + ui16 scanHighestPrio = Max((ui16)1, GetPrioIndex(log->Level.ToPrio())); // always keep logs with prio Emerg = 0 + for (ui16 scanPrio = LOG_PRIORITIES_NUMBER - 1; scanPrio >= scanHighestPrio; scanPrio--) { + TIntrusiveList<NLog::TEvLog, NLog::TEvLogBufferLevelListTag> &scanLogs = PrioLogsList[scanPrio]; + while (!scanLogs.Empty()) { + NLog::TEvLog* log = scanLogs.PopFront(); + SizeBytes -= GetLogCostInBytes(log); + HandleIgnoredLog(log); + + if (SizeBytes + logSize <= Settings.BufferSizeLimitBytes) { + return true; } } - else { - it++; - } } -} -bool inline TLogBuffer::CheckMessagesNumberEnoughForClearing(ui32 number) { - return number * 10 > Buffer.size(); + if (startSizeBytes > SizeBytes) { + return true; + } + + return false; } -bool TLogBuffer::TryReduceLogsNumber() { - ui32 removeLogsNumber = 0; - for (ui16 p = ui16(NLog::EPrio::Trace); p > ui16(NLog::EPrio::Alert); p--) - { - NLog::EPrio prio = static_cast<NLog::EPrio>(p); - removeLogsNumber += PrioStats[prio]; - if (CheckMessagesNumberEnoughForClearing(removeLogsNumber)) { - FilterByLogPriority(prio); - return true; - } +void TLogBuffer::HandleIgnoredLog(NLog::TEvLog *log) { + ui16 logPrio = GetPrioIndex(log->Level.ToPrio()); + Metrics.IncIgnoredMsgs(); + if (IgnoredHighestPrio > logPrio) { + IgnoredHighestPrio = logPrio; } - return false; + IgnoredCount++; + delete log; } + +ui64 TLogBuffer::GetIgnoredCount() { + return IgnoredCount; +} + +NLog::EPrio TLogBuffer::GetIgnoredHighestPrio() { + NLog::EPrio prio = static_cast<NLog::EPrio>(IgnoredHighestPrio); + return prio; +} + +void TLogBuffer::ClearIgnoredCount() { + IgnoredHighestPrio = LOG_PRIORITIES_NUMBER - 1; + IgnoredCount = 0; +} + } diff --git a/library/cpp/actors/core/log_buffer.h b/library/cpp/actors/core/log_buffer.h index f40cbae65a8..60bc09cc855 100644 --- a/library/cpp/actors/core/log_buffer.h +++ b/library/cpp/actors/core/log_buffer.h @@ -2,36 +2,39 @@ #include "log_metrics.h" #include "log_iface.h" +#include "log_settings.h" -#include <deque> +#include <util/generic/intrlist.h> namespace NActors { +class TLogBuffer { + static const size_t LOG_STRUCTURE_BYTES = sizeof(NLog::TEvLog); + static const ui16 LOG_PRIORITIES_NUMBER = 9; + + ILoggerMetrics &Metrics; + const NLog::TSettings &Settings; -struct TLogBufferMessage { - TString Formatted; - TInstant Time; - NLog::EComponent Component; - NLog::EPrio Priority; - - TLogBufferMessage(NLog::TEvLog::TPtr& ev); -}; + TIntrusiveListWithAutoDelete<NLog::TEvLog, TDelete, NLog::TEvLogBufferMainListTag> Logs; + TIntrusiveList<NLog::TEvLog, NLog::TEvLogBufferLevelListTag> PrioLogsList[LOG_PRIORITIES_NUMBER]; -class TLogBuffer { - ILoggerMetrics *Metrics; - std::deque<TLogBufferMessage> Buffer; - ui64 BufferSize = 0; - std::map<NLog::EPrio, ui32> PrioStats; + ui64 SizeBytes = 0; + ui64 IgnoredCount = 0; + ui16 IgnoredHighestPrio = LOG_PRIORITIES_NUMBER - 1; - void FilterByLogPriority(NLog::EPrio prio); - bool inline CheckMessagesNumberEnoughForClearing(ui32 number); + size_t GetLogCostInBytes(NLog::TEvLog *log) const; + void HandleIgnoredLog(NLog::TEvLog *log); + bool CheckSize(NLog::TEvLog *log); + static inline ui16 GetPrioIndex(NLog::EPrio); + inline TIntrusiveList<NLog::TEvLog, NLog::TEvLogBufferLevelListTag> &GetPrioLogs(NLog::EPrio); public: - TLogBuffer(ILoggerMetrics *metrics); - bool TryAddMessage(TLogBufferMessage message); - TLogBufferMessage GetMessage(); + TLogBuffer(ILoggerMetrics &metrics, const NLog::TSettings &Settings); + void AddLog(NLog::TEvLog *log); + NLog::TEvLog *Pop(); bool IsEmpty() const; - size_t GetLogsNumber() const; - ui64 GetSizeBytes() const; - bool TryReduceLogsNumber(); + bool CheckLogIgnoring() const; + ui64 GetIgnoredCount(); + NLog::EPrio GetIgnoredHighestPrio(); + void ClearIgnoredCount(); }; } diff --git a/library/cpp/actors/core/log_iface.h b/library/cpp/actors/core/log_iface.h index 3296863538a..d929794f4a4 100644 --- a/library/cpp/actors/core/log_iface.h +++ b/library/cpp/actors/core/log_iface.h @@ -74,7 +74,14 @@ namespace NActors { static_assert(int(EEv::End) < EventSpaceEnd(TEvents::ES_LOGGER), ""); - class TEvLog: public TEventLocal<TEvLog, int(EEv::Log)> { + struct TEvLogBufferMainListTag {}; + struct TEvLogBufferLevelListTag {}; + + class TEvLog + : public TEventLocal<TEvLog, int(EEv::Log)> + , public TIntrusiveListItem<TEvLog, TEvLogBufferMainListTag> + , public TIntrusiveListItem<TEvLog, TEvLogBufferLevelListTag> + { public: TEvLog(TInstant stamp, TLevel level, EComponent comp, const TString &line) : Stamp(stamp) diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp index e6508738cca..995e3c4121c 100644 --- a/library/cpp/actors/core/log_ut.cpp +++ b/library/cpp/actors/core/log_ut.cpp @@ -108,12 +108,12 @@ namespace { Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")}); } - void WriteLog(TInstant ts, EPrio prio = EPrio::Emerg) { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, "foo")}); + void WriteLog(TInstant ts, EPrio prio = EPrio::Emerg, TString msg = "foo") { + Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, msg)}); } - void ReleaseLogBuffer() { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TReleaseLogBuffer()}); + void FlushLogBuffer() { + Runtime.Send(new IEventHandle{LoggerActor, {}, new TFlushLogBuffer()}); } void Wakeup() { @@ -219,11 +219,15 @@ Y_UNIT_TEST_SUITE(TLoggerActorTest) { auto now = test.Runtime.GetCurrentTime(); for (auto i = 0; i < COUNT; ++i) { - test.WriteLog(now - TDuration::Seconds(10), EPrio::Debug); + test.WriteLog(now - TDuration::Seconds(10), EPrio::Debug, std::to_string(i)); } for (auto i = 0; i < COUNT; ++i) { - test.ReleaseLogBuffer(); + test.FlushLogBuffer(); + } + + for (ui64 i = 0; i < messages.size(); ++i) { + Cerr << messages[i] << Endl; } return messages.size(); @@ -231,15 +235,15 @@ Y_UNIT_TEST_SUITE(TLoggerActorTest) { Y_UNIT_TEST(ShouldUseLogBufferWhenOverloaded) { TFixture test{BufferSettings(1024 * 1024 * 300)}; - const auto LOG_COUNT = 500; + const auto LOG_COUNT = 100; auto outputLogSize = BufferTest(test, LOG_COUNT); UNIT_ASSERT_VALUES_EQUAL(outputLogSize, LOG_COUNT); } - Y_UNIT_TEST(ShouldLoseLogsWithoutBuffer) { + Y_UNIT_TEST(ShouldLoseLogsIfBufferZeroSize) { TFixture test{NoBufferSettings()}; - const auto LOG_COUNT = 500; + const auto LOG_COUNT = 100; auto outputLogSize = BufferTest(test, LOG_COUNT); UNIT_ASSERT(outputLogSize < LOG_COUNT); |