diff options
author | kruall <kruall@ydb.tech> | 2023-03-24 19:01:41 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-03-24 19:01:41 +0300 |
commit | 895c30099a7e7a39c5b482c2df957587c764167a (patch) | |
tree | f2cf12d654523d418588af2afae868ccd4e1534d /library/cpp | |
parent | 951346d51843bcc0698bac6949eeff69f4b19549 (diff) | |
download | ydb-895c30099a7e7a39c5b482c2df957587c764167a.tar.gz |
Add not enough execution metrics,
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/actor.h | 7 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool.h | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_thread.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_stats.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/worker_context.h | 6 | ||||
-rw-r--r-- | library/cpp/actors/helpers/pool_stats_collector.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp | 4 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 2 |
10 files changed, 40 insertions, 3 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 08a3beb2bf..8330d860c8 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -29,6 +29,7 @@ namespace NActors { ui32 CapturedActivation = 0; ESendingType CapturedType = ESendingType::Lazy; ESendingType SendingType = ESendingType::Common; + bool IsEnoughCpu = true; }; extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; @@ -522,6 +523,12 @@ namespace NActors { } protected: + void SetEnoughCpu(bool isEnough) { + if (TlsThreadContext) { + TlsThreadContext->IsEnoughCpu = isEnough; + } + } + void Describe(IOutputStream&) const noexcept override; bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0, ui64 cookie = 0, NWilson::TTraceId traceId = {}) const noexcept final; bool Send(const TActorId& recipient, IEventHandleLight* ev) const noexcept; diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h index c7c85e61fd..c387e04d0a 100644 --- a/library/cpp/actors/core/executor_pool.h +++ b/library/cpp/actors/core/executor_pool.h @@ -13,6 +13,7 @@ namespace NActors { struct TCpuConsumption { double ConsumedUs = 0; double BookedUs = 0; + ui64 NotEnoughCpuExecutions = 0; }; class IExecutorPool : TNonCopyable { diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index de04105991..a8a9d780e5 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -500,7 +500,7 @@ namespace NActors { TThreadCtx& threadCtx = Threads[threadIdx]; TExecutorThreadStats stats; threadCtx.Thread->GetCurrentStats(stats); - return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)}; + return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions}; } i16 TBasicExecutorPool::GetBlockingThreadCount() const { diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 50ae097a15..c8267fa715 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -362,7 +362,8 @@ namespace NActors { } #undef EXECUTE_MAILBOX hpnow = GetCycleCountFast(); - execCycles += hpnow - hpprev; + i64 currentExecCycles = hpnow - hpprev; + execCycles += currentExecCycles; hpprev = hpnow; execCount++; if (execCycles + nonExecCycles > 39000000) { // every 15 ms at 2.6GHz, so 1000 items is 15 sec (solomon interval) @@ -375,6 +376,11 @@ namespace NActors { nonExecCycles = 0; Ctx.UpdateThreadTime(); } + + if (!TlsThreadContext->IsEnoughCpu) { + Ctx.IncreaseNotEnoughCpuExecutions(); + TlsThreadContext->IsEnoughCpu = true; + } } } LWTRACK(ActivationEnd, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId); diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index e2fd0c5f24..8c18d46387 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -116,6 +116,8 @@ struct TPoolInfo { NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; ui32 MaxAvgPingUs = 0; ui64 LastUpdateTs = 0; + ui64 NotEnoughCpuExecutions = 0; + ui64 NewNotEnoughCpuExecutions = 0; TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish TAtomic IncreasingThreadsByNeedyState = 0; @@ -168,6 +170,7 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { #define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] void TPoolInfo::PullStats(ui64 ts) { + ui64 notEnoughCpuExecutions = 0; for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) { TThreadInfo &threadInfo = ThreadInfo[threadIdx]; TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); @@ -175,7 +178,10 @@ void TPoolInfo::PullStats(ui64 ts) { LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); + notEnoughCpuExecutions += cpuConsumption.NotEnoughCpuExecutions; } + NewNotEnoughCpuExecutions = notEnoughCpuExecutions - NotEnoughCpuExecutions; + NotEnoughCpuExecutions = notEnoughCpuExecutions; } #undef UNROLL_HISTORY @@ -273,7 +279,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { isStarvedPresent = true; } ui32 currentThreadCount = pool.GetThreadCount(); - bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount; + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount; if (pool.AvgPingCounter) { if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { isNeedy = false; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 4c664a964a..5fba5aa068 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -100,6 +100,7 @@ namespace NActors { ui64 MailboxPushedOutBySoftPreemption = 0; ui64 MailboxPushedOutByTime = 0; ui64 MailboxPushedOutByEventCount = 0; + ui64 NotEnoughCpuExecutions = 0; TExecutorThreadStats(size_t activityVecSize = 5) // must be not empty as 0 used as default : ElapsedTicksByActivity(activityVecSize) @@ -136,6 +137,7 @@ namespace NActors { MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption); MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime); MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount); + NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions); ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram); EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram); diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index c3a2947df1..f10409b665 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -140,6 +140,11 @@ namespace NActors { RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks)); RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime()); } + + void IncreaseNotEnoughCpuExecutions() { + RelaxedStore(&WorkerStats.NotEnoughCpuExecutions, + (ui64)RelaxedLoad(&WorkerStats.NotEnoughCpuExecutions) + 1); + } #else void GetCurrentStats(TExecutorThreadStats&) const {} inline void AddElapsedCycles(ui32, i64) {} @@ -159,6 +164,7 @@ namespace NActors { i64 AddEventProcessingStats(i64, i64, ui32, ui64) { return 0; } void UpdateActorsStats(size_t, IExecutorPool*) {} void UpdateThreadTime() {} + void IncreaseNotEnoughCpuExecutions() {} #endif void Switch(IExecutorPool* executor, diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index d80951827d..e82fedfa80 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -135,6 +135,7 @@ private: NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; + NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions; THistogramCounters LegacyActivationTimeHistogram; @@ -190,6 +191,7 @@ private: IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true); DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true); DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); + NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true); LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); ActivationTimeHistogram = PoolGroup->GetHistogram( @@ -237,6 +239,7 @@ private: *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState; *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState; *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState; + *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions; LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); ActivationTimeHistogram->Reset(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 44abbb06bf..122f312fec 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -77,10 +77,12 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICIS02", "ReceiveData called"); + bool enoughCpu = true; for (int iteration = 0; Socket; ++iteration) { if (iteration && limit.CheckExceeded()) { // we have hit processing time limit for this message, send notification to resume processing a bit later Send(SelfId(), new TEvResumeReceiveData); + enoughCpu = false; break; } @@ -110,6 +112,8 @@ namespace NActors { } } + SetEnoughCpu(enoughCpu); + // calculate ping time auto it = std::min_element(PingQ.begin(), PingQ.end()); const TDuration ping = it != PingQ.end() ? *it : TDuration::Zero(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 3d6d18d274..5a93bc0cc8 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -433,6 +433,8 @@ namespace NActors { } } + SetEnoughCpu(generatedBytes < generateLimit); + if (Socket) { WriteData(); } |