aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2023-03-24 19:01:41 +0300
committerkruall <kruall@ydb.tech>2023-03-24 19:01:41 +0300
commit895c30099a7e7a39c5b482c2df957587c764167a (patch)
treef2cf12d654523d418588af2afae868ccd4e1534d /library/cpp/actors
parent951346d51843bcc0698bac6949eeff69f4b19549 (diff)
downloadydb-895c30099a7e7a39c5b482c2df957587c764167a.tar.gz
Add not enough execution metrics,
Diffstat (limited to 'library/cpp/actors')
-rw-r--r--library/cpp/actors/core/actor.h7
-rw-r--r--library/cpp/actors/core/executor_pool.h1
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp2
-rw-r--r--library/cpp/actors/core/executor_thread.cpp8
-rw-r--r--library/cpp/actors/core/harmonizer.cpp8
-rw-r--r--library/cpp/actors/core/mon_stats.h2
-rw-r--r--library/cpp/actors/core/worker_context.h6
-rw-r--r--library/cpp/actors/helpers/pool_stats_collector.h3
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp2
10 files changed, 40 insertions, 3 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 08a3beb2bf..8330d860c8 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -29,6 +29,7 @@ namespace NActors {
ui32 CapturedActivation = 0;
ESendingType CapturedType = ESendingType::Lazy;
ESendingType SendingType = ESendingType::Common;
+ bool IsEnoughCpu = true;
};
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext;
@@ -522,6 +523,12 @@ namespace NActors {
}
protected:
+ void SetEnoughCpu(bool isEnough) {
+ if (TlsThreadContext) {
+ TlsThreadContext->IsEnoughCpu = isEnough;
+ }
+ }
+
void Describe(IOutputStream&) const noexcept override;
bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final;
bool Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept;
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index c7c85e61fd..c387e04d0a 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -13,6 +13,7 @@ namespace NActors {
struct TCpuConsumption {
double ConsumedUs = 0;
double BookedUs = 0;
+ ui64 NotEnoughCpuExecutions = 0;
};
class IExecutorPool : TNonCopyable {
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index de04105991..a8a9d780e5 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -500,7 +500,7 @@ namespace NActors {
TThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStats(stats);
- return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)};
+ return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
}
i16 TBasicExecutorPool::GetBlockingThreadCount() const {
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 50ae097a15..c8267fa715 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -362,7 +362,8 @@ namespace NActors {
}
#undef EXECUTE_MAILBOX
hpnow = GetCycleCountFast();
- execCycles += hpnow - hpprev;
+ i64 currentExecCycles = hpnow - hpprev;
+ execCycles += currentExecCycles;
hpprev = hpnow;
execCount++;
if (execCycles + nonExecCycles > 39000000) { // every 15 ms at 2.6GHz, so 1000 items is 15 sec (solomon interval)
@@ -375,6 +376,11 @@ namespace NActors {
nonExecCycles = 0;
Ctx.UpdateThreadTime();
}
+
+ if (!TlsThreadContext->IsEnoughCpu) {
+ Ctx.IncreaseNotEnoughCpuExecutions();
+ TlsThreadContext->IsEnoughCpu = true;
+ }
}
}
LWTRACK(ActivationEnd, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId);
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index e2fd0c5f24..8c18d46387 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -116,6 +116,8 @@ struct TPoolInfo {
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
ui32 MaxAvgPingUs = 0;
ui64 LastUpdateTs = 0;
+ ui64 NotEnoughCpuExecutions = 0;
+ ui64 NewNotEnoughCpuExecutions = 0;
TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
TAtomic IncreasingThreadsByNeedyState = 0;
@@ -168,6 +170,7 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
void TPoolInfo::PullStats(ui64 ts) {
+ ui64 notEnoughCpuExecutions = 0;
for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) {
TThreadInfo &threadInfo = ThreadInfo[threadIdx];
TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
@@ -175,7 +178,10 @@ void TPoolInfo::PullStats(ui64 ts) {
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
+ notEnoughCpuExecutions += cpuConsumption.NotEnoughCpuExecutions;
}
+ NewNotEnoughCpuExecutions = notEnoughCpuExecutions - NotEnoughCpuExecutions;
+ NotEnoughCpuExecutions = notEnoughCpuExecutions;
}
#undef UNROLL_HISTORY
@@ -273,7 +279,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
isStarvedPresent = true;
}
ui32 currentThreadCount = pool.GetThreadCount();
- bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount;
+ bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount;
if (pool.AvgPingCounter) {
if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
isNeedy = false;
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 4c664a964a..5fba5aa068 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -100,6 +100,7 @@ namespace NActors {
ui64 MailboxPushedOutBySoftPreemption = 0;
ui64 MailboxPushedOutByTime = 0;
ui64 MailboxPushedOutByEventCount = 0;
+ ui64 NotEnoughCpuExecutions = 0;
TExecutorThreadStats(size_t activityVecSize = 5) // must be not empty as 0 used as default
: ElapsedTicksByActivity(activityVecSize)
@@ -136,6 +137,7 @@ namespace NActors {
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
+ NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions);
ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram);
EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram);
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index c3a2947df1..f10409b665 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -140,6 +140,11 @@ namespace NActors {
RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks));
RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime());
}
+
+ void IncreaseNotEnoughCpuExecutions() {
+ RelaxedStore(&WorkerStats.NotEnoughCpuExecutions,
+ (ui64)RelaxedLoad(&WorkerStats.NotEnoughCpuExecutions) + 1);
+ }
#else
void GetCurrentStats(TExecutorThreadStats&) const {}
inline void AddElapsedCycles(ui32, i64) {}
@@ -159,6 +164,7 @@ namespace NActors {
i64 AddEventProcessingStats(i64, i64, ui32, ui64) { return 0; }
void UpdateActorsStats(size_t, IExecutorPool*) {}
void UpdateThreadTime() {}
+ void IncreaseNotEnoughCpuExecutions() {}
#endif
void Switch(IExecutorPool* executor,
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index d80951827d..e82fedfa80 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -135,6 +135,7 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState;
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState;
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
+ NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
THistogramCounters LegacyActivationTimeHistogram;
@@ -190,6 +191,7 @@ private:
IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true);
DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true);
DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
+ NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
ActivationTimeHistogram = PoolGroup->GetHistogram(
@@ -237,6 +239,7 @@ private:
*IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState;
*DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState;
*DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState;
+ *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;
LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
ActivationTimeHistogram->Reset();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 44abbb06bf..122f312fec 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -77,10 +77,12 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICIS02", "ReceiveData called");
+ bool enoughCpu = true;
for (int iteration = 0; Socket; ++iteration) {
if (iteration && limit.CheckExceeded()) {
// we have hit processing time limit for this message, send notification to resume processing a bit later
Send(SelfId(), new TEvResumeReceiveData);
+ enoughCpu = false;
break;
}
@@ -110,6 +112,8 @@ namespace NActors {
}
}
+ SetEnoughCpu(enoughCpu);
+
// calculate ping time
auto it = std::min_element(PingQ.begin(), PingQ.end());
const TDuration ping = it != PingQ.end() ? *it : TDuration::Zero();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 3d6d18d274..5a93bc0cc8 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -433,6 +433,8 @@ namespace NActors {
}
}
+ SetEnoughCpu(generatedBytes < generateLimit);
+
if (Socket) {
WriteData();
}