aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorandrew-rykov <arykov@ydb.tech>2022-09-13 12:49:05 +0300
committerandrew-rykov <arykov@ydb.tech>2022-09-13 12:49:05 +0300
commitcee4a99ba93f21e3d30b5f1d58c84ab6ca41981b (patch)
treed75cbc3a1ad20a211c550d8f29634eff47f5e48d /library/cpp
parent8c1af103661148f5377e712cb2ac5623672522a1 (diff)
downloadydb-cee4a99ba93f21e3d30b5f1d58c84ab6ca41981b.tar.gz
add-log-buffer 2
add log buffer changed names added move changed details fixed released buffer procedure returned condition IgnoredCount > 0 default bufferSizeLimitBytes = 0 returned passedCount declaration returned Y_VERIFY removed passedcount changed buffer reducing no new line at the end of file added srcs in ya.make add log buffer
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/CMakeLists.txt1
-rw-r--r--library/cpp/actors/core/log.cpp201
-rw-r--r--library/cpp/actors/core/log.h33
-rw-r--r--library/cpp/actors/core/log_buffer.cpp98
-rw-r--r--library/cpp/actors/core/log_buffer.h37
-rw-r--r--library/cpp/actors/core/log_iface.h1
-rw-r--r--library/cpp/actors/core/log_metrics.h152
-rw-r--r--library/cpp/actors/core/log_settings.cpp6
-rw-r--r--library/cpp/actors/core/log_settings.h5
-rw-r--r--library/cpp/actors/core/log_ut.cpp68
10 files changed, 432 insertions, 170 deletions
diff --git a/library/cpp/actors/core/CMakeLists.txt b/library/cpp/actors/core/CMakeLists.txt
index 3dc01c29c15..385c419b3a5 100644
--- a/library/cpp/actors/core/CMakeLists.txt
+++ b/library/cpp/actors/core/CMakeLists.txt
@@ -46,6 +46,7 @@ target_sources(cpp-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/io_dispatcher.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log_settings.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log_buffer.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/mailbox.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/monotonic_provider.cpp
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index d005be65355..eb8b19193c4 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -1,7 +1,4 @@
#include "log.h"
-#include "log_settings.h"
-
-#include <library/cpp/monlib/service/pages/templates.h>
static_assert(int(NActors::NLog::PRI_EMERG) == int(::TLOG_EMERG), "expect int(NActors::NLog::PRI_EMERG) == int(::TLOG_EMERG)");
static_assert(int(NActors::NLog::PRI_ALERT) == int(::TLOG_ALERT), "expect int(NActors::NLog::PRI_ALERT) == int(::TLOG_ALERT)");
@@ -33,137 +30,6 @@ namespace {
}
namespace NActors {
-
- class TLoggerCounters : public ILoggerMetrics {
- public:
- TLoggerCounters(TIntrusivePtr<NMonitoring::TDynamicCounters> counters)
- : DynamicCounters(counters)
- {
- ActorMsgs_ = DynamicCounters->GetCounter("ActorMsgs", true);
- DirectMsgs_ = DynamicCounters->GetCounter("DirectMsgs", true);
- LevelRequests_ = DynamicCounters->GetCounter("LevelRequests", true);
- IgnoredMsgs_ = DynamicCounters->GetCounter("IgnoredMsgs", true);
- DroppedMsgs_ = DynamicCounters->GetCounter("DroppedMsgs", true);
-
- AlertMsgs_ = DynamicCounters->GetCounter("AlertMsgs", true);
- EmergMsgs_ = DynamicCounters->GetCounter("EmergMsgs", true);
- }
-
- ~TLoggerCounters() = default;
-
- void IncActorMsgs() override {
- ++*ActorMsgs_;
- }
- void IncDirectMsgs() override {
- ++*DirectMsgs_;
- }
- void IncLevelRequests() override {
- ++*LevelRequests_;
- }
- void IncIgnoredMsgs() override {
- ++*IgnoredMsgs_;
- }
- void IncAlertMsgs() override {
- ++*AlertMsgs_;
- }
- void IncEmergMsgs() override {
- ++*EmergMsgs_;
- }
- void IncDroppedMsgs() override {
- DroppedMsgs_->Inc();
- };
-
- void GetOutputHtml(IOutputStream& str) override {
- HTML(str) {
- DIV_CLASS("row") {
- DIV_CLASS("col-md-12") {
- TAG(TH4) {
- str << "Counters" << Endl;
- }
- DynamicCounters->OutputHtml(str);
- }
- }
- }
- }
-
- private:
- NMonitoring::TDynamicCounters::TCounterPtr ActorMsgs_;
- NMonitoring::TDynamicCounters::TCounterPtr DirectMsgs_;
- NMonitoring::TDynamicCounters::TCounterPtr LevelRequests_;
- NMonitoring::TDynamicCounters::TCounterPtr IgnoredMsgs_;
- NMonitoring::TDynamicCounters::TCounterPtr AlertMsgs_;
- NMonitoring::TDynamicCounters::TCounterPtr EmergMsgs_;
- // Dropped while the logger backend was unavailable
- NMonitoring::TDynamicCounters::TCounterPtr DroppedMsgs_;
-
- TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
- };
-
- class TLoggerMetrics : public ILoggerMetrics {
- public:
- TLoggerMetrics(std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
- : Metrics(metrics)
- {
- ActorMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.actor_msgs"}});
- DirectMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.direct_msgs"}});
- LevelRequests_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.level_requests"}});
- IgnoredMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.ignored_msgs"}});
- DroppedMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.dropped_msgs"}});
-
- AlertMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.alert_msgs"}});
- EmergMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.emerg_msgs"}});
- }
-
- ~TLoggerMetrics() = default;
-
- void IncActorMsgs() override {
- ActorMsgs_->Inc();
- }
- void IncDirectMsgs() override {
- DirectMsgs_->Inc();
- }
- void IncLevelRequests() override {
- LevelRequests_->Inc();
- }
- void IncIgnoredMsgs() override {
- IgnoredMsgs_->Inc();
- }
- void IncAlertMsgs() override {
- AlertMsgs_->Inc();
- }
- void IncEmergMsgs() override {
- EmergMsgs_->Inc();
- }
- void IncDroppedMsgs() override {
- DroppedMsgs_->Inc();
- };
-
- void GetOutputHtml(IOutputStream& str) override {
- HTML(str) {
- DIV_CLASS("row") {
- DIV_CLASS("col-md-12") {
- TAG(TH4) {
- str << "Metrics" << Endl;
- }
- // TODO: Now, TMetricRegistry does not have the GetOutputHtml function
- }
- }
- }
- }
-
- private:
- NMonitoring::TRate* ActorMsgs_;
- NMonitoring::TRate* DirectMsgs_;
- NMonitoring::TRate* LevelRequests_;
- NMonitoring::TRate* IgnoredMsgs_;
- NMonitoring::TRate* AlertMsgs_;
- NMonitoring::TRate* EmergMsgs_;
- // Dropped while the logger backend was unavailable
- NMonitoring::TRate* DroppedMsgs_;
-
- std::shared_ptr<NMonitoring::TMetricRegistry> Metrics;
- };
-
TAtomic TLoggerActor::IsOverflow = 0;
TLoggerActor::TLoggerActor(TIntrusivePtr<NLog::TSettings> settings,
@@ -173,6 +39,7 @@ namespace NActors {
, Settings(settings)
, LogBackend(logBackend.Release())
, Metrics(std::make_unique<TLoggerCounters>(counters))
+ , LogBuffer(Metrics.get())
{
}
@@ -183,6 +50,7 @@ namespace NActors {
, Settings(settings)
, LogBackend(logBackend)
, Metrics(std::make_unique<TLoggerCounters>(counters))
+ , LogBuffer(Metrics.get())
{
}
@@ -193,6 +61,7 @@ namespace NActors {
, Settings(settings)
, LogBackend(logBackend.Release())
, Metrics(std::make_unique<TLoggerMetrics>(metrics))
+ , LogBuffer(Metrics.get())
{
}
@@ -203,6 +72,7 @@ namespace NActors {
, Settings(settings)
, LogBackend(logBackend)
, Metrics(std::make_unique<TLoggerMetrics>(metrics))
+ , LogBuffer(Metrics.get())
{
}
@@ -242,6 +112,23 @@ namespace NActors {
PassedCount = 0;
}
+ void TLoggerActor::ReleaseLogBufferMessage() {
+ if (LogBuffer.GetLogsNumber() > 0) {
+ auto message = LogBuffer.GetMessage();
+ if (!OutputRecord(message.Time, message.Priority, message.Component, message.Formatted)) {
+ BecomeDefunct();
+ }
+ }
+ }
+
+ void TLoggerActor::ReleaseLogBufferMessageEvent(TReleaseLogBuffer::TPtr& ev, const NActors::TActorContext& ctx) {
+ Y_UNUSED(ev);
+ ReleaseLogBufferMessage();
+ if (LogBuffer.GetLogsNumber() > 0) {
+ ctx.Send(ctx.SelfID, new TReleaseLogBuffer());
+ }
+ }
+
void TLoggerActor::HandleIgnoredEventDrop() {
// logger backend is unavailable, just ignore
}
@@ -264,6 +151,30 @@ namespace NActors {
}
+ bool TLoggerActor::TryAddLogBuffer(TLogBufferMessage& lbm) {
+ ui64 messageSize = sizeof(lbm);
+ if (LogBuffer.GetSizeBytes() + messageSize < Settings->BufferSizeLimitBytes)
+ return LogBuffer.TryAddMessage(lbm);
+
+ return false;
+ }
+
+ bool TLoggerActor::TryKeepLog(TLogBufferMessage& lbm, const NActors::TActorContext& ctx) {
+ bool wasEmtpyBuffer = LogBuffer.IsEmpty();
+
+ if (TryAddLogBuffer(lbm)) {
+ } else if (!LogBuffer.TryReduceLogsNumber()) {
+ return false;
+ } else if (!TryAddLogBuffer(lbm)) {
+ return false;
+ }
+
+ if (wasEmtpyBuffer) {
+ ctx.Send(ctx.SelfID, new TReleaseLogBuffer());
+ }
+ return true;
+ }
+
void TLoggerActor::HandleLogEvent(NLog::TEvLog::TPtr& ev, const NActors::TActorContext& ctx) {
i64 delayMillisec = (ctx.Now() - ev->Get()->Stamp).MilliSeconds();
WriteMessageStat(*ev->Get());
@@ -271,16 +182,17 @@ namespace NActors {
// Disable throttling if it was enabled previously
if (AtomicGet(IsOverflow))
AtomicSet(IsOverflow, 0);
-
- // Check if some records have to be dropped
- if ((PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs) || IgnoredCount > 0) {
- Metrics->IncIgnoredMsgs();
- if (IgnoredCount == 0) {
- ctx.Send(ctx.SelfID, new TLogIgnored());
+ if (PassedCount > 10 && delayMillisec > (i64)Settings->TimeThresholdMs || IgnoredCount > 0 || !LogBuffer.IsEmpty()) {
+ TLogBufferMessage lbm(ev);
+ if (!TryKeepLog(lbm, ctx)) {
+ Metrics->IncIgnoredMsgs();
+ if (IgnoredCount == 0) {
+ ctx.Send(ctx.SelfID, new TLogIgnored());
+ }
+ ++IgnoredCount;
+ PassedCount = 0;
}
- ++IgnoredCount;
- PassedCount = 0;
- return;
+ return;
}
PassedCount++;
} else {
@@ -291,8 +203,7 @@ namespace NActors {
AtomicSet(IsOverflow, 0);
}
- const auto prio = ev->Get()->Level.ToPrio();
- if (!OutputRecord(ev->Get()->Stamp, prio, ev->Get()->Component, ev->Get()->Line)) {
+ if (!OutputRecord(ev->Get()->Stamp, ev->Get()->Level.ToPrio(), ev->Get()->Component, ev->Get()->Line)) {
BecomeDefunct();
}
}
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index c11a7cf3c19..7c6921bca7b 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -4,6 +4,8 @@
#include "log_iface.h"
#include "log_settings.h"
+#include "log_metrics.h"
+#include "log_buffer.h"
#include "actorsystem.h"
#include "events.h"
#include "event_local.h"
@@ -14,8 +16,6 @@
#include <util/string/printf.h>
#include <util/string/builder.h>
#include <library/cpp/logger/all.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/monlib/metrics/metric_registry.h>
#include <library/cpp/json/writer/json.h>
#include <library/cpp/svnversion/svnversion.h>
@@ -172,24 +172,15 @@ namespace NActors {
}
};
- ////////////////////////////////////////////////////////////////////////////////
- // LOGGER ACTOR
- ////////////////////////////////////////////////////////////////////////////////
- class ILoggerMetrics {
+ class TReleaseLogBuffer: public TEventLocal<TReleaseLogBuffer, int(NLog::EEv::Buffer)> {
public:
- virtual ~ILoggerMetrics() = default;
-
- virtual void IncActorMsgs() = 0;
- virtual void IncDirectMsgs() = 0;
- virtual void IncLevelRequests() = 0;
- virtual void IncIgnoredMsgs() = 0;
- virtual void IncAlertMsgs() = 0;
- virtual void IncEmergMsgs() = 0;
- virtual void IncDroppedMsgs() = 0;
-
- virtual void GetOutputHtml(IOutputStream&) = 0;
+ TReleaseLogBuffer() {
+ }
};
+ ////////////////////////////////////////////////////////////////////////////////
+ // LOGGER ACTOR
+ ////////////////////////////////////////////////////////////////////////////////
class TLoggerActor: public TActor<TLoggerActor> {
public:
static constexpr IActor::EActivityType ActorActivityType() {
@@ -210,9 +201,10 @@ namespace NActors {
std::shared_ptr<NMonitoring::TMetricRegistry> metrics);
~TLoggerActor();
- void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
+ void StateFunc(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
switch (ev->GetTypeRewrite()) {
HFunc(TLogIgnored, HandleIgnoredEvent);
+ HFunc(TReleaseLogBuffer, ReleaseLogBufferMessageEvent);
HFunc(NLog::TEvLog, HandleLogEvent);
HFunc(TLogComponentLevelRequest, HandleLogComponentLevelRequest);
HFunc(NMon::TEvHttpInfo, HandleMonInfo);
@@ -242,10 +234,14 @@ namespace NActors {
static TAtomic IsOverflow;
TDuration WakeupInterval{TDuration::Seconds(5)};
std::unique_ptr<ILoggerMetrics> Metrics;
+ TLogBuffer LogBuffer;
void BecomeDefunct();
+ void ReleaseLogBufferMessageEvent(TReleaseLogBuffer::TPtr& ev, const NActors::TActorContext& ctx);
void HandleIgnoredEvent(TLogIgnored::TPtr& ev, const NActors::TActorContext& ctx);
void HandleIgnoredEventDrop();
+ bool TryAddLogBuffer(TLogBufferMessage& ev);
+ bool TryKeepLog(TLogBufferMessage& ev, const TActorContext& ctx);
void HandleLogEvent(NLog::TEvLog::TPtr& ev, const TActorContext& ctx);
void HandleLogEventDrop(const NLog::TEvLog::TPtr& ev);
void HandleLogComponentLevelRequest(TLogComponentLevelRequest::TPtr& ev, const TActorContext& ctx);
@@ -254,6 +250,7 @@ namespace NActors {
[[nodiscard]] bool OutputRecord(TInstant time, NLog::EPrio priority, NLog::EComponent component, const TString& formatted) noexcept;
void RenderComponentPriorities(IOutputStream& str);
void LogIgnoredCount(TInstant now);
+ void ReleaseLogBufferMessage();
void WriteMessageStat(const NLog::TEvLog& ev);
static const char* FormatLocalTimestamp(TInstant time, char* buf);
};
diff --git a/library/cpp/actors/core/log_buffer.cpp b/library/cpp/actors/core/log_buffer.cpp
new file mode 100644
index 00000000000..4a05f9a8c25
--- /dev/null
+++ b/library/cpp/actors/core/log_buffer.cpp
@@ -0,0 +1,98 @@
+#include "log_buffer.h"
+
+#include <util/system/yassert.h>
+#include <algorithm>
+
+using namespace NActors::NLog;
+
+namespace NActors {
+TLogBufferMessage::TLogBufferMessage(NLog::TEvLog::TPtr& ev)
+ : Formatted(ev->Get()->Line)
+ , Time(ev->Get()->Stamp)
+ , Component(ev->Get()->Component)
+ , Priority(ev->Get()->Level.ToPrio())
+{}
+
+TLogBuffer::TLogBuffer(ILoggerMetrics *metrics)
+ : Metrics(metrics)
+{}
+
+bool TLogBuffer::TryAddMessage(TLogBufferMessage message) {
+ Buffer.push_back(std::move(message));
+ BufferSize += sizeof(message);
+ PrioStats[message.Priority]++;
+
+ return true;
+}
+
+TLogBufferMessage TLogBuffer::GetMessage() {
+ auto message = Buffer.front();
+
+ ui64 messageSize = sizeof(message);
+ BufferSize -= messageSize;
+ Buffer.pop_front();
+ PrioStats[message.Priority]--;
+
+ return message;
+}
+
+bool TLogBuffer::IsEmpty() const {
+ return Buffer.empty();
+}
+
+size_t TLogBuffer::GetLogsNumber() const {
+ return Buffer.size();
+}
+
+ui64 TLogBuffer::GetSizeBytes() const {
+ return BufferSize;
+}
+
+void TLogBuffer::FilterByLogPriority(NLog::EPrio prio) {
+ bool isFirstRemoving = true;
+ auto it = Buffer.begin();
+ while (it != Buffer.end())
+ {
+ if (it->Priority >= prio) {
+ ui64 messageSize = sizeof(*it);
+ BufferSize -= messageSize;
+ Metrics->IncIgnoredMsgs();
+ PrioStats[it->Priority]--;
+
+ if (isFirstRemoving && prio > NLog::EPrio::Error) {
+ it->Priority = NLog::EPrio::Error;
+ it->Formatted = Sprintf("Ignored log records due to log buffer overflow! IgnoredCount# %" PRIu32 " ", PrioStats[prio]);
+
+ PrioStats[NLog::EPrio::Error]++;
+ BufferSize += sizeof(*it);
+ it++;
+ isFirstRemoving = false;
+ }
+ else {
+ it = Buffer.erase(it);
+ }
+ }
+ else {
+ it++;
+ }
+ }
+}
+
+bool inline TLogBuffer::CheckMessagesNumberEnoughForClearing(ui32 number) {
+ return number * 10 > Buffer.size();
+}
+
+bool TLogBuffer::TryReduceLogsNumber() {
+ ui32 removeLogsNumber = 0;
+ for (ui16 p = ui16(NLog::EPrio::Trace); p > ui16(NLog::EPrio::Alert); p--)
+ {
+ NLog::EPrio prio = static_cast<NLog::EPrio>(p);
+ removeLogsNumber += PrioStats[prio];
+ if (CheckMessagesNumberEnoughForClearing(removeLogsNumber)) {
+ FilterByLogPriority(prio);
+ return true;
+ }
+ }
+ return false;
+}
+}
diff --git a/library/cpp/actors/core/log_buffer.h b/library/cpp/actors/core/log_buffer.h
new file mode 100644
index 00000000000..f40cbae65a8
--- /dev/null
+++ b/library/cpp/actors/core/log_buffer.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include "log_metrics.h"
+#include "log_iface.h"
+
+#include <deque>
+
+namespace NActors {
+
+struct TLogBufferMessage {
+ TString Formatted;
+ TInstant Time;
+ NLog::EComponent Component;
+ NLog::EPrio Priority;
+
+ TLogBufferMessage(NLog::TEvLog::TPtr& ev);
+};
+
+class TLogBuffer {
+ ILoggerMetrics *Metrics;
+ std::deque<TLogBufferMessage> Buffer;
+ ui64 BufferSize = 0;
+ std::map<NLog::EPrio, ui32> PrioStats;
+
+ void FilterByLogPriority(NLog::EPrio prio);
+ bool inline CheckMessagesNumberEnoughForClearing(ui32 number);
+
+ public:
+ TLogBuffer(ILoggerMetrics *metrics);
+ bool TryAddMessage(TLogBufferMessage message);
+ TLogBufferMessage GetMessage();
+ bool IsEmpty() const;
+ size_t GetLogsNumber() const;
+ ui64 GetSizeBytes() const;
+ bool TryReduceLogsNumber();
+};
+}
diff --git a/library/cpp/actors/core/log_iface.h b/library/cpp/actors/core/log_iface.h
index b331db9ca85..3296863538a 100644
--- a/library/cpp/actors/core/log_iface.h
+++ b/library/cpp/actors/core/log_iface.h
@@ -68,6 +68,7 @@ namespace NActors {
LevelReq,
LevelResp,
Ignored,
+ Buffer,
End
};
diff --git a/library/cpp/actors/core/log_metrics.h b/library/cpp/actors/core/log_metrics.h
new file mode 100644
index 00000000000..91fed3a4e4c
--- /dev/null
+++ b/library/cpp/actors/core/log_metrics.h
@@ -0,0 +1,152 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
+#include <library/cpp/monlib/service/pages/templates.h>
+
+namespace NActors {
+class ILoggerMetrics {
+public:
+ virtual ~ILoggerMetrics() = default;
+
+ virtual void IncActorMsgs() = 0;
+ virtual void IncDirectMsgs() = 0;
+ virtual void IncLevelRequests() = 0;
+ virtual void IncIgnoredMsgs() = 0;
+ virtual void IncAlertMsgs() = 0;
+ virtual void IncEmergMsgs() = 0;
+ virtual void IncDroppedMsgs() = 0;
+
+ virtual void GetOutputHtml(IOutputStream&) = 0;
+};
+
+class TLoggerCounters : public ILoggerMetrics {
+public:
+ TLoggerCounters(TIntrusivePtr<NMonitoring::TDynamicCounters> counters)
+ : DynamicCounters(counters)
+ {
+ ActorMsgs_ = DynamicCounters->GetCounter("ActorMsgs", true);
+ DirectMsgs_ = DynamicCounters->GetCounter("DirectMsgs", true);
+ LevelRequests_ = DynamicCounters->GetCounter("LevelRequests", true);
+ IgnoredMsgs_ = DynamicCounters->GetCounter("IgnoredMsgs", true);
+ DroppedMsgs_ = DynamicCounters->GetCounter("DroppedMsgs", true);
+
+ AlertMsgs_ = DynamicCounters->GetCounter("AlertMsgs", true);
+ EmergMsgs_ = DynamicCounters->GetCounter("EmergMsgs", true);
+ }
+
+ ~TLoggerCounters() = default;
+
+ void IncActorMsgs() override {
+ ++*ActorMsgs_;
+ }
+ void IncDirectMsgs() override {
+ ++*DirectMsgs_;
+ }
+ void IncLevelRequests() override {
+ ++*LevelRequests_;
+ }
+ void IncIgnoredMsgs() override {
+ ++*IgnoredMsgs_;
+ }
+ void IncAlertMsgs() override {
+ ++*AlertMsgs_;
+ }
+ void IncEmergMsgs() override {
+ ++*EmergMsgs_;
+ }
+ void IncDroppedMsgs() override {
+ DroppedMsgs_->Inc();
+ };
+
+ void GetOutputHtml(IOutputStream& str) override {
+ HTML(str) {
+ DIV_CLASS("row") {
+ DIV_CLASS("col-md-12") {
+ TAG(TH4) {
+ str << "Counters" << Endl;
+ }
+ DynamicCounters->OutputHtml(str);
+ }
+ }
+ }
+ }
+
+private:
+ NMonitoring::TDynamicCounters::TCounterPtr ActorMsgs_;
+ NMonitoring::TDynamicCounters::TCounterPtr DirectMsgs_;
+ NMonitoring::TDynamicCounters::TCounterPtr LevelRequests_;
+ NMonitoring::TDynamicCounters::TCounterPtr IgnoredMsgs_;
+ NMonitoring::TDynamicCounters::TCounterPtr AlertMsgs_;
+ NMonitoring::TDynamicCounters::TCounterPtr EmergMsgs_;
+ // Dropped while the logger backend was unavailable
+ NMonitoring::TDynamicCounters::TCounterPtr DroppedMsgs_;
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> DynamicCounters;
+};
+
+class TLoggerMetrics : public ILoggerMetrics {
+public:
+ TLoggerMetrics(std::shared_ptr<NMonitoring::TMetricRegistry> metrics)
+ : Metrics(metrics)
+ {
+ ActorMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.actor_msgs"}});
+ DirectMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.direct_msgs"}});
+ LevelRequests_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.level_requests"}});
+ IgnoredMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.ignored_msgs"}});
+ DroppedMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.dropped_msgs"}});
+
+ AlertMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.alert_msgs"}});
+ EmergMsgs_ = Metrics->Rate(NMonitoring::TLabels{{"sensor", "logger.emerg_msgs"}});
+ }
+
+ ~TLoggerMetrics() = default;
+
+ void IncActorMsgs() override {
+ ActorMsgs_->Inc();
+ }
+ void IncDirectMsgs() override {
+ DirectMsgs_->Inc();
+ }
+ void IncLevelRequests() override {
+ LevelRequests_->Inc();
+ }
+ void IncIgnoredMsgs() override {
+ IgnoredMsgs_->Inc();
+ }
+ void IncAlertMsgs() override {
+ AlertMsgs_->Inc();
+ }
+ void IncEmergMsgs() override {
+ EmergMsgs_->Inc();
+ }
+ void IncDroppedMsgs() override {
+ DroppedMsgs_->Inc();
+ };
+
+ void GetOutputHtml(IOutputStream& str) override {
+ HTML(str) {
+ DIV_CLASS("row") {
+ DIV_CLASS("col-md-12") {
+ TAG(TH4) {
+ str << "Metrics" << Endl;
+ }
+ // TODO: Now, TMetricRegistry does not have the GetOutputHtml function
+ }
+ }
+ }
+ }
+
+private:
+ NMonitoring::TRate* ActorMsgs_;
+ NMonitoring::TRate* DirectMsgs_;
+ NMonitoring::TRate* LevelRequests_;
+ NMonitoring::TRate* IgnoredMsgs_;
+ NMonitoring::TRate* AlertMsgs_;
+ NMonitoring::TRate* EmergMsgs_;
+ // Dropped while the logger backend was unavailable
+ NMonitoring::TRate* DroppedMsgs_;
+
+ std::shared_ptr<NMonitoring::TMetricRegistry> Metrics;
+};
+}
diff --git a/library/cpp/actors/core/log_settings.cpp b/library/cpp/actors/core/log_settings.cpp
index f52f2fc5d22..321af18d71e 100644
--- a/library/cpp/actors/core/log_settings.cpp
+++ b/library/cpp/actors/core/log_settings.cpp
@@ -7,10 +7,11 @@ namespace NActors {
TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
EComponent minVal, EComponent maxVal, EComponentToStringFunc func,
EPriority defPriority, EPriority defSamplingPriority,
- ui32 defSamplingRate, ui64 timeThresholdMs)
+ ui32 defSamplingRate, ui64 timeThresholdMs, ui64 bufferSizeLimitBytes)
: LoggerActorId(loggerActorId)
, LoggerComponent(loggerComponent)
, TimeThresholdMs(timeThresholdMs)
+ , BufferSizeLimitBytes(bufferSizeLimitBytes)
, AllowDrop(true)
, ThrottleDelay(TDuration::MilliSeconds(100))
, MinVal(0)
@@ -29,10 +30,11 @@ namespace NActors {
TSettings::TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
EPriority defPriority, EPriority defSamplingPriority,
- ui32 defSamplingRate, ui64 timeThresholdMs)
+ ui32 defSamplingRate, ui64 timeThresholdMs, ui64 bufferSizeLimitBytes)
: LoggerActorId(loggerActorId)
, LoggerComponent(loggerComponent)
, TimeThresholdMs(timeThresholdMs)
+ , BufferSizeLimitBytes(bufferSizeLimitBytes)
, AllowDrop(true)
, ThrottleDelay(TDuration::MilliSeconds(100))
, MinVal(0)
diff --git a/library/cpp/actors/core/log_settings.h b/library/cpp/actors/core/log_settings.h
index 7fe4504edd9..47d30463fb2 100644
--- a/library/cpp/actors/core/log_settings.h
+++ b/library/cpp/actors/core/log_settings.h
@@ -72,6 +72,7 @@ namespace NActors {
TActorId LoggerActorId;
EComponent LoggerComponent;
ui64 TimeThresholdMs;
+ ui64 BufferSizeLimitBytes;
bool AllowDrop;
TDuration ThrottleDelay;
TArrayHolder<TAtomic> ComponentInfo;
@@ -101,11 +102,11 @@ namespace NActors {
TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
EComponent minVal, EComponent maxVal, EComponentToStringFunc func,
EPriority defPriority, EPriority defSamplingPriority = PRI_DEBUG,
- ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000);
+ ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000, ui64 bufferSizeLimitBytes = 1024 * 1024 * 300);
TSettings(const TActorId& loggerActorId, const EComponent loggerComponent,
EPriority defPriority, EPriority defSamplingPriority = PRI_DEBUG,
- ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000);
+ ui32 defSamplingRate = 0, ui64 timeThresholdMs = 1000, ui64 bufferSizeLimitBytes = 1024 * 1024 * 300);
void Append(EComponent minVal, EComponent maxVal, EComponentToStringFunc func);
diff --git a/library/cpp/actors/core/log_ut.cpp b/library/cpp/actors/core/log_ut.cpp
index 09b5f88ea2c..e6508738cca 100644
--- a/library/cpp/actors/core/log_ut.cpp
+++ b/library/cpp/actors/core/log_ut.cpp
@@ -30,11 +30,31 @@ namespace {
EPriority::PRI_TRACE,
EPriority::PRI_DEBUG,
(ui32)0,
- timeThresholdMs);
+ timeThresholdMs,
+ (ui64)0);
s->Append(0, 1, ServiceToString);
return s;
}
+ TIntrusivePtr<TSettings> BufferSettings(ui64 bufferSizeLimitBytes) {
+ auto loggerId = TActorId{0, "Logger"};
+ auto s = MakeIntrusive<TSettings>(
+ loggerId,
+ 0,
+ EPriority::PRI_TRACE,
+ EPriority::PRI_DEBUG,
+ (ui32)0,
+ (ui32)0,
+ bufferSizeLimitBytes);
+ s->Append(0, 1, ServiceToString);
+ s->SetAllowDrop(true);
+ return s;
+ }
+
+ TIntrusivePtr<TSettings> NoBufferSettings() {
+ return BufferSettings(0);
+ }
+
class TMockBackend: public TLogBackend {
public:
using TWriteImpl = std::function<void(const TLogRecord&)>;
@@ -88,8 +108,12 @@ namespace {
Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(TInstant::Zero(), TLevel{EPrio::Emerg}, 0, "foo")});
}
- void WriteLog(TInstant ts) {
- Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{EPrio::Emerg}, 0, "foo")});
+ void WriteLog(TInstant ts, EPrio prio = EPrio::Emerg) {
+ Runtime.Send(new IEventHandle{LoggerActor, {}, new TEvLog(ts, TLevel{prio}, 0, "foo")});
+ }
+
+ void ReleaseLogBuffer() {
+ Runtime.Send(new IEventHandle{LoggerActor, {}, new TReleaseLogBuffer()});
}
void Wakeup() {
@@ -182,4 +206,42 @@ Y_UNIT_TEST_SUITE(TLoggerActorTest) {
UNIT_ASSERT_VALUES_EQUAL(messages.size(), COUNT + 1);
}
+
+ int BufferTest(TFixture &test, const int COUNT) {
+ TVector<TString> messages;
+ auto acceptWrites = [&] (const TLogRecord& r) {
+ messages.emplace_back(r.Data, r.Len);
+ };
+
+ test.LogBackend->SetWriteImpl(acceptWrites);
+ test.Wakeup();
+ test.Runtime.AdvanceCurrentTime(TDuration::Days(1));
+ auto now = test.Runtime.GetCurrentTime();
+
+ for (auto i = 0; i < COUNT; ++i) {
+ test.WriteLog(now - TDuration::Seconds(10), EPrio::Debug);
+ }
+
+ for (auto i = 0; i < COUNT; ++i) {
+ test.ReleaseLogBuffer();
+ }
+
+ return messages.size();
+ }
+
+ Y_UNIT_TEST(ShouldUseLogBufferWhenOverloaded) {
+ TFixture test{BufferSettings(1024 * 1024 * 300)};
+ const auto LOG_COUNT = 500;
+ auto outputLogSize = BufferTest(test, LOG_COUNT);
+
+ UNIT_ASSERT_VALUES_EQUAL(outputLogSize, LOG_COUNT);
+ }
+
+ Y_UNIT_TEST(ShouldLoseLogsWithoutBuffer) {
+ TFixture test{NoBufferSettings()};
+ const auto LOG_COUNT = 500;
+ auto outputLogSize = BufferTest(test, LOG_COUNT);
+
+ UNIT_ASSERT(outputLogSize < LOG_COUNT);
+ }
}