diff options
author | kruall <kruall@ydb.tech> | 2023-03-29 11:20:11 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-03-29 11:20:11 +0300 |
commit | d7902a2e7ea2f740aa0ad87f9d060de555e5b9d5 (patch) | |
tree | 89f9b5bfcbd8832b91f708d19505d664a4debd81 /library | |
parent | 35185f5ff52ac229628762af6bf520cabe9d6a18 (diff) | |
download | ydb-d7902a2e7ea2f740aa0ad87f9d060de555e5b9d5.tar.gz |
Add burst metrics,
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/core/actorsystem.cpp | 5 | ||||
-rw-r--r-- | library/cpp/actors/core/actorsystem.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/cpu_manager.h | 7 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool.h | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 142 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.h | 16 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_stats.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/worker_context.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/helpers/pool_stats_collector.h | 44 |
10 files changed, 209 insertions, 25 deletions
diff --git a/library/cpp/actors/core/actorsystem.cpp b/library/cpp/actors/core/actorsystem.cpp index 9a8a0abd012..e6df9c2ee0a 100644 --- a/library/cpp/actors/core/actorsystem.cpp +++ b/library/cpp/actors/core/actorsystem.cpp @@ -218,6 +218,11 @@ namespace NActors { CpuManager->GetPoolStats(poolId, poolStats, statsCopy); } + THarmonizerStats TActorSystem::GetHarmonizerStats() const { + return CpuManager->GetHarmonizerStats(); + + } + void TActorSystem::Start() { Y_VERIFY(StartExecuted == false); StartExecuted = true; diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index b9ecaa2b9ea..73fb3177075 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -299,6 +299,8 @@ namespace NActors { void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const; + THarmonizerStats GetHarmonizerStats() const; + std::optional<ui32> GetPoolThreadsCount(const ui32 poolId) const { if (!SystemSetup) { return {}; diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h index e2e3861a3b0..c3a7588639c 100644 --- a/library/cpp/actors/core/cpu_manager.h +++ b/library/cpp/actors/core/cpu_manager.h @@ -40,6 +40,13 @@ namespace NActors { } } + THarmonizerStats GetHarmonizerStats() const { + if (Harmonizer) { + return Harmonizer->GetStats(); + } + return {}; + } + private: IExecutorPool* CreateExecutorPool(ui32 poolId); }; diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h index c387e04d0ab..f4def74077c 100644 --- a/library/cpp/actors/core/executor_pool.h +++ b/library/cpp/actors/core/executor_pool.h @@ -14,6 +14,12 @@ namespace NActors { double ConsumedUs = 0; double BookedUs = 0; ui64 NotEnoughCpuExecutions = 0; + + void Add(const TCpuConsumption& other) { + ConsumedUs += other.ConsumedUs; + BookedUs += other.BookedUs; + NotEnoughCpuExecutions += other.NotEnoughCpuExecutions; + } }; class IExecutorPool : TNonCopyable { diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index a8a9d780e59..fc87daed779 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -337,7 +337,7 @@ namespace NActors { poolStats.DefaultThreadCount = DefaultThreadCount; poolStats.MaxThreadCount = MaxThreadCount; if (Harmonizer) { - TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId); + TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); poolStats.IsNeedy = stats.IsNeedy; poolStats.IsStarved = stats.IsStarved; poolStats.IsHoggish = stats.IsHoggish; @@ -345,6 +345,10 @@ namespace NActors { poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState; poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; + poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu; + poolStats.MinConsumedCpuUs = stats.MinConsumedCpu; + poolStats.MaxBookedCpuUs = stats.MaxBookedCpu; + poolStats.MinBookedCpuUs = stats.MinBookedCpu; } statsCopy.resize(PoolThreads + 1); diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index 8c18d463874..92a2b533b68 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -19,8 +19,8 @@ constexpr bool CheckBinaryPower(ui64 value) { return !(value & (value - 1)); } +template <ui8 HistoryBufferSize = 8> struct TValueHistory { - static constexpr ui64 HistoryBufferSize = 8; static_assert(CheckBinaryPower(HistoryBufferSize)); double History[HistoryBufferSize] = {0.0}; @@ -31,32 +31,87 @@ struct TValueHistory { ui64 AccumulatedTs = 0; template <bool WithTail=false> - double GetAvgPartForLastSeconds(ui8 seconds) { - double sum = AccumulatedUs; + double Accumulate(auto op, auto comb, ui8 seconds) { + double acc = AccumulatedUs; size_t idx = HistoryIdx; ui8 leftSeconds = seconds; + if constexpr (!WithTail) { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + acc = History[idx]; + } do { idx--; leftSeconds--; if (idx >= HistoryBufferSize) { idx = HistoryBufferSize - 1; } - if (WithTail || leftSeconds) { - sum += History[idx]; + if constexpr (WithTail) { + acc = op(acc, History[idx]); + } else if (leftSeconds) { + acc = op(acc, History[idx]); } else { ui64 tsInSecond = Us2Ts(1'000'000.0); - sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond; + acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond); } } while (leftSeconds); - double duration = 1'000'000.0 * seconds + (WithTail ? Ts2Us(AccumulatedTs): 0.0); - double avg = sum / duration; - return avg; + double duration = 1'000'000.0 * seconds; + if constexpr (WithTail) { + duration += Ts2Us(AccumulatedTs); + } + return comb(acc, duration); + } + + template <bool WithTail=false> + double GetAvgPartForLastSeconds(ui8 seconds) { + auto sum = [](double acc, double value) { + return acc + value; + }; + auto avg = [](double sum, double duration) { + return sum / duration; + }; + return Accumulate<WithTail>(sum, avg, seconds); } double GetAvgPart() { return GetAvgPartForLastSeconds<true>(HistoryBufferSize); } + double GetMaxForLastSeconds(ui8 seconds) { + auto max = [](const double& acc, const double& value) { + return Max(acc, value); + }; + auto fst = [](const double& value, const double&) { return value; }; + return Accumulate<false>(max, fst, seconds); + } + + double GetMax() { + return GetMaxForLastSeconds(HistoryBufferSize); + } + + i64 GetMaxInt() { + return static_cast<i64>(GetMax()); + } + + double GetMinForLastSeconds(ui8 seconds) { + auto min = [](const double& acc, const double& value) { + return Min(acc, value); + }; + auto fst = [](const double& value, const double&) { return value; }; + return Accumulate<false>(min, fst, seconds); + } + + double GetMin() { + return GetMinForLastSeconds(HistoryBufferSize); + } + + i64 GetMinInt() { + return static_cast<i64>(GetMin()); + } + void Register(ui64 ts, double valueUs) { if (ts < LastTs) { LastTs = ts; @@ -101,8 +156,8 @@ struct TValueHistory { }; struct TThreadInfo { - TValueHistory Consumed; - TValueHistory Booked; + TValueHistory<8> Consumed; + TValueHistory<8> Booked; }; struct TPoolInfo { @@ -125,12 +180,20 @@ struct TPoolInfo { TAtomic DecreasingThreadsByHoggishState = 0; TAtomic PotentialMaxThreadCount = 0; + TValueHistory<16> Consumed; + TValueHistory<16> Booked; + + TAtomic MaxConsumedCpu = 0; + TAtomic MinConsumedCpu = 0; + TAtomic MaxBookedCpu = 0; + TAtomic MinBookedCpu = 0; + bool IsBeingStopped(i16 threadIdx); double GetBooked(i16 threadIdx); double GetlastSecondPoolBooked(i16 threadIdx); double GetConsumed(i16 threadIdx); double GetlastSecondPoolConsumed(i16 threadIdx); - void PullStats(ui64 ts); + TCpuConsumption PullStats(ui64 ts); i16 GetThreadCount(); void SetThreadCount(i16 threadCount); bool IsAvgPingGood(); @@ -169,19 +232,26 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { } #define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] -void TPoolInfo::PullStats(ui64 ts) { - ui64 notEnoughCpuExecutions = 0; +TCpuConsumption TPoolInfo::PullStats(ui64 ts) { + TCpuConsumption acc; for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) { TThreadInfo &threadInfo = ThreadInfo[threadIdx]; TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); + acc.Add(cpuConsumption); threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); - notEnoughCpuExecutions += cpuConsumption.NotEnoughCpuExecutions; } - NewNotEnoughCpuExecutions = notEnoughCpuExecutions - NotEnoughCpuExecutions; - NotEnoughCpuExecutions = notEnoughCpuExecutions; + Consumed.Register(ts, acc.ConsumedUs); + RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt()); + RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt()); + Booked.Register(ts, acc.BookedUs); + RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt()); + RelaxedStore(&MinBookedCpu, Booked.GetMinInt()); + NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions; + NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions; + return acc; } #undef UNROLL_HISTORY @@ -212,6 +282,14 @@ private: std::vector<TPoolInfo> Pools; std::vector<ui16> PriorityOrder; + TValueHistory<16> Consumed; + TValueHistory<16> Booked; + + TAtomic MaxConsumedCpu = 0; + TAtomic MinConsumedCpu = 0; + TAtomic MaxBookedCpu = 0; + TAtomic MinBookedCpu = 0; + void PullStats(ui64 ts); void HarmonizeImpl(ui64 ts); void CalculatePriorityOrder(); @@ -223,7 +301,8 @@ public: void DeclareEmergency(ui64 ts) override; void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; void Enable(bool enable) override; - TPoolHarmonizedStats GetPoolStats(i16 poolId) const override; + TPoolHarmonizerStats GetPoolStats(i16 poolId) const override; + THarmonizerStats GetStats() const override; }; THarmonizer::THarmonizer(ui64 ts) { @@ -238,9 +317,17 @@ double THarmonizer::Rescale(double value) const { } void THarmonizer::PullStats(ui64 ts) { + TCpuConsumption acc; for (TPoolInfo &pool : Pools) { - pool.PullStats(ts); + TCpuConsumption consumption = pool.PullStats(ts); + acc.Add(consumption); } + Consumed.Register(ts, acc.ConsumedUs); + RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt()); + RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt()); + Booked.Register(ts, acc.BookedUs); + RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt()); + RelaxedStore(&MinBookedCpu, Booked.GetMinInt()); } Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { @@ -431,13 +518,17 @@ IHarmonizer* MakeHarmonizer(ui64 ts) { return new THarmonizer(ts); } -TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const { +TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { const TPoolInfo &pool = Pools[poolId]; ui64 flags = RelaxedLoad(&pool.LastFlags); - return TPoolHarmonizedStats { + return TPoolHarmonizerStats{ .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), + .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxConsumedCpu)), + .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MinConsumedCpu)), + .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxBookedCpu)), + .MinBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MinBookedCpu)), .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)), .IsNeedy = static_cast<bool>(flags & 1), .IsStarved = static_cast<bool>(flags & 2), @@ -445,4 +536,13 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const { }; } +THarmonizerStats THarmonizer::GetStats() const { + return THarmonizerStats{ + .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&MaxConsumedCpu)), + .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)), + .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)), + .MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)), + }; +} + } diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h index bc6b938fe88..7c66ff54c67 100644 --- a/library/cpp/actors/core/harmonizer.h +++ b/library/cpp/actors/core/harmonizer.h @@ -6,16 +6,27 @@ namespace NActors { class IExecutorPool; - struct TPoolHarmonizedStats { + struct TPoolHarmonizerStats { ui64 IncreasingThreadsByNeedyState = 0; ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + i64 MaxConsumedCpu = 0.0; + i64 MinConsumedCpu = 0.0; + i64 MaxBookedCpu = 0.0; + i64 MinBookedCpu = 0.0; i16 PotentialMaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; }; + struct THarmonizerStats { + i64 MaxConsumedCpu = 0.0; + i64 MinConsumedCpu = 0.0; + i64 MaxBookedCpu = 0.0; + i64 MinBookedCpu = 0.0; + }; + // Pool cpu harmonizer class IHarmonizer { public: @@ -24,7 +35,8 @@ namespace NActors { virtual void DeclareEmergency(ui64 ts) = 0; virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0; virtual void Enable(bool enable) = 0; - virtual TPoolHarmonizedStats GetPoolStats(i16 poolId) const = 0; + virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0; + virtual THarmonizerStats GetStats() const = 0; }; IHarmonizer* MakeHarmonizer(ui64 ts); diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 5fba5aa0682..5d61c9f87c3 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -63,6 +63,10 @@ namespace NActors { ui64 IncreasingThreadsByNeedyState = 0; ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + i64 MaxConsumedCpuUs = 0; + i64 MinConsumedCpuUs = 0; + i64 MaxBookedCpuUs = 0; + i64 MinBookedCpuUs = 0; i16 WrongWakenedThreadCount = 0; i16 CurrentThreadCount = 0; i16 PotentialMaxThreadCount = 0; diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index f10409b6654..cc8da2ff777 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -104,7 +104,7 @@ namespace NActors { i64 ts = deliveredTs > scheduleTs ? deliveredTs - scheduleTs : 0; double usec = NHPTimer::GetSeconds(ts) * 1000000.0; Stats->ActivationTimeHistogram.Add(usec); - Stats->WorstActivationTimeUs = Max(Stats->WorstActivationTimeUs, (ui64)usec); + RelaxedStore(&Stats->WorstActivationTimeUs, Max(Stats->WorstActivationTimeUs, (ui64)usec)); return usec; } diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index e82fedfa80e..51ace0e3ccf 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -136,6 +136,10 @@ private: NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions; + NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; THistogramCounters LegacyActivationTimeHistogram; @@ -192,6 +196,10 @@ private: DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true); DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true); + MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false); + MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false); + MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false); + MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false); LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); ActivationTimeHistogram = PoolGroup->GetHistogram( @@ -284,6 +292,38 @@ private: } }; + struct TActorSystemCounters { + TIntrusivePtr<NMonitoring::TDynamicCounters> Group; + + NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; + + void Init(NMonitoring::TDynamicCounters* group) { + Group = group; + + MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false); + MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false); + MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false); + MinBookedCpu = Group->GetCounter("MinBookedCpu", false); + } + + void Set(const THarmonizerStats& harmonizerStats) { +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu; + *MinConsumedCpu = harmonizerStats.MinConsumedCpu; + *MaxBookedCpu = harmonizerStats.MaxBookedCpu; + *MinBookedCpu = harmonizerStats.MinBookedCpu; +#else + Y_UNUSED(poolStats); + Y_UNUSED(stats); + Y_UNUSED(numThreads); +#endif + } + + }; + public: static constexpr IActor::EActivityType ActorActivityType() { return IActor::ACTORLIB_STATS; @@ -300,6 +340,7 @@ public: for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) { PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId)); } + ActorSystemCounters.Init(Counters.Get()); } void Bootstrap(const TActorContext& ctx) { @@ -325,6 +366,8 @@ private: ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats); SetAggregatedCounters(PoolCounters[poolId], poolStats, stats); } + THarmonizerStats harmonizerStats = ctx.ExecutorThread.ActorSystem->GetHarmonizerStats(); + ActorSystemCounters.Set(harmonizerStats); OnWakeup(ctx); @@ -346,6 +389,7 @@ protected: NMonitoring::TDynamicCounterPtr Counters; TVector<TExecutorPoolCounters> PoolCounters; + TActorSystemCounters ActorSystemCounters; }; } // NActors |