diff options
author | andrew-rykov <arykov@ydb.tech> | 2022-09-13 12:49:05 +0300 |
---|---|---|
committer | andrew-rykov <arykov@ydb.tech> | 2022-09-13 12:49:05 +0300 |
commit | cee4a99ba93f21e3d30b5f1d58c84ab6ca41981b (patch) | |
tree | d75cbc3a1ad20a211c550d8f29634eff47f5e48d /library/cpp | |
parent | 8c1af103661148f5377e712cb2ac5623672522a1 (diff) | |
download | ydb-cee4a99ba93f21e3d30b5f1d58c84ab6ca41981b.tar.gz |
add-log-buffer 2
add log buffer
changed names
added move
changed details
fixed released buffer procedure
returned condition IgnoredCount > 0
default bufferSizeLimitBytes = 0
returned passedCount declaration
returned Y_VERIFY
removed passedcount
changed buffer reducing
no new line at the end of file
added srcs in ya.make
add log buffer
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/CMakeLists.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/log.cpp | 201 | ||||
-rw-r--r-- | library/cpp/actors/core/log.h | 33 | ||||
-rw-r--r-- | library/cpp/actors/core/log_buffer.cpp | 98 | ||||
-rw-r--r-- | library/cpp/actors/core/log_buffer.h | 37 | ||||
-rw-r--r-- | library/cpp/actors/core/log_iface.h | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/log_metrics.h | 152 | ||||
-rw-r--r-- | library/cpp/actors/core/log_settings.cpp | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/log_settings.h | 5 | ||||
-rw-r--r-- | library/cpp/actors/core/log_ut.cpp | 68 |
10 files changed, 432 insertions, 170 deletions
diff --git a/library/cpp/actors/core/CMakeLists.txt b/library/cpp/actors/core/CMakeLists.txt index 3dc01c29c15..385c419b3a5 100644 --- a/library/cpp/actors/core/CMakeLists.txt +++ b/library/cpp/actors/core/CMakeLists.txt @@ -46,6 +46,7 @@ target_sources(cpp-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/io_dispatcher.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log_settings.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log_buffer.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/mailbox.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index d005be65355..eb8b19193c4 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -1,7 +1,4 @@ #include "log.h" -#include "log_settings.h" - -#include <library/cpp/monlib/service/pages/templates.h> static_assert(int(NActors::NLog::PRI_EMERG) == int(::TLOG_EMERG), "expect int(NActors::NLog::PRI_EMERG) == int(::TLOG_EMERG)"); static_assert(int(NActors::NLog::PRI_ALERT) == int(::TLOG_ALERT), "expect int(NActors::NLog::PRI_ALERT) == int(::TLOG_ALERT)"); @@ -33,137 +30,6 @@ namespace { } namespace NActors { - - class TLoggerCounters : public ILoggerMetrics { - public: - TLoggerCounters(TIntrusivePtr<NMonitoring::TDynamicCounters> counters) - : DynamicCounters(counters) - { - ActorMsgs_ = DynamicCounters->GetCounter("ActorMsgs", true); - DirectMsgs_ = DynamicCounters->GetCounter("DirectMsgs", true); - LevelRequests_ = DynamicCounters->GetCounter("LevelRequests", true); - IgnoredMsgs_ = DynamicCounters->GetCounter("IgnoredMsgs", true); - DroppedMsgs_ = DynamicCounters->GetCounter("DroppedMsgs", true); - - AlertMsgs_ = DynamicCounters->GetCounter("AlertMsgs", true); - EmergMsgs_ = DynamicCounters->GetCounter("EmergMsgs", true); - } - - ~TLoggerCounters() = default; - - void IncActorMsgs() override { - ++*ActorMsgs_; - } - void IncDirectMsgs() override { - ++*DirectMsgs_; - } - void IncLevelRequests() override { - ++*LevelRequests_; - } - void IncIgnoredMsgs() override { - ++*IgnoredMsgs_; - } - void IncAlertMsgs() override { - ++*AlertMsgs_; - } - void IncEmergMsgs() override { - ++*EmergMsgs_; - } - void IncDroppedMsgs() override { - DroppedMsgs_->Inc(); - }; - - void GetOutputHtml(IOutputStream& str) override { - HTML(str) { - DIV_CLASS("row") { - DIV_CLASS("col-md-12") { - TAG(TH4) { - str << "Counters" << Endl; - } - DynamicCounters->OutputHtml(str); - } - } - } - } - - private: - NMonitoring::TDynamicCounters::TCounterPtr ActorMsgs_; - NMonitoring::TDynamicCounters::TCounterPtr DirectMsgs_; - NMonitoring::TDynamicCounters::TCounterPtr LevelRequests_; - NMonitoring::TDynamicCounters::TCounterPtr IgnoredMsgs_; - NMonitoring::TDynamicCounters::TCounterPtr AlertMsgs_; - NMonitoring::TDynamicCounters::TCounterPtr EmergMsgs_; - // Dropped while the logger backend was unavailable - NMonitoring::TDynamicCounters::TCounterPtr DroppedMsgs_; - - TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; - }; - - class TLoggerMetrics : public ILoggerMetrics { - public: - TLoggerMetrics(std::shared_ptr<NMonitoring::TMetricRegistry> metrics) - : Metrics(metrics) - { - ActorMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.actor_msgs"}}); - DirectMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.direct_msgs"}}); - LevelRequests_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.level_requests"}}); - IgnoredMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.ignored_msgs"}}); - DroppedMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.dropped_msgs"}}); - - AlertMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.alert_msgs"}}); - EmergMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.emerg_msgs"}}); - } - - ~TLoggerMetrics() = default; - - void IncActorMsgs() override { - ActorMsgs_->Inc(); - } - void IncDirectMsgs() override { - DirectMsgs_->Inc(); - } - void IncLevelRequests() override { - LevelRequests_->Inc(); - } - void IncIgnoredMsgs() override { - IgnoredMsgs_->Inc(); - } - void IncAlertMsgs() override { - AlertMsgs_->Inc(); - } - void IncEmergMsgs() override { - EmergMsgs_->Inc(); - } - void IncDroppedMsgs() override { - DroppedMsgs_->Inc(); - }; - - void GetOutputHtml(IOutputStream& str) override { - HTML(str) { - DIV_CLASS("row") { - DIV_CLASS("col-md-12") { - TAG(TH4) { - str << "Metrics" << Endl; - } - // TODO: Now, TMetricRegistry does not have the GetOutputHtml function - } - } - } - } - - private: - NMonitoring::TRate* ActorMsgs_; - NMonitoring::TRate* DirectMsgs_; - NMonitoring::TRate* LevelRequests_; - NMonitoring::TRate* IgnoredMsgs_; - NMonitoring::TRate* AlertMsgs_; - NMonitoring::TRate* EmergMsgs_; - // Dropped while the logger backend was unavailable - NMonitoring::TRate* DroppedMsgs_; - - std::shared_ptr<NMonitoring::TMetricRegistry> Metrics; - }; - TAtomic TLoggerActor::IsOverflow = 0; TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings, @@ -173,6 +39,7 @@ namespace NActors { , Settings(settings) , LogBackend(logBackend.Release()) , Metrics(std::make_unique<TLoggerCounters>(counters)) + , LogBuffer(Metrics.get()) { } @@ -183,6 +50,7 @@ namespace NActors { , Settings(settings) , LogBackend(logBackend) , Metrics(std::make_unique<TLoggerCounters>(counters)) + , LogBuffer(Metrics.get()) { } @@ -193,6 +61,7 @@ namespace NActors { , Settings(settings) , LogBackend(logBackend.Release()) , Metrics(std::make_unique<TLoggerMetrics>(metrics)) + , LogBuffer(Metrics.get()) { } @@ -203,6 +72,7 @@ namespace NActors { , Settings(settings) , LogBackend(logBackend) , Metrics(std::make_unique<TLoggerMetrics>(metrics)) + , LogBuffer(Metrics.get()) { } @@ -242,6 +112,23 @@ namespace NActors { PassedCount = 0; } + void TLoggerActor::ReleaseLogBufferMessage() { + if (LogBuffer.GetLogsNumber() > 0) { + auto message = LogBuffer.GetMessage(); + if (!OutputRecord(message.Time, message.Priority, message.Component, message.Formatted)) { + BecomeDefunct(); + } + } + } + + void TLoggerActor::ReleaseLogBufferMessageEvent(TReleaseLogBuffer::TPtr& ev, const NActors::TActorContext& ctx) { + Y_UNUSED(ev); + ReleaseLogBufferMessage(); + if (LogBuffer.GetLogsNumber() > 0) { + ctx.Send(ctx.SelfID, new TReleaseLogBuffer()); + } + } + void TLoggerActor::HandleIgnoredEventDrop() { // logger backend is unavailable, just ignore } @@ -264,6 +151,30 @@ namespace NActors { } + bool TLoggerActor::TryAddLogBuffer(TLogBufferMessage& lbm) { + ui64 messageSize = sizeof(lbm); + if (LogBuffer.GetSizeBytes() + messageSize < Settings->BufferSizeLimitBytes) + return LogBuffer.TryAddMessage(lbm); + + return false; + } + + bool TLoggerActor::TryKeepLog(TLogBufferMessage& lbm, const NActors::TActorContext& ctx) { + bool wasEmtpyBuffer = LogBuffer.IsEmpty(); + + if (TryAddLogBuffer(lbm)) { + } else if (!LogBuffer.TryReduceLogsNumber()) { + return false; + } else if (!TryAddLogBuffer(lbm)) { + return false; + } + + if (wasEmtpyBuffer) { + ctx.Send(ctx.SelfID, new TReleaseLogBuffer()); + } + return true; + } + void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) { i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds(); WriteMessageStat(*ev->Get()); @@ -271,16 +182,17 @@ namespace NActors { // Disable throttling if it was enabled previously if (AtomicGet(IsOverflow)) AtomicSet(IsOverflow, 0); - - // Check if some records have to be dropped - if ((PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs) || IgnoredCount > 0) { - Metrics->IncIgnoredMsgs(); - if (IgnoredCount == 0) { - ctx.Send(ctx.SelfID, new TLogIgnored()); + if (PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs || IgnoredCount > 0 || !LogBuffer.IsEmpty()) { + TLogBufferMessage lbm(ev); + if (!TryKeepLog(lbm, ctx)) { + Metrics->IncIgnoredMsgs(); + if (IgnoredCount == 0) { + ctx.Send(ctx.SelfID, new TLogIgnored()); + } + ++IgnoredCount; + PassedCount = 0; } - ++IgnoredCount; - PassedCount = 0; - return; + return; } PassedCount++; } else { @@ -291,8 +203,7 @@ namespace NActors { AtomicSet(IsOverflow, 0); } - const auto prio = ev->Get()->Level.ToPrio(); - if (!OutputRecord(ev->Get()->Stamp, prio, ev->Get()->Component, ev->Get()->Line)) { + if (!OutputRecord(ev->Get()->Stamp, ev->Get()->Level.ToPrio(), ev->Get()->Component, ev->Get()->Line)) { BecomeDefunct(); } } diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index c11a7cf3c19..7c6921bca7b 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -4,6 +4,8 @@ #include "log_iface.h" #include "log_settings.h" +#include "log_metrics.h" +#include "log_buffer.h" #include "actorsystem.h" #include "events.h" #include "event_local.h" @@ -14,8 +16,6 @@ #include <util/string/printf.h> #include <util/string/builder.h> #include <library/cpp/logger/all.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> -#include <library/cpp/monlib/metrics/metric_registry.h> #include <library/cpp/json/writer/json.h> #include <library/cpp/svnversion/svnversion.h> @@ -172,24 +172,15 @@ namespace NActors { } }; - //////////////////////////////////////////////////////////////////////////////// - // LOGGER ACTOR - //////////////////////////////////////////////////////////////////////////////// - class ILoggerMetrics { + class TReleaseLogBuffer: public TEventLocal<TReleaseLogBuffer, int(NLog::EEv::Buffer)> { public: - virtual ~ILoggerMetrics() = default; - - virtual void IncActorMsgs() = 0; - virtual void IncDirectMsgs() = 0; - virtual void IncLevelRequests() = 0; - virtual void IncIgnoredMsgs() = 0; - virtual void IncAlertMsgs() = 0; - virtual void IncEmergMsgs() = 0; - virtual void IncDroppedMsgs() = 0; - - virtual void GetOutputHtml(IOutputStream&) = 0; + TReleaseLogBuffer() { + } }; + //////////////////////////////////////////////////////////////////////////////// + // LOGGER ACTOR + //////////////////////////////////////////////////////////////////////////////// class TLoggerActor: public TActor<TLoggerActor> { public: static constexpr IActor::EActivityType ActorActivityType() { @@ -210,9 +201,10 @@ namespace NActors { std::shared_ptr<NMonitoring::TMetricRegistry> metrics); ~TLoggerActor(); - void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { + void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { HFunc(TLogIgnored, HandleIgnoredEvent); + HFunc(TReleaseLogBuffer, ReleaseLogBufferMessageEvent); HFunc(NLog::TEvLog, HandleLogEvent); HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest); HFunc(NMon::TEvHttpInfo, HandleMonInfo); @@ -242,10 +234,14 @@ namespace NActors { static TAtomic IsOverflow; TDuration WakeupInterval{TDuration::Seconds(5)}; std::unique_ptr<ILoggerMetrics> Metrics; + TLogBuffer LogBuffer; void BecomeDefunct(); + void ReleaseLogBufferMessageEvent(TReleaseLogBuffer::TPtr& ev, const NActors::TActorContext& ctx); void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx); void HandleIgnoredEventDrop(); + bool TryAddLogBuffer(TLogBufferMessage& ev); + bool TryKeepLog(TLogBufferMessage& ev, const TActorContext& ctx); void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx); void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev); void HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const TActorContext& ctx); @@ -254,6 +250,7 @@ namespace NActors { [[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept; void RenderComponentPriorities(IOutputStream& str); void LogIgnoredCount(TInstant now); + void ReleaseLogBufferMessage(); void WriteMessageStat(const NLog::TEvLog& ev); static const char* FormatLocalTimestamp(TInstant time, char* buf); }; diff --git a/library/cpp/actors/core/log_buffer.cpp b/library/cpp/actors/core/log_buffer.cpp new file mode 100644 index 00000000000..4a05f9a8c25 --- /dev/null +++ b/library/cpp/actors/core/log_buffer.cpp @@ -0,0 +1,98 @@ +#include "log_buffer.h" + +#include <util/system/yassert.h> +#include <algorithm> + +using namespace NActors::NLog; + +namespace NActors { +TLogBufferMessage::TLogBufferMessage(NLog::TEvLog::TPtr& ev) + : Formatted(ev->Get()->Line) + , Time(ev->Get()->Stamp) + , Component(ev->Get()->Component) + , Priority(ev->Get()->Level.ToPrio()) +{} + +TLogBuffer::TLogBuffer(ILoggerMetrics *metrics) + : Metrics(metrics) +{} + +bool TLogBuffer::TryAddMessage(TLogBufferMessage message) { + Buffer.push_back(std::move(message)); + BufferSize += sizeof(message); + PrioStats[message.Priority]++; + + return true; +} + +TLogBufferMessage TLogBuffer::GetMessage() { + auto message = Buffer.front(); + + ui64 messageSize = sizeof(message); + BufferSize -= messageSize; + Buffer.pop_front(); + PrioStats[message.Priority]--; + + return message; +} + +bool TLogBuffer::IsEmpty() const { + return Buffer.empty(); +} + +size_t TLogBuffer::GetLogsNumber() const { + return Buffer.size(); +} + +ui64 TLogBuffer::GetSizeBytes() const { + return BufferSize; +} + +void TLogBuffer::FilterByLogPriority(NLog::EPrio prio) { + bool isFirstRemoving = true; + auto it = Buffer.begin(); + while (it != Buffer.end()) + { + if (it->Priority >= prio) { + ui64 messageSize = sizeof(*it); + BufferSize -= messageSize; + Metrics->IncIgnoredMsgs(); + PrioStats[it->Priority]--; + + if (isFirstRemoving && prio > NLog::EPrio::Error) { + it->Priority = NLog::EPrio::Error; + it->Formatted = Sprintf("Ignored log records due to log buffer overflow! IgnoredCount# %" PRIu32 " ", PrioStats[prio]); + + PrioStats[NLog::EPrio::Error]++; + BufferSize += sizeof(*it); + it++; + isFirstRemoving = false; + } + else { + it = Buffer.erase(it); + } + } + else { + it++; + } + } +} + +bool inline TLogBuffer::CheckMessagesNumberEnoughForClearing(ui32 number) { + return number * 10 > Buffer.size(); +} + +bool TLogBuffer::TryReduceLogsNumber() { + ui32 removeLogsNumber = 0; + for (ui16 p = ui16(NLog::EPrio::Trace); p > ui16(NLog::EPrio::Alert); p--) + { + NLog::EPrio prio = static_cast<NLog::EPrio>(p); + removeLogsNumber += PrioStats[prio]; + if (CheckMessagesNumberEnoughForClearing(removeLogsNumber)) { + FilterByLogPriority(prio); + return true; + } + } + return false; +} +} diff --git a/library/cpp/actors/core/log_buffer.h b/library/cpp/actors/core/log_buffer.h new file mode 100644 index 00000000000..f40cbae65a8 --- /dev/null +++ b/library/cpp/actors/core/log_buffer.h @@ -0,0 +1,37 @@ +#pragma once + +#include "log_metrics.h" +#include "log_iface.h" + +#include <deque> + +namespace NActors { + +struct TLogBufferMessage { + TString Formatted; + TInstant Time; + NLog::EComponent Component; + NLog::EPrio Priority; + + TLogBufferMessage(NLog::TEvLog::TPtr& ev); +}; + +class TLogBuffer { + ILoggerMetrics *Metrics; + std::deque<TLogBufferMessage> Buffer; + ui64 BufferSize = 0; + std::map<NLog::EPrio, ui32> PrioStats; + + void FilterByLogPriority(NLog::EPrio prio); + bool inline CheckMessagesNumberEnoughForClearing(ui32 number); + + public: + TLogBuffer(ILoggerMetrics *metrics); + bool TryAddMessage(TLogBufferMessage message); + TLogBufferMessage GetMessage(); + bool IsEmpty() const; + size_t GetLogsNumber() const; + ui64 GetSizeBytes() const; + bool TryReduceLogsNumber(); +}; +} diff --git a/library/cpp/actors/core/log_iface.h b/library/cpp/actors/core/log_iface.h index b331db9ca85..3296863538a 100644 --- a/library/cpp/actors/core/log_iface.h +++ b/library/cpp/actors/core/log_iface.h @@ -68,6 +68,7 @@ namespace NActors { LevelReq, LevelResp, Ignored, + Buffer, End }; diff --git a/library/cpp/actors/core/log_metrics.h b/library/cpp/actors/core/log_metrics.h new file mode 100644 index 00000000000..91fed3a4e4c --- /dev/null +++ b/library/cpp/actors/core/log_metrics.h @@ -0,0 +1,152 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/monlib/metrics/metric_registry.h> +#include <library/cpp/monlib/service/pages/templates.h> + +namespace NActors { +class ILoggerMetrics { +public: + virtual ~ILoggerMetrics() = default; + + virtual void IncActorMsgs() = 0; + virtual void IncDirectMsgs() = 0; + virtual void IncLevelRequests() = 0; + virtual void IncIgnoredMsgs() = 0; + virtual void IncAlertMsgs() = 0; + virtual void IncEmergMsgs() = 0; + virtual void IncDroppedMsgs() = 0; + + virtual void GetOutputHtml(IOutputStream&) = 0; +}; + +class TLoggerCounters : public ILoggerMetrics { +public: + TLoggerCounters(TIntrusivePtr<NMonitoring::TDynamicCounters> counters) + : DynamicCounters(counters) + { + ActorMsgs_ = DynamicCounters->GetCounter("ActorMsgs", true); + DirectMsgs_ = DynamicCounters->GetCounter("DirectMsgs", true); + LevelRequests_ = DynamicCounters->GetCounter("LevelRequests", true); + IgnoredMsgs_ = DynamicCounters->GetCounter("IgnoredMsgs", true); + DroppedMsgs_ = DynamicCounters->GetCounter("DroppedMsgs", true); + + AlertMsgs_ = DynamicCounters->GetCounter("AlertMsgs", true); + EmergMsgs_ = DynamicCounters->GetCounter("EmergMsgs", true); + } + + ~TLoggerCounters() = default; + + void IncActorMsgs() override { + ++*ActorMsgs_; + } + void IncDirectMsgs() override { + ++*DirectMsgs_; + } + void IncLevelRequests() override { + ++*LevelRequests_; + } + void IncIgnoredMsgs() override { + ++*IgnoredMsgs_; + } + void IncAlertMsgs() override { + ++*AlertMsgs_; + } + void IncEmergMsgs() override { + ++*EmergMsgs_; + } + void IncDroppedMsgs() override { + DroppedMsgs_->Inc(); + }; + + void GetOutputHtml(IOutputStream& str) override { + HTML(str) { + DIV_CLASS("row") { + DIV_CLASS("col-md-12") { + TAG(TH4) { + str << "Counters" << Endl; + } + DynamicCounters->OutputHtml(str); + } + } + } + } + +private: + NMonitoring::TDynamicCounters::TCounterPtr ActorMsgs_; + NMonitoring::TDynamicCounters::TCounterPtr DirectMsgs_; + NMonitoring::TDynamicCounters::TCounterPtr LevelRequests_; + NMonitoring::TDynamicCounters::TCounterPtr IgnoredMsgs_; + NMonitoring::TDynamicCounters::TCounterPtr AlertMsgs_; + NMonitoring::TDynamicCounters::TCounterPtr EmergMsgs_; + // Dropped while the logger backend was unavailable + NMonitoring::TDynamicCounters::TCounterPtr DroppedMsgs_; + + TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters; +}; + +class TLoggerMetrics : public ILoggerMetrics { +public: + TLoggerMetrics(std::shared_ptr<NMonitoring::TMetricRegistry> metrics) + : Metrics(metrics) + { + ActorMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.actor_msgs"}}); + DirectMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.direct_msgs"}}); + LevelRequests_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.level_requests"}}); + IgnoredMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.ignored_msgs"}}); + DroppedMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.dropped_msgs"}}); + + AlertMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.alert_msgs"}}); + EmergMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.emerg_msgs"}}); + } + + ~TLoggerMetrics() = default; + + void IncActorMsgs() override { + ActorMsgs_->Inc(); + } + void IncDirectMsgs() override { + DirectMsgs_->Inc(); + } + void IncLevelRequests() override { + LevelRequests_->Inc(); + } + void IncIgnoredMsgs() override { + IgnoredMsgs_->Inc(); + } + void IncAlertMsgs() override { + AlertMsgs_->Inc(); + } + void IncEmergMsgs() override { + EmergMsgs_->Inc(); + } + void IncDroppedMsgs() override { + DroppedMsgs_->Inc(); + }; + + void GetOutputHtml(IOutputStream& str) override { + HTML(str) { + DIV_CLASS("row") { + DIV_CLASS("col-md-12") { + TAG(TH4) { + str << "Metrics" << Endl; + } + // TODO: Now, TMetricRegistry does not have the GetOutputHtml function + } + } + } + } + +private: + NMonitoring::TRate* ActorMsgs_; + NMonitoring::TRate* DirectMsgs_; + NMonitoring::TRate* LevelRequests_; + NMonitoring::TRate* IgnoredMsgs_; + NMonitoring::TRate* AlertMsgs_; + NMonitoring::TRate* EmergMsgs_; + // Dropped while the logger backend was unavailable + NMonitoring::TRate* DroppedMsgs_; + + std::shared_ptr<NMonitoring::TMetricRegistry> Metrics; +}; +} diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp index f52f2fc5d22..321af18d71e 100644 --- a/library/cpp/actors/core/log_settings.cpp +++ b/library/cpp/actors/core/log_settings.cpp @@ -7,10 +7,11 @@ namespace NActors { TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, EComponent minVal, EComponent maxVal, EComponentToStringFunc func, EPriority defPriority, EPriority defSamplingPriority, - ui32 defSamplingRate, ui64 timeThresholdMs) + ui32 defSamplingRate, ui64 timeThresholdMs, ui64 bufferSizeLimitBytes) : LoggerActorId(loggerActorId) , LoggerComponent(loggerComponent) , TimeThresholdMs(timeThresholdMs) + , BufferSizeLimitBytes(bufferSizeLimitBytes) , AllowDrop(true) , ThrottleDelay(TDuration::MilliSeconds(100)) , MinVal(0) @@ -29,10 +30,11 @@ namespace NActors { TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, EPriority defPriority, EPriority defSamplingPriority, - ui32 defSamplingRate, ui64 timeThresholdMs) + ui32 defSamplingRate, ui64 timeThresholdMs, ui64 bufferSizeLimitBytes) : LoggerActorId(loggerActorId) , LoggerComponent(loggerComponent) , TimeThresholdMs(timeThresholdMs) + , BufferSizeLimitBytes(bufferSizeLimitBytes) , AllowDrop(true) , ThrottleDelay(TDuration::MilliSeconds(100)) , MinVal(0) diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h index 7fe4504edd9..47d30463fb2 100644 --- a/library/cpp/actors/core/log_settings.h +++ b/library/cpp/actors/core/log_settings.h @@ -72,6 +72,7 @@ namespace NActors { TActorId LoggerActorId; EComponent LoggerComponent; ui64 TimeThresholdMs; + ui64 BufferSizeLimitBytes; bool AllowDrop; TDuration ThrottleDelay; TArrayHolder<TAtomic> ComponentInfo; @@ -101,11 +102,11 @@ namespace NActors { TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, EComponent minVal, EComponent maxVal, EComponentToStringFunc func, EPriority defPriority, EPriority defSamplingPriority = PRI_DEBUG, - ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000); + ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000, ui64 bufferSizeLimitBytes = 1024 * 1024 * 300); TSettings(const TActorId& loggerActorId, const EComponent loggerComponent, EPriority defPriority, EPriority defSamplingPriority = PRI_DEBUG, - ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000); + ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000, ui64 bufferSizeLimitBytes = 1024 * 1024 * 300); void Append(EComponent minVal, EComponent maxVal, EComponentToStringFunc func); diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp index 09b5f88ea2c..e6508738cca 100644 --- a/library/cpp/actors/core/log_ut.cpp +++ b/library/cpp/actors/core/log_ut.cpp @@ -30,11 +30,31 @@ namespace { EPriority::PRI_TRACE, EPriority::PRI_DEBUG, (ui32)0, - timeThresholdMs); + timeThresholdMs, + (ui64)0); s->Append(0, 1, ServiceToString); return s; } + TIntrusivePtr<TSettings> BufferSettings(ui64 bufferSizeLimitBytes) { + auto loggerId = TActorId{0, "Logger"}; + auto s = MakeIntrusive<TSettings>( + loggerId, + 0, + EPriority::PRI_TRACE, + EPriority::PRI_DEBUG, + (ui32)0, + (ui32)0, + bufferSizeLimitBytes); + s->Append(0, 1, ServiceToString); + s->SetAllowDrop(true); + return s; + } + + TIntrusivePtr<TSettings> NoBufferSettings() { + return BufferSettings(0); + } + class TMockBackend: public TLogBackend { public: using TWriteImpl = std::function<void(const TLogRecord&)>; @@ -88,8 +108,12 @@ namespace { Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")}); } - void WriteLog(TInstant ts) { - Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{EPrio::Emerg}, 0, "foo")}); + void WriteLog(TInstant ts, EPrio prio = EPrio::Emerg) { + Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, "foo")}); + } + + void ReleaseLogBuffer() { + Runtime.Send(new IEventHandle{LoggerActor, {}, new TReleaseLogBuffer()}); } void Wakeup() { @@ -182,4 +206,42 @@ Y_UNIT_TEST_SUITE(TLoggerActorTest) { UNIT_ASSERT_VALUES_EQUAL(messages.size(), COUNT + 1); } + + int BufferTest(TFixture &test, const int COUNT) { + TVector<TString> messages; + auto acceptWrites = [&] (const TLogRecord& r) { + messages.emplace_back(r.Data, r.Len); + }; + + test.LogBackend->SetWriteImpl(acceptWrites); + test.Wakeup(); + test.Runtime.AdvanceCurrentTime(TDuration::Days(1)); + auto now = test.Runtime.GetCurrentTime(); + + for (auto i = 0; i < COUNT; ++i) { + test.WriteLog(now - TDuration::Seconds(10), EPrio::Debug); + } + + for (auto i = 0; i < COUNT; ++i) { + test.ReleaseLogBuffer(); + } + + return messages.size(); + } + + Y_UNIT_TEST(ShouldUseLogBufferWhenOverloaded) { + TFixture test{BufferSettings(1024 * 1024 * 300)}; + const auto LOG_COUNT = 500; + auto outputLogSize = BufferTest(test, LOG_COUNT); + + UNIT_ASSERT_VALUES_EQUAL(outputLogSize, LOG_COUNT); + } + + Y_UNIT_TEST(ShouldLoseLogsWithoutBuffer) { + TFixture test{NoBufferSettings()}; + const auto LOG_COUNT = 500; + auto outputLogSize = BufferTest(test, LOG_COUNT); + + UNIT_ASSERT(outputLogSize < LOG_COUNT); + } } |