aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/helpers/pool_stats_collector.h
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@yandex-team.ru>2022-02-10 16:46:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:39 +0300
commit4a6816dea1bcaee46ce29a51a5fd7d3495012858 (patch)
treed98d6003e190b9e761bd77a83cd98428f9657b35 /library/cpp/actors/helpers/pool_stats_collector.h
parent7aa4cf700385ff96999c5cc301171ff157974773 (diff)
downloadydb-4a6816dea1bcaee46ce29a51a5fd7d3495012858.tar.gz
Restoring authorship annotation for <aozeritsky@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/helpers/pool_stats_collector.h')
-rw-r--r--library/cpp/actors/helpers/pool_stats_collector.h628
1 files changed, 314 insertions, 314 deletions
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index 61d0b45780..78c842900e 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -1,314 +1,314 @@
-#pragma once
-
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/core/executor_thread.h>
-#include <library/cpp/actors/core/hfunc.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-
-#include <util/generic/vector.h>
-#include <util/generic/xrange.h>
-#include <util/string/printf.h>
-
-namespace NActors {
-
-// Periodically collects stats from executor threads and exposes them as mon counters
-class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
-private:
- struct THistogramCounters {
- void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) {
- for (size_t i = 0; (1ull<<i) <= maxVal; ++i) {
- TString bucketName = ToString(1ull<<i) + " " + unit;
- Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true));
- }
- Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true));
- }
-
- void Set(const TLogHistogram& data) {
- ui32 i = 0;
- for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
- *Buckets[i] = data.Buckets[i];
- ui64 last = 0;
- for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
- last += data.Buckets[i];
- *Buckets.back() = last;
- }
-
- void Set(const TLogHistogram& data, double factor) {
- ui32 i = 0;
- for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
- *Buckets[i] = data.Buckets[i]*factor;
- ui64 last = 0;
- for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
- last += data.Buckets[i];
- *Buckets.back() = last*factor;
- }
-
- private:
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets;
- };
-
- struct TActivityStats {
- void Init(NMonitoring::TDynamicCounterPtr group) {
- Group = group;
-
- ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
- ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
- ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
- ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());
- }
-
- void Set(const TExecutorThreadStats& stats) {
- for (ui32 i : xrange(stats.MaxActivityType())) {
- Y_VERIFY(i < GetActivityTypeCount());
- ui64 ticks = stats.ElapsedTicksByActivity[i];
- ui64 events = stats.ReceivedEventsByActivity[i];
- ui64 actors = stats.ActorsAliveByActivity[i];
- ui64 scheduled = stats.ScheduledEventsByActivity[i];
-
- if (!ActorsAliveByActivityBuckets[i]) {
- if (ticks || events || actors || scheduled) {
- InitCountersForActivity(i);
- } else {
- continue;
- }
- }
-
- *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
- *ReceivedEventsByActivityBuckets[i] = events;
- *ActorsAliveByActivityBuckets[i] = actors;
- *ScheduledEventsByActivityBuckets[i] = scheduled;
- }
- }
-
- private:
- void InitCountersForActivity(ui32 activityType) {
- Y_VERIFY(activityType < GetActivityTypeCount());
-
- auto bucketName = TString(GetActivityTypeName(activityType));
-
- ElapsedMicrosecByActivityBuckets[activityType] =
- Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
- ReceivedEventsByActivityBuckets[activityType] =
- Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true);
- ActorsAliveByActivityBuckets[activityType] =
- Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false);
- ScheduledEventsByActivityBuckets[activityType] =
- Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);
- }
-
- private:
- NMonitoring::TDynamicCounterPtr Group;
-
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
- TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;
- };
-
- struct TExecutorPoolCounters {
- TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup;
-
- NMonitoring::TDynamicCounters::TCounterPtr SentEvents;
- NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents;
- NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents;
- NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents;
- NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors;
- NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation;
- NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec;
- NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
- NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec;
- NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations;
- NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive;
- NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes;
- NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
- NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
- NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
-
- THistogramCounters LegacyActivationTimeHistogram;
- NMonitoring::THistogramPtr ActivationTimeHistogram;
- THistogramCounters LegacyEventDeliveryTimeHistogram;
- NMonitoring::THistogramPtr EventDeliveryTimeHistogram;
- THistogramCounters LegacyEventProcessingCountHistogram;
- NMonitoring::THistogramPtr EventProcessingCountHistogram;
- THistogramCounters LegacyEventProcessingTimeHistogram;
- NMonitoring::THistogramPtr EventProcessingTimeHistogram;
-
- TActivityStats ActivityStats;
- NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime;
-
- double Usage = 0;
- double LastElapsedSeconds = 0;
- THPTimer UsageTimer;
- TString Name;
- ui32 Threads;
-
- void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
- LastElapsedSeconds = 0;
- Usage = 0;
- UsageTimer.Reset();
- Name = poolName;
- Threads = threads;
-
- PoolGroup = group->GetSubgroup("execpool", poolName);
-
- SentEvents = PoolGroup->GetCounter("SentEvents", true);
- ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true);
- PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true);
- NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true);
- DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true);
- CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true);
- ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true);
- ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true);
- EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true);
- ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true);
- ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false);
- AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false);
- MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
- MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
- MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
-
- LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
- ActivationTimeHistogram = PoolGroup->GetHistogram(
- "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
- LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000);
- EventDeliveryTimeHistogram = PoolGroup->GetHistogram(
- "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
- LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000);
- EventProcessingCountHistogram = PoolGroup->GetHistogram(
- "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1));
- LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000);
- EventProcessingTimeHistogram = PoolGroup->GetHistogram(
- "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
-
- ActivityStats.Init(PoolGroup.Get());
-
- MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true);
- }
-
- void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) {
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- *SentEvents = stats.SentEvents;
- *ReceivedEvents = stats.ReceivedEvents;
- *PreemptedEvents = stats.PreemptedEvents;
- *NonDeliveredEvents = stats.NonDeliveredEvents;
- *DestroyedActors = stats.PoolDestroyedActors;
- *EmptyMailboxActivation = stats.EmptyMailboxActivation;
- *CpuMicrosec = stats.CpuNs / 1000;
- *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000;
- *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
- *ActorRegistrations = stats.PoolActorRegistrations;
- *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors;
- *AllocatedMailboxes = stats.PoolAllocatedMailboxes;
- *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
- *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
- *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
-
- LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
- ActivationTimeHistogram->Reset();
- ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
-
- LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);
- EventDeliveryTimeHistogram->Reset();
- EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);
-
- LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);
- EventProcessingCountHistogram->Reset();
- EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);
-
- double toMicrosec = 1000000 / NHPTimer::GetClockRate();
- LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
- EventProcessingTimeHistogram->Reset();
- for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) {
- EventProcessingTimeHistogram->Collect(
- stats.EventProcessingTimeHistogram.UpperBound(i),
- stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);
- }
-
- ActivityStats.Set(stats);
-
- *MaxUtilizationTime = poolStats.MaxUtilizationTime;
-
- double seconds = UsageTimer.PassedReset();
-
- // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916
- const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks);
- const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0;
- LastElapsedSeconds = elapsed;
-
- // update usage factor according to smoothness
- const double smoothness = 0.5;
- Usage = currentUsage * smoothness + Usage * (1.0 - smoothness);
-#else
- Y_UNUSED(poolStats);
- Y_UNUSED(stats);
- Y_UNUSED(numThreads);
-#endif
- }
- };
-
-public:
- static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::ACTORLIB_STATS;
- }
-
- TStatsCollectingActor(
- ui32 intervalSec,
- const TActorSystemSetup& setup,
- NMonitoring::TDynamicCounterPtr counters)
- : IntervalSec(intervalSec)
- , Counters(counters)
- {
- PoolCounters.resize(setup.GetExecutorsCount());
- for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
- PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));
- }
- }
-
- void Bootstrap(const TActorContext& ctx) {
- ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
- Become(&TThis::StateWork);
- }
-
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- CFunc(TEvents::TSystem::Wakeup, Wakeup);
- }
- }
-
-private:
- virtual void OnWakeup(const TActorContext &ctx) {
- Y_UNUSED(ctx);
- }
-
- void Wakeup(const TActorContext &ctx) {
- for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
- TVector<TExecutorThreadStats> stats;
- TExecutorPoolStats poolStats;
- ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats);
- SetAggregatedCounters(PoolCounters[poolId], poolStats, stats);
- }
-
- OnWakeup(ctx);
-
- ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
- }
-
- void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) {
- // Sum all per-thread counters into the 0th element
- for (ui32 idx = 1; idx < stats.size(); ++idx) {
- stats[0].Aggregate(stats[idx]);
- }
- if (stats.size()) {
- poolCounters.Set(poolStats, stats[0], stats.size() - 1);
- }
- }
-
-protected:
- const ui32 IntervalSec;
- NMonitoring::TDynamicCounterPtr Counters;
-
- TVector<TExecutorPoolCounters> PoolCounters;
-};
-
-} // NActors
+#pragma once
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/executor_thread.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/vector.h>
+#include <util/generic/xrange.h>
+#include <util/string/printf.h>
+
+namespace NActors {
+
+// Periodically collects stats from executor threads and exposes them as mon counters
+class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
+private:
+ struct THistogramCounters {
+ void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) {
+ for (size_t i = 0; (1ull<<i) <= maxVal; ++i) {
+ TString bucketName = ToString(1ull<<i) + " " + unit;
+ Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true));
+ }
+ Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true));
+ }
+
+ void Set(const TLogHistogram& data) {
+ ui32 i = 0;
+ for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
+ *Buckets[i] = data.Buckets[i];
+ ui64 last = 0;
+ for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
+ last += data.Buckets[i];
+ *Buckets.back() = last;
+ }
+
+ void Set(const TLogHistogram& data, double factor) {
+ ui32 i = 0;
+ for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
+ *Buckets[i] = data.Buckets[i]*factor;
+ ui64 last = 0;
+ for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
+ last += data.Buckets[i];
+ *Buckets.back() = last*factor;
+ }
+
+ private:
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets;
+ };
+
+ struct TActivityStats {
+ void Init(NMonitoring::TDynamicCounterPtr group) {
+ Group = group;
+
+ ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
+ ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
+ ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
+ ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());
+ }
+
+ void Set(const TExecutorThreadStats& stats) {
+ for (ui32 i : xrange(stats.MaxActivityType())) {
+ Y_VERIFY(i < GetActivityTypeCount());
+ ui64 ticks = stats.ElapsedTicksByActivity[i];
+ ui64 events = stats.ReceivedEventsByActivity[i];
+ ui64 actors = stats.ActorsAliveByActivity[i];
+ ui64 scheduled = stats.ScheduledEventsByActivity[i];
+
+ if (!ActorsAliveByActivityBuckets[i]) {
+ if (ticks || events || actors || scheduled) {
+ InitCountersForActivity(i);
+ } else {
+ continue;
+ }
+ }
+
+ *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
+ *ReceivedEventsByActivityBuckets[i] = events;
+ *ActorsAliveByActivityBuckets[i] = actors;
+ *ScheduledEventsByActivityBuckets[i] = scheduled;
+ }
+ }
+
+ private:
+ void InitCountersForActivity(ui32 activityType) {
+ Y_VERIFY(activityType < GetActivityTypeCount());
+
+ auto bucketName = TString(GetActivityTypeName(activityType));
+
+ ElapsedMicrosecByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
+ ReceivedEventsByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true);
+ ActorsAliveByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false);
+ ScheduledEventsByActivityBuckets[activityType] =
+ Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);
+ }
+
+ private:
+ NMonitoring::TDynamicCounterPtr Group;
+
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
+ TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;
+ };
+
+ struct TExecutorPoolCounters {
+ TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup;
+
+ NMonitoring::TDynamicCounters::TCounterPtr SentEvents;
+ NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents;
+ NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents;
+ NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents;
+ NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors;
+ NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation;
+ NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec;
+ NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
+ NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec;
+ NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations;
+ NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive;
+ NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes;
+ NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
+ NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
+ NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
+
+ THistogramCounters LegacyActivationTimeHistogram;
+ NMonitoring::THistogramPtr ActivationTimeHistogram;
+ THistogramCounters LegacyEventDeliveryTimeHistogram;
+ NMonitoring::THistogramPtr EventDeliveryTimeHistogram;
+ THistogramCounters LegacyEventProcessingCountHistogram;
+ NMonitoring::THistogramPtr EventProcessingCountHistogram;
+ THistogramCounters LegacyEventProcessingTimeHistogram;
+ NMonitoring::THistogramPtr EventProcessingTimeHistogram;
+
+ TActivityStats ActivityStats;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime;
+
+ double Usage = 0;
+ double LastElapsedSeconds = 0;
+ THPTimer UsageTimer;
+ TString Name;
+ ui32 Threads;
+
+ void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
+ LastElapsedSeconds = 0;
+ Usage = 0;
+ UsageTimer.Reset();
+ Name = poolName;
+ Threads = threads;
+
+ PoolGroup = group->GetSubgroup("execpool", poolName);
+
+ SentEvents = PoolGroup->GetCounter("SentEvents", true);
+ ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true);
+ PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true);
+ NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true);
+ DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true);
+ CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true);
+ ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true);
+ ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true);
+ EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true);
+ ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true);
+ ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false);
+ AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false);
+ MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
+ MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
+ MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
+
+ LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
+ ActivationTimeHistogram = PoolGroup->GetHistogram(
+ "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+ LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000);
+ EventDeliveryTimeHistogram = PoolGroup->GetHistogram(
+ "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+ LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000);
+ EventProcessingCountHistogram = PoolGroup->GetHistogram(
+ "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+ LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000);
+ EventProcessingTimeHistogram = PoolGroup->GetHistogram(
+ "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+
+ ActivityStats.Init(PoolGroup.Get());
+
+ MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true);
+ }
+
+ void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) {
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+ *SentEvents = stats.SentEvents;
+ *ReceivedEvents = stats.ReceivedEvents;
+ *PreemptedEvents = stats.PreemptedEvents;
+ *NonDeliveredEvents = stats.NonDeliveredEvents;
+ *DestroyedActors = stats.PoolDestroyedActors;
+ *EmptyMailboxActivation = stats.EmptyMailboxActivation;
+ *CpuMicrosec = stats.CpuNs / 1000;
+ *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000;
+ *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
+ *ActorRegistrations = stats.PoolActorRegistrations;
+ *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors;
+ *AllocatedMailboxes = stats.PoolAllocatedMailboxes;
+ *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
+ *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
+ *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
+
+ LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
+ ActivationTimeHistogram->Reset();
+ ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
+
+ LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);
+ EventDeliveryTimeHistogram->Reset();
+ EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);
+
+ LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);
+ EventProcessingCountHistogram->Reset();
+ EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);
+
+ double toMicrosec = 1000000 / NHPTimer::GetClockRate();
+ LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
+ EventProcessingTimeHistogram->Reset();
+ for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) {
+ EventProcessingTimeHistogram->Collect(
+ stats.EventProcessingTimeHistogram.UpperBound(i),
+ stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);
+ }
+
+ ActivityStats.Set(stats);
+
+ *MaxUtilizationTime = poolStats.MaxUtilizationTime;
+
+ double seconds = UsageTimer.PassedReset();
+
+ // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916
+ const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks);
+ const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0;
+ LastElapsedSeconds = elapsed;
+
+ // update usage factor according to smoothness
+ const double smoothness = 0.5;
+ Usage = currentUsage * smoothness + Usage * (1.0 - smoothness);
+#else
+ Y_UNUSED(poolStats);
+ Y_UNUSED(stats);
+ Y_UNUSED(numThreads);
+#endif
+ }
+ };
+
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ return IActor::ACTORLIB_STATS;
+ }
+
+ TStatsCollectingActor(
+ ui32 intervalSec,
+ const TActorSystemSetup& setup,
+ NMonitoring::TDynamicCounterPtr counters)
+ : IntervalSec(intervalSec)
+ , Counters(counters)
+ {
+ PoolCounters.resize(setup.GetExecutorsCount());
+ for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
+ PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));
+ }
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
+ Become(&TThis::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ CFunc(TEvents::TSystem::Wakeup, Wakeup);
+ }
+ }
+
+private:
+ virtual void OnWakeup(const TActorContext &ctx) {
+ Y_UNUSED(ctx);
+ }
+
+ void Wakeup(const TActorContext &ctx) {
+ for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
+ TVector<TExecutorThreadStats> stats;
+ TExecutorPoolStats poolStats;
+ ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats);
+ SetAggregatedCounters(PoolCounters[poolId], poolStats, stats);
+ }
+
+ OnWakeup(ctx);
+
+ ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
+ }
+
+ void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) {
+ // Sum all per-thread counters into the 0th element
+ for (ui32 idx = 1; idx < stats.size(); ++idx) {
+ stats[0].Aggregate(stats[idx]);
+ }
+ if (stats.size()) {
+ poolCounters.Set(poolStats, stats[0], stats.size() - 1);
+ }
+ }
+
+protected:
+ const ui32 IntervalSec;
+ NMonitoring::TDynamicCounterPtr Counters;
+
+ TVector<TExecutorPoolCounters> PoolCounters;
+};
+
+} // NActors