diff options
author | aozeritsky <aozeritsky@yandex-team.ru> | 2022-02-10 16:46:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:39 +0300 |
commit | 4a6816dea1bcaee46ce29a51a5fd7d3495012858 (patch) | |
tree | d98d6003e190b9e761bd77a83cd98428f9657b35 /library/cpp/actors/helpers/pool_stats_collector.h | |
parent | 7aa4cf700385ff96999c5cc301171ff157974773 (diff) | |
download | ydb-4a6816dea1bcaee46ce29a51a5fd7d3495012858.tar.gz |
Restoring authorship annotation for <aozeritsky@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/helpers/pool_stats_collector.h')
-rw-r--r-- | library/cpp/actors/helpers/pool_stats_collector.h | 628 |
1 files changed, 314 insertions, 314 deletions
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index 61d0b45780..78c842900e 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -1,314 +1,314 @@ -#pragma once - -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/core/executor_thread.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> - -#include <util/generic/vector.h> -#include <util/generic/xrange.h> -#include <util/string/printf.h> - -namespace NActors { - -// Periodically collects stats from executor threads and exposes them as mon counters -class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> { -private: - struct THistogramCounters { - void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) { - for (size_t i = 0; (1ull<<i) <= maxVal; ++i) { - TString bucketName = ToString(1ull<<i) + " " + unit; - Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true)); - } - Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true)); - } - - void Set(const TLogHistogram& data) { - ui32 i = 0; - for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) - *Buckets[i] = data.Buckets[i]; - ui64 last = 0; - for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) - last += data.Buckets[i]; - *Buckets.back() = last; - } - - void Set(const TLogHistogram& data, double factor) { - ui32 i = 0; - for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) - *Buckets[i] = data.Buckets[i]*factor; - ui64 last = 0; - for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) - last += data.Buckets[i]; - *Buckets.back() = last*factor; - } - - private: - TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets; - }; - - struct TActivityStats { - void Init(NMonitoring::TDynamicCounterPtr group) { - Group = group; - - ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount()); - ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount()); - ActorsAliveByActivityBuckets.resize(GetActivityTypeCount()); - ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount()); - } - - void Set(const TExecutorThreadStats& stats) { - for (ui32 i : xrange(stats.MaxActivityType())) { - Y_VERIFY(i < GetActivityTypeCount()); - ui64 ticks = stats.ElapsedTicksByActivity[i]; - ui64 events = stats.ReceivedEventsByActivity[i]; - ui64 actors = stats.ActorsAliveByActivity[i]; - ui64 scheduled = stats.ScheduledEventsByActivity[i]; - - if (!ActorsAliveByActivityBuckets[i]) { - if (ticks || events || actors || scheduled) { - InitCountersForActivity(i); - } else { - continue; - } - } - - *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000; - *ReceivedEventsByActivityBuckets[i] = events; - *ActorsAliveByActivityBuckets[i] = actors; - *ScheduledEventsByActivityBuckets[i] = scheduled; - } - } - - private: - void InitCountersForActivity(ui32 activityType) { - Y_VERIFY(activityType < GetActivityTypeCount()); - - auto bucketName = TString(GetActivityTypeName(activityType)); - - ElapsedMicrosecByActivityBuckets[activityType] = - Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true); - ReceivedEventsByActivityBuckets[activityType] = - Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true); - ActorsAliveByActivityBuckets[activityType] = - Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false); - ScheduledEventsByActivityBuckets[activityType] = - Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true); - } - - private: - NMonitoring::TDynamicCounterPtr Group; - - TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets; - TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets; - TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets; - TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets; - }; - - struct TExecutorPoolCounters { - TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup; - - NMonitoring::TDynamicCounters::TCounterPtr SentEvents; - NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents; - NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents; - NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents; - NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors; - NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation; - NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec; - NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; - NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec; - NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations; - NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive; - NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes; - NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption; - NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime; - NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; - - THistogramCounters LegacyActivationTimeHistogram; - NMonitoring::THistogramPtr ActivationTimeHistogram; - THistogramCounters LegacyEventDeliveryTimeHistogram; - NMonitoring::THistogramPtr EventDeliveryTimeHistogram; - THistogramCounters LegacyEventProcessingCountHistogram; - NMonitoring::THistogramPtr EventProcessingCountHistogram; - THistogramCounters LegacyEventProcessingTimeHistogram; - NMonitoring::THistogramPtr EventProcessingTimeHistogram; - - TActivityStats ActivityStats; - NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime; - - double Usage = 0; - double LastElapsedSeconds = 0; - THPTimer UsageTimer; - TString Name; - ui32 Threads; - - void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) { - LastElapsedSeconds = 0; - Usage = 0; - UsageTimer.Reset(); - Name = poolName; - Threads = threads; - - PoolGroup = group->GetSubgroup("execpool", poolName); - - SentEvents = PoolGroup->GetCounter("SentEvents", true); - ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true); - PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true); - NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true); - DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true); - CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true); - ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true); - ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true); - EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true); - ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true); - ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false); - AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false); - MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true); - MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true); - MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); - - LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); - ActivationTimeHistogram = PoolGroup->GetHistogram( - "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); - LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000); - EventDeliveryTimeHistogram = PoolGroup->GetHistogram( - "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); - LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000); - EventProcessingCountHistogram = PoolGroup->GetHistogram( - "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1)); - LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000); - EventProcessingTimeHistogram = PoolGroup->GetHistogram( - "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); - - ActivityStats.Init(PoolGroup.Get()); - - MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true); - } - - void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) { -#ifdef ACTORSLIB_COLLECT_EXEC_STATS - *SentEvents = stats.SentEvents; - *ReceivedEvents = stats.ReceivedEvents; - *PreemptedEvents = stats.PreemptedEvents; - *NonDeliveredEvents = stats.NonDeliveredEvents; - *DestroyedActors = stats.PoolDestroyedActors; - *EmptyMailboxActivation = stats.EmptyMailboxActivation; - *CpuMicrosec = stats.CpuNs / 1000; - *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000; - *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000; - *ActorRegistrations = stats.PoolActorRegistrations; - *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors; - *AllocatedMailboxes = stats.PoolAllocatedMailboxes; - *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption; - *MailboxPushedOutByTime = stats.MailboxPushedOutByTime; - *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; - - LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); - ActivationTimeHistogram->Reset(); - ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram); - - LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram); - EventDeliveryTimeHistogram->Reset(); - EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram); - - LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram); - EventProcessingCountHistogram->Reset(); - EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram); - - double toMicrosec = 1000000 / NHPTimer::GetClockRate(); - LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec); - EventProcessingTimeHistogram->Reset(); - for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) { - EventProcessingTimeHistogram->Collect( - stats.EventProcessingTimeHistogram.UpperBound(i), - stats.EventProcessingTimeHistogram.Value(i) * toMicrosec); - } - - ActivityStats.Set(stats); - - *MaxUtilizationTime = poolStats.MaxUtilizationTime; - - double seconds = UsageTimer.PassedReset(); - - // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916 - const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks); - const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0; - LastElapsedSeconds = elapsed; - - // update usage factor according to smoothness - const double smoothness = 0.5; - Usage = currentUsage * smoothness + Usage * (1.0 - smoothness); -#else - Y_UNUSED(poolStats); - Y_UNUSED(stats); - Y_UNUSED(numThreads); -#endif - } - }; - -public: - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::ACTORLIB_STATS; - } - - TStatsCollectingActor( - ui32 intervalSec, - const TActorSystemSetup& setup, - NMonitoring::TDynamicCounterPtr counters) - : IntervalSec(intervalSec) - , Counters(counters) - { - PoolCounters.resize(setup.GetExecutorsCount()); - for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { - PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId)); - } - } - - void Bootstrap(const TActorContext& ctx) { - ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); - Become(&TThis::StateWork); - } - - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - CFunc(TEvents::TSystem::Wakeup, Wakeup); - } - } - -private: - virtual void OnWakeup(const TActorContext &ctx) { - Y_UNUSED(ctx); - } - - void Wakeup(const TActorContext &ctx) { - for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { - TVector<TExecutorThreadStats> stats; - TExecutorPoolStats poolStats; - ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats); - SetAggregatedCounters(PoolCounters[poolId], poolStats, stats); - } - - OnWakeup(ctx); - - ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); - } - - void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) { - // Sum all per-thread counters into the 0th element - for (ui32 idx = 1; idx < stats.size(); ++idx) { - stats[0].Aggregate(stats[idx]); - } - if (stats.size()) { - poolCounters.Set(poolStats, stats[0], stats.size() - 1); - } - } - -protected: - const ui32 IntervalSec; - NMonitoring::TDynamicCounterPtr Counters; - - TVector<TExecutorPoolCounters> PoolCounters; -}; - -} // NActors +#pragma once + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/executor_thread.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/vector.h> +#include <util/generic/xrange.h> +#include <util/string/printf.h> + +namespace NActors { + +// Periodically collects stats from executor threads and exposes them as mon counters +class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> { +private: + struct THistogramCounters { + void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) { + for (size_t i = 0; (1ull<<i) <= maxVal; ++i) { + TString bucketName = ToString(1ull<<i) + " " + unit; + Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true)); + } + Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true)); + } + + void Set(const TLogHistogram& data) { + ui32 i = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) + *Buckets[i] = data.Buckets[i]; + ui64 last = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) + last += data.Buckets[i]; + *Buckets.back() = last; + } + + void Set(const TLogHistogram& data, double factor) { + ui32 i = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) + *Buckets[i] = data.Buckets[i]*factor; + ui64 last = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) + last += data.Buckets[i]; + *Buckets.back() = last*factor; + } + + private: + TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets; + }; + + struct TActivityStats { + void Init(NMonitoring::TDynamicCounterPtr group) { + Group = group; + + ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount()); + ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount()); + ActorsAliveByActivityBuckets.resize(GetActivityTypeCount()); + ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount()); + } + + void Set(const TExecutorThreadStats& stats) { + for (ui32 i : xrange(stats.MaxActivityType())) { + Y_VERIFY(i < GetActivityTypeCount()); + ui64 ticks = stats.ElapsedTicksByActivity[i]; + ui64 events = stats.ReceivedEventsByActivity[i]; + ui64 actors = stats.ActorsAliveByActivity[i]; + ui64 scheduled = stats.ScheduledEventsByActivity[i]; + + if (!ActorsAliveByActivityBuckets[i]) { + if (ticks || events || actors || scheduled) { + InitCountersForActivity(i); + } else { + continue; + } + } + + *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000; + *ReceivedEventsByActivityBuckets[i] = events; + *ActorsAliveByActivityBuckets[i] = actors; + *ScheduledEventsByActivityBuckets[i] = scheduled; + } + } + + private: + void InitCountersForActivity(ui32 activityType) { + Y_VERIFY(activityType < GetActivityTypeCount()); + + auto bucketName = TString(GetActivityTypeName(activityType)); + + ElapsedMicrosecByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true); + ReceivedEventsByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true); + ActorsAliveByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false); + ScheduledEventsByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true); + } + + private: + NMonitoring::TDynamicCounterPtr Group; + + TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets; + TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets; + TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets; + TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets; + }; + + struct TExecutorPoolCounters { + TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup; + + NMonitoring::TDynamicCounters::TCounterPtr SentEvents; + NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents; + NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents; + NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents; + NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors; + NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation; + NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations; + NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive; + NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; + + THistogramCounters LegacyActivationTimeHistogram; + NMonitoring::THistogramPtr ActivationTimeHistogram; + THistogramCounters LegacyEventDeliveryTimeHistogram; + NMonitoring::THistogramPtr EventDeliveryTimeHistogram; + THistogramCounters LegacyEventProcessingCountHistogram; + NMonitoring::THistogramPtr EventProcessingCountHistogram; + THistogramCounters LegacyEventProcessingTimeHistogram; + NMonitoring::THistogramPtr EventProcessingTimeHistogram; + + TActivityStats ActivityStats; + NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime; + + double Usage = 0; + double LastElapsedSeconds = 0; + THPTimer UsageTimer; + TString Name; + ui32 Threads; + + void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) { + LastElapsedSeconds = 0; + Usage = 0; + UsageTimer.Reset(); + Name = poolName; + Threads = threads; + + PoolGroup = group->GetSubgroup("execpool", poolName); + + SentEvents = PoolGroup->GetCounter("SentEvents", true); + ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true); + PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true); + NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true); + DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true); + CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true); + ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true); + ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true); + EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true); + ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true); + ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false); + AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false); + MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true); + MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true); + MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); + + LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); + ActivationTimeHistogram = PoolGroup->GetHistogram( + "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000); + EventDeliveryTimeHistogram = PoolGroup->GetHistogram( + "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000); + EventProcessingCountHistogram = PoolGroup->GetHistogram( + "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000); + EventProcessingTimeHistogram = PoolGroup->GetHistogram( + "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + + ActivityStats.Init(PoolGroup.Get()); + + MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true); + } + + void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) { +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + *SentEvents = stats.SentEvents; + *ReceivedEvents = stats.ReceivedEvents; + *PreemptedEvents = stats.PreemptedEvents; + *NonDeliveredEvents = stats.NonDeliveredEvents; + *DestroyedActors = stats.PoolDestroyedActors; + *EmptyMailboxActivation = stats.EmptyMailboxActivation; + *CpuMicrosec = stats.CpuNs / 1000; + *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000; + *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000; + *ActorRegistrations = stats.PoolActorRegistrations; + *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors; + *AllocatedMailboxes = stats.PoolAllocatedMailboxes; + *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption; + *MailboxPushedOutByTime = stats.MailboxPushedOutByTime; + *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; + + LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); + ActivationTimeHistogram->Reset(); + ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram); + + LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram); + EventDeliveryTimeHistogram->Reset(); + EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram); + + LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram); + EventProcessingCountHistogram->Reset(); + EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram); + + double toMicrosec = 1000000 / NHPTimer::GetClockRate(); + LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec); + EventProcessingTimeHistogram->Reset(); + for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) { + EventProcessingTimeHistogram->Collect( + stats.EventProcessingTimeHistogram.UpperBound(i), + stats.EventProcessingTimeHistogram.Value(i) * toMicrosec); + } + + ActivityStats.Set(stats); + + *MaxUtilizationTime = poolStats.MaxUtilizationTime; + + double seconds = UsageTimer.PassedReset(); + + // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916 + const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks); + const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0; + LastElapsedSeconds = elapsed; + + // update usage factor according to smoothness + const double smoothness = 0.5; + Usage = currentUsage * smoothness + Usage * (1.0 - smoothness); +#else + Y_UNUSED(poolStats); + Y_UNUSED(stats); + Y_UNUSED(numThreads); +#endif + } + }; + +public: + static constexpr IActor::EActivityType ActorActivityType() { + return IActor::ACTORLIB_STATS; + } + + TStatsCollectingActor( + ui32 intervalSec, + const TActorSystemSetup& setup, + NMonitoring::TDynamicCounterPtr counters) + : IntervalSec(intervalSec) + , Counters(counters) + { + PoolCounters.resize(setup.GetExecutorsCount()); + for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { + PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId)); + } + } + + void Bootstrap(const TActorContext& ctx) { + ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + CFunc(TEvents::TSystem::Wakeup, Wakeup); + } + } + +private: + virtual void OnWakeup(const TActorContext &ctx) { + Y_UNUSED(ctx); + } + + void Wakeup(const TActorContext &ctx) { + for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { + TVector<TExecutorThreadStats> stats; + TExecutorPoolStats poolStats; + ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats); + SetAggregatedCounters(PoolCounters[poolId], poolStats, stats); + } + + OnWakeup(ctx); + + ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup()); + } + + void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) { + // Sum all per-thread counters into the 0th element + for (ui32 idx = 1; idx < stats.size(); ++idx) { + stats[0].Aggregate(stats[idx]); + } + if (stats.size()) { + poolCounters.Set(poolStats, stats[0], stats.size() - 1); + } + } + +protected: + const ui32 IntervalSec; + NMonitoring::TDynamicCounterPtr Counters; + + TVector<TExecutorPoolCounters> PoolCounters; +}; + +} // NActors |