diff options
author | kruall <kruall@ydb.tech> | 2022-12-15 13:18:59 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2022-12-15 13:18:59 +0300 |
commit | 64d895f4fc5fadce93faf6e40d90132dd465a669 (patch) | |
tree | 301d1156af5eedfd3726cf0b26e5adb145043cb7 /library/cpp | |
parent | 079f43fbda3b5c26b969b5a84ce5933697b6a00a (diff) | |
download | ydb-64d895f4fc5fadce93faf6e40d90132dd465a669.tar.gz |
Add sensors,
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 38 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.h | 19 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic_ut.cpp | 11 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 12 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.h | 8 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_stats.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/helpers/pool_stats_collector.h | 13 |
7 files changed, 76 insertions, 29 deletions
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index d4209ef205..5d0ecfa079 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -39,6 +39,7 @@ namespace NActors { , ThreadUtilization(0) , MaxUtilizationCounter(0) , MaxUtilizationAccumulator(0) + , WrongWakenedThreadCount(0) , ThreadCount(threads) , MinThreadCount(minThreadCount) , MaxThreadCount(maxThreadCount) @@ -62,6 +63,7 @@ namespace NActors { } ThreadCount = MaxThreadCount; auto semaphore = TSemaphore(); + semaphore.CurrentThreadCount = ThreadCount; Semaphore = semaphore.ConverToI64(); } @@ -273,22 +275,26 @@ namespace NActors { return 0; } - inline void TBasicExecutorPool::WakeUpLoop() { - for (ui32 i = 0;;) { - TThreadCtx& threadCtx = Threads[i % PoolThreads]; - switch (AtomicLoad(&threadCtx.WaitingFlag)) { + inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) { + for (i16 i = 0;;) { + TThreadCtx& threadCtx = Threads[i]; + TThreadCtx::EWaitState state = static_cast<TThreadCtx::EWaitState>(AtomicLoad(&threadCtx.WaitingFlag)); + switch (state) { case TThreadCtx::WS_NONE: case TThreadCtx::WS_RUNNING: - ++i; - break; - case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) { - return; + if (++i >= MaxThreadCount) { + i = 0; } break; + case TThreadCtx::WS_ACTIVE: case TThreadCtx::WS_BLOCKED: - if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) { - threadCtx.Pad.Unpark(); + if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, state)) { + if (state == TThreadCtx::WS_BLOCKED) { + threadCtx.Pad.Unpark(); + } + if (i >= currentThreadCount) { + AtomicIncrement(WrongWakenedThreadCount); + } return; } break; @@ -319,12 +325,19 @@ namespace NActors { } while (true); if (needToWakeUp) { // we must find someone to wake-up - WakeUpLoop(); + WakeUpLoop(semaphore.CurrentThreadCount); } } void TBasicExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000); + poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount); + poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount); + if (Harmonizer) { + TPoolStateFlags flags = Harmonizer->GetPoolFlags(PoolId); + poolStats.IsNeedy = flags.IsNeedy; + poolStats.IsStarved = flags.IsStarved; + } statsCopy.resize(PoolThreads + 1); // Save counters from the pool object @@ -436,6 +449,7 @@ namespace NActors { AtomicSet(ThreadCount, threads); TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore)); i64 oldX = semaphore.ConverToI64(); + semaphore.CurrentThreadCount = threads; if (threads > prevCount) { semaphore.CurrentSleepThreadCount += (i64)threads - prevCount; semaphore.OldSemaphore -= (i64)threads - prevCount; diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 7bdf2349ac..cd94a998f1 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -71,6 +71,7 @@ namespace NActors { TAtomic ThreadUtilization; TAtomic MaxUtilizationCounter; TAtomic MaxUtilizationAccumulator; + TAtomic WrongWakenedThreadCount; TAtomic ThreadCount; TMutex ChangeThreadsLock; @@ -86,24 +87,22 @@ namespace NActors { struct TSemaphore { i64 OldSemaphore = 0; // 34 bits // Sign bit - i8 Reserved1 = 0; // 5 bits - i16 CurrentSleepThreadCount = 0; // 16 bits - i8 Reserved2 = 0; // 8 bits + i16 CurrentSleepThreadCount = 0; // 14 bits + // Sign bit + i16 CurrentThreadCount = 0; // 14 bits inline i64 ConverToI64() { i64 value = (1ll << 34) + OldSemaphore; return value - | ((i64)Reserved1 << 35) - | ((i64)CurrentSleepThreadCount << 40) - | ((i64)Reserved2 << 56); + | (((i64)CurrentSleepThreadCount + (1 << 14)) << 35) + | ((i64)CurrentThreadCount << 50); } static inline TSemaphore GetSemaphore(i64 value) { TSemaphore semaphore; semaphore.OldSemaphore = (value & 0x7ffffffffll) - (1ll << 34); - semaphore.Reserved1 = (value >> 35) & 0x1f; - semaphore.CurrentSleepThreadCount = (value >> 40) & 0xffff; - semaphore.Reserved2 = (value >> 56) & 0xff; + semaphore.CurrentSleepThreadCount = ((value >> 35) & 0x7fff) - (1 << 14); + semaphore.CurrentThreadCount = (value >> 50) & 0x3fff; return semaphore; } }; @@ -160,7 +159,7 @@ namespace NActors { i16 GetPriority() const override; private: - void WakeUpLoop(); + void WakeUpLoop(i16 currentThreadCount); bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock); void GoToSpin(TThreadCtx& threadCtx); bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers); diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 574b0b0759..b87507bdbf 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -11,9 +11,8 @@ using namespace NActors; #define VALUES_EQUAL(a, b, ...) \ UNIT_ASSERT_VALUES_EQUAL_C((a), (b), (i64)semaphore.OldSemaphore \ - << ' ' << (i64)semaphore.Reserved1 \ << ' ' << (i64)semaphore.CurrentSleepThreadCount \ - << ' ' << (i64)semaphore.Reserved2 __VA_ARGS__); + << ' ' << (i64)semaphore.CurrentThreadCount __VA_ARGS__); //////////////////////////////////////////////////////////////////////////////// @@ -124,10 +123,10 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value); UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore << ' ' << (i64)semaphore.CurrentSleepThreadCount - << ' ' << (i64)semaphore.Reserved2); + << ' ' << (i64)semaphore.CurrentThreadCount); UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore << ' ' << (i64)semaphore.CurrentSleepThreadCount - << ' ' << (i64)semaphore.Reserved2); + << ' ' << (i64)semaphore.CurrentThreadCount); semaphore = TBasicExecutorPool::TSemaphore(); semaphore.OldSemaphore = expected; semaphore.CurrentSleepThreadCount = sleepThreads; @@ -139,10 +138,10 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value); UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore << ' ' << (i64)semaphore.CurrentSleepThreadCount - << ' ' << (i64)semaphore.Reserved2); + << ' ' << (i64)semaphore.CurrentThreadCount); UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore << ' ' << (i64)semaphore.CurrentSleepThreadCount - << ' ' << (i64)semaphore.Reserved2); + << ' ' << (i64)semaphore.CurrentThreadCount); value--; } } diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index 75f3179a27..2722958486 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -117,6 +117,8 @@ struct TPoolInfo { ui32 MaxAvgPingUs = 0; ui64 LastUpdateTs = 0; + TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved + bool IsBeingStopped(i16 threadIdx); double GetBooked(i16 threadIdx); double GetlastSecondPoolBooked(i16 threadIdx); @@ -210,6 +212,7 @@ public: void DeclareEmergency(ui64 ts) override; void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; void Enable(bool enable) override; + TPoolStateFlags GetPoolFlags(i16 poolId) const override; }; THarmonizer::THarmonizer(ui64 ts) { @@ -283,6 +286,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { } booked += poolBooked; consumed += poolConsumed; + AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1)); LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); } double budget = total - Max(booked, lastSecondBooked); @@ -405,4 +409,12 @@ IHarmonizer* MakeHarmonizer(ui64 ts) { return new THarmonizer(ts); } +TPoolStateFlags THarmonizer::GetPoolFlags(i16 poolId) const { + ui64 flags = RelaxedLoad(&Pools[poolId].LastFlags); + return TPoolStateFlags { + .IsNeedy = static_cast<bool>(flags & 1), + .IsStarved = static_cast<bool>(flags & 2) + }; +} + } diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h index a171930d9a..5ab44369aa 100644 --- a/library/cpp/actors/core/harmonizer.h +++ b/library/cpp/actors/core/harmonizer.h @@ -6,14 +6,20 @@ namespace NActors { class IExecutorPool; + struct TPoolStateFlags { + bool IsNeedy = false; + bool IsStarved = false; + }; + // Pool cpu harmonizer class IHarmonizer { - public: + public: virtual ~IHarmonizer() {} virtual void Harmonize(ui64 ts) = 0; virtual void DeclareEmergency(ui64 ts) = 0; virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0; virtual void Enable(bool enable) = 0; + virtual TPoolStateFlags GetPoolFlags(i16 poolId) const = 0; }; IHarmonizer* MakeHarmonizer(ui64 ts); diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index fb76410590..037f4bc9d8 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -60,6 +60,10 @@ namespace NActors { struct TExecutorPoolStats { ui64 MaxUtilizationTime = 0; + i16 WrongWakenedThreadCount = 0; + i16 CurrentThreadCount = 0; + bool IsNeedy = false; + bool IsStarved = false; }; struct TExecutorThreadStats { diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index 61d0b45780..2241abc70c 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -124,6 +124,11 @@ private: NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption; NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime; NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; + NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr IsNeedy; + NMonitoring::TDynamicCounters::TCounterPtr IsStarved; + THistogramCounters LegacyActivationTimeHistogram; NMonitoring::THistogramPtr ActivationTimeHistogram; @@ -167,6 +172,10 @@ private: MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true); MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true); MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); + WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true); + CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false); + IsNeedy = PoolGroup->GetCounter("IsNeedy", false); + IsStarved = PoolGroup->GetCounter("IsStarved", false); LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); ActivationTimeHistogram = PoolGroup->GetHistogram( @@ -203,6 +212,10 @@ private: *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption; *MailboxPushedOutByTime = stats.MailboxPushedOutByTime; *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; + *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount; + *CurrentThreadCount = poolStats.CurrentThreadCount; + *IsNeedy = poolStats.IsNeedy; + *IsStarved = poolStats.IsStarved; LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); ActivationTimeHistogram->Reset(); |