aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2022-12-15 13:18:59 +0300
committerkruall <kruall@ydb.tech>2022-12-15 13:18:59 +0300
commit64d895f4fc5fadce93faf6e40d90132dd465a669 (patch)
tree301d1156af5eedfd3726cf0b26e5adb145043cb7 /library/cpp
parent079f43fbda3b5c26b969b5a84ce5933697b6a00a (diff)
downloadydb-64d895f4fc5fadce93faf6e40d90132dd465a669.tar.gz
Add sensors,
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp38
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h19
-rw-r--r--library/cpp/actors/core/executor_pool_basic_ut.cpp11
-rw-r--r--library/cpp/actors/core/harmonizer.cpp12
-rw-r--r--library/cpp/actors/core/harmonizer.h8
-rw-r--r--library/cpp/actors/core/mon_stats.h4
-rw-r--r--library/cpp/actors/helpers/pool_stats_collector.h13
7 files changed, 76 insertions, 29 deletions
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index d4209ef205..5d0ecfa079 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -39,6 +39,7 @@ namespace NActors {
, ThreadUtilization(0)
, MaxUtilizationCounter(0)
, MaxUtilizationAccumulator(0)
+ , WrongWakenedThreadCount(0)
, ThreadCount(threads)
, MinThreadCount(minThreadCount)
, MaxThreadCount(maxThreadCount)
@@ -62,6 +63,7 @@ namespace NActors {
}
ThreadCount = MaxThreadCount;
auto semaphore = TSemaphore();
+ semaphore.CurrentThreadCount = ThreadCount;
Semaphore = semaphore.ConverToI64();
}
@@ -273,22 +275,26 @@ namespace NActors {
return 0;
}
- inline void TBasicExecutorPool::WakeUpLoop() {
- for (ui32 i = 0;;) {
- TThreadCtx& threadCtx = Threads[i % PoolThreads];
- switch (AtomicLoad(&threadCtx.WaitingFlag)) {
+ inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
+ for (i16 i = 0;;) {
+ TThreadCtx& threadCtx = Threads[i];
+ TThreadCtx::EWaitState state = static_cast<TThreadCtx::EWaitState>(AtomicLoad(&threadCtx.WaitingFlag));
+ switch (state) {
case TThreadCtx::WS_NONE:
case TThreadCtx::WS_RUNNING:
- ++i;
- break;
- case TThreadCtx::WS_ACTIVE: // in active spin-lock, just set flag
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_ACTIVE)) {
- return;
+ if (++i >= MaxThreadCount) {
+ i = 0;
}
break;
+ case TThreadCtx::WS_ACTIVE:
case TThreadCtx::WS_BLOCKED:
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, TThreadCtx::WS_BLOCKED)) {
- threadCtx.Pad.Unpark();
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, state)) {
+ if (state == TThreadCtx::WS_BLOCKED) {
+ threadCtx.Pad.Unpark();
+ }
+ if (i >= currentThreadCount) {
+ AtomicIncrement(WrongWakenedThreadCount);
+ }
return;
}
break;
@@ -319,12 +325,19 @@ namespace NActors {
} while (true);
if (needToWakeUp) { // we must find someone to wake-up
- WakeUpLoop();
+ WakeUpLoop(semaphore.CurrentThreadCount);
}
}
void TBasicExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000);
+ poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount);
+ poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount);
+ if (Harmonizer) {
+ TPoolStateFlags flags = Harmonizer->GetPoolFlags(PoolId);
+ poolStats.IsNeedy = flags.IsNeedy;
+ poolStats.IsStarved = flags.IsStarved;
+ }
statsCopy.resize(PoolThreads + 1);
// Save counters from the pool object
@@ -436,6 +449,7 @@ namespace NActors {
AtomicSet(ThreadCount, threads);
TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore));
i64 oldX = semaphore.ConverToI64();
+ semaphore.CurrentThreadCount = threads;
if (threads > prevCount) {
semaphore.CurrentSleepThreadCount += (i64)threads - prevCount;
semaphore.OldSemaphore -= (i64)threads - prevCount;
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 7bdf2349ac..cd94a998f1 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -71,6 +71,7 @@ namespace NActors {
TAtomic ThreadUtilization;
TAtomic MaxUtilizationCounter;
TAtomic MaxUtilizationAccumulator;
+ TAtomic WrongWakenedThreadCount;
TAtomic ThreadCount;
TMutex ChangeThreadsLock;
@@ -86,24 +87,22 @@ namespace NActors {
struct TSemaphore {
i64 OldSemaphore = 0; // 34 bits
// Sign bit
- i8 Reserved1 = 0; // 5 bits
- i16 CurrentSleepThreadCount = 0; // 16 bits
- i8 Reserved2 = 0; // 8 bits
+ i16 CurrentSleepThreadCount = 0; // 14 bits
+ // Sign bit
+ i16 CurrentThreadCount = 0; // 14 bits
inline i64 ConverToI64() {
i64 value = (1ll << 34) + OldSemaphore;
return value
- | ((i64)Reserved1 << 35)
- | ((i64)CurrentSleepThreadCount << 40)
- | ((i64)Reserved2 << 56);
+ | (((i64)CurrentSleepThreadCount + (1 << 14)) << 35)
+ | ((i64)CurrentThreadCount << 50);
}
static inline TSemaphore GetSemaphore(i64 value) {
TSemaphore semaphore;
semaphore.OldSemaphore = (value & 0x7ffffffffll) - (1ll << 34);
- semaphore.Reserved1 = (value >> 35) & 0x1f;
- semaphore.CurrentSleepThreadCount = (value >> 40) & 0xffff;
- semaphore.Reserved2 = (value >> 56) & 0xff;
+ semaphore.CurrentSleepThreadCount = ((value >> 35) & 0x7fff) - (1 << 14);
+ semaphore.CurrentThreadCount = (value >> 50) & 0x3fff;
return semaphore;
}
};
@@ -160,7 +159,7 @@ namespace NActors {
i16 GetPriority() const override;
private:
- void WakeUpLoop();
+ void WakeUpLoop(i16 currentThreadCount);
bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
void GoToSpin(TThreadCtx& threadCtx);
bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers);
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 574b0b0759..b87507bdbf 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -11,9 +11,8 @@ using namespace NActors;
#define VALUES_EQUAL(a, b, ...) \
UNIT_ASSERT_VALUES_EQUAL_C((a), (b), (i64)semaphore.OldSemaphore \
- << ' ' << (i64)semaphore.Reserved1 \
<< ' ' << (i64)semaphore.CurrentSleepThreadCount \
- << ' ' << (i64)semaphore.Reserved2 __VA_ARGS__);
+ << ' ' << (i64)semaphore.CurrentThreadCount __VA_ARGS__);
////////////////////////////////////////////////////////////////////////////////
@@ -124,10 +123,10 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value);
UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore
<< ' ' << (i64)semaphore.CurrentSleepThreadCount
- << ' ' << (i64)semaphore.Reserved2);
+ << ' ' << (i64)semaphore.CurrentThreadCount);
UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore
<< ' ' << (i64)semaphore.CurrentSleepThreadCount
- << ' ' << (i64)semaphore.Reserved2);
+ << ' ' << (i64)semaphore.CurrentThreadCount);
semaphore = TBasicExecutorPool::TSemaphore();
semaphore.OldSemaphore = expected;
semaphore.CurrentSleepThreadCount = sleepThreads;
@@ -139,10 +138,10 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value);
UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore
<< ' ' << (i64)semaphore.CurrentSleepThreadCount
- << ' ' << (i64)semaphore.Reserved2);
+ << ' ' << (i64)semaphore.CurrentThreadCount);
UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore
<< ' ' << (i64)semaphore.CurrentSleepThreadCount
- << ' ' << (i64)semaphore.Reserved2);
+ << ' ' << (i64)semaphore.CurrentThreadCount);
value--;
}
}
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index 75f3179a27..2722958486 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -117,6 +117,8 @@ struct TPoolInfo {
ui32 MaxAvgPingUs = 0;
ui64 LastUpdateTs = 0;
+ TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved
+
bool IsBeingStopped(i16 threadIdx);
double GetBooked(i16 threadIdx);
double GetlastSecondPoolBooked(i16 threadIdx);
@@ -210,6 +212,7 @@ public:
void DeclareEmergency(ui64 ts) override;
void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
void Enable(bool enable) override;
+ TPoolStateFlags GetPoolFlags(i16 poolId) const override;
};
THarmonizer::THarmonizer(ui64 ts) {
@@ -283,6 +286,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
}
booked += poolBooked;
consumed += poolConsumed;
+ AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1));
LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
}
double budget = total - Max(booked, lastSecondBooked);
@@ -405,4 +409,12 @@ IHarmonizer* MakeHarmonizer(ui64 ts) {
return new THarmonizer(ts);
}
+TPoolStateFlags THarmonizer::GetPoolFlags(i16 poolId) const {
+ ui64 flags = RelaxedLoad(&Pools[poolId].LastFlags);
+ return TPoolStateFlags {
+ .IsNeedy = static_cast<bool>(flags & 1),
+ .IsStarved = static_cast<bool>(flags & 2)
+ };
+}
+
}
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index a171930d9a..5ab44369aa 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -6,14 +6,20 @@
namespace NActors {
class IExecutorPool;
+ struct TPoolStateFlags {
+ bool IsNeedy = false;
+ bool IsStarved = false;
+ };
+
// Pool cpu harmonizer
class IHarmonizer {
- public:
+ public:
virtual ~IHarmonizer() {}
virtual void Harmonize(ui64 ts) = 0;
virtual void DeclareEmergency(ui64 ts) = 0;
virtual void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo = nullptr) = 0;
virtual void Enable(bool enable) = 0;
+ virtual TPoolStateFlags GetPoolFlags(i16 poolId) const = 0;
};
IHarmonizer* MakeHarmonizer(ui64 ts);
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index fb76410590..037f4bc9d8 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -60,6 +60,10 @@ namespace NActors {
struct TExecutorPoolStats {
ui64 MaxUtilizationTime = 0;
+ i16 WrongWakenedThreadCount = 0;
+ i16 CurrentThreadCount = 0;
+ bool IsNeedy = false;
+ bool IsStarved = false;
};
struct TExecutorThreadStats {
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index 61d0b45780..2241abc70c 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -124,6 +124,11 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
+ NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr IsNeedy;
+ NMonitoring::TDynamicCounters::TCounterPtr IsStarved;
+
THistogramCounters LegacyActivationTimeHistogram;
NMonitoring::THistogramPtr ActivationTimeHistogram;
@@ -167,6 +172,10 @@ private:
MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
+ WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true);
+ CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false);
+ IsNeedy = PoolGroup->GetCounter("IsNeedy", false);
+ IsStarved = PoolGroup->GetCounter("IsStarved", false);
LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
ActivationTimeHistogram = PoolGroup->GetHistogram(
@@ -203,6 +212,10 @@ private:
*MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
*MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
*MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
+ *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount;
+ *CurrentThreadCount = poolStats.CurrentThreadCount;
+ *IsNeedy = poolStats.IsNeedy;
+ *IsStarved = poolStats.IsStarved;
LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
ActivationTimeHistogram->Reset();