author     kruall <kruall@ydb.tech>  2023-11-15 13:04:33 +0300
committer  kruall <kruall@ydb.tech>  2023-11-15 14:08:26 +0300
commit     f112fa3b9076c88724b55487367f536710f0dd81 (patch)
tree       899d94b6a0f462a5648be8325294a4e1aaff36b7
parent     2b0bf66043ecfee310e1dbf78b54932a949c7726 (diff)
Add auto spin threshold, KIKIMR-19343
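
Before this change the spin threshold was a fixed per-pool setting (SpinThreshold, converted
once to cycles in the constructor). Now each worker thread records a histogram of how long it
waited before new work arrived, together with the measured cost of parking and waking a thread,
and the harmonizer periodically picks the spin threshold that minimizes the expected CPU cost
of waiting. A minimal standalone sketch of that cost model, mirroring the bucket walk of
TWaitingStats::CalculateGoodSpinThresholdCycles from this patch (resolution and histogram
values are illustrative):

#include <array>
#include <cstdint>
#include <cstdio>
#include <limits>

constexpr uint32_t BucketCount = 128;
constexpr uint64_t Resolution = 250; // cycles per histogram bucket, illustrative

// hist[i] counts waits whose duration fell into [i*Resolution, (i+1)*Resolution).
uint64_t PickSpinThreshold(const std::array<uint64_t, BucketCount>& hist,
                           uint64_t wakeUpCost /* park + unpark, cycles */) {
    uint64_t total = 0;
    for (uint64_t c : hist) total += c;

    uint64_t bestIdx = 0;
    uint64_t bestCost = std::numeric_limits<uint64_t>::max();
    uint64_t spinTime = 0;  // cycles burned by waits that end while spinning
    uint64_t spinCount = 0; // number of such waits
    for (uint32_t i = 0; i < BucketCount; ++i) {
        uint64_t threshold = Resolution * i;
        // Waits shorter than the threshold cost only their own duration
        // (accumulated in spinTime); longer waits spin the full threshold
        // and then pay the park/unpark price on top.
        uint64_t cost = spinTime + (total - spinCount) * (wakeUpCost + threshold);
        if (cost < bestCost) { bestCost = cost; bestIdx = i; }
        // Midpoint estimate of the time spent by the waits in this bucket.
        spinTime += (2 * threshold + Resolution) * hist[i] / 2;
        spinCount += hist[i];
    }
    return Resolution * bestIdx;
}

int main() {
    std::array<uint64_t, BucketCount> hist{};
    hist[2] = 900;   // most waits are short
    hist[100] = 100; // plus a tail of long waits
    // With a park/unpark cost of 40 buckets worth of cycles it pays to spin
    // just past the short waits and park for the long tail: prints 750.
    std::printf("%llu\n", (unsigned long long)PickSpinThreshold(hist, 40 * Resolution));
    return 0;
}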
-rw-r--r--  library/cpp/actors/core/actor_benchmark_helper.h             |   8
-rw-r--r--  library/cpp/actors/core/actor_ut.cpp                         |   2
-rw-r--r--  library/cpp/actors/core/actorsystem.h                        |   1
-rw-r--r--  library/cpp/actors/core/config.h                             |   7
-rw-r--r--  library/cpp/actors/core/executor_pool.h                      |  11
-rw-r--r--  library/cpp/actors/core/executor_pool_base.cpp               |   6
-rw-r--r--  library/cpp/actors/core/executor_pool_base.h                 |   1
-rw-r--r--  library/cpp/actors/core/executor_pool_basic.cpp              | 222
-rw-r--r--  library/cpp/actors/core/executor_pool_basic.h                | 139
-rw-r--r--  library/cpp/actors/core/executor_pool_basic_feature_flags.h  |  13
-rw-r--r--  library/cpp/actors/core/executor_pool_basic_ut.cpp           | 128
-rw-r--r--  library/cpp/actors/core/executor_thread.cpp                  |   8
-rw-r--r--  library/cpp/actors/core/executor_thread.h                    |   2
-rw-r--r--  library/cpp/actors/core/harmonizer.cpp                       |  73
-rw-r--r--  library/cpp/actors/core/harmonizer.h                         |   6
-rw-r--r--  library/cpp/actors/core/mon_stats.h                          |   2
-rw-r--r--  library/cpp/actors/core/probes.h                             |  18
-rw-r--r--  library/cpp/actors/core/thread_context.h                     |   5
-rw-r--r--  library/cpp/actors/core/ut_fat/actor_benchmark.cpp           |   2
-rw-r--r--  library/cpp/actors/helpers/pool_stats_collector.h            |  21
-rw-r--r--  ydb/core/driver_lib/run/auto_config_initializer.cpp          |   8
-rw-r--r--  ydb/core/driver_lib/run/kikimr_services_initializers.cpp     |  13
-rw-r--r--  ydb/core/protos/config.proto                                 |   7
23 files changed, 640 insertions(+), 63 deletions(-)
diff --git a/library/cpp/actors/core/actor_benchmark_helper.h b/library/cpp/actors/core/actor_benchmark_helper.h
index c5dd09efe1..7e9651eb35 100644
--- a/library/cpp/actors/core/actor_benchmark_helper.h
+++ b/library/cpp/actors/core/actor_benchmark_helper.h
@@ -41,7 +41,7 @@ struct TTestEndDecorator : TDecorator {
struct TActorBenchmarkSettings {
static constexpr bool DefaultNoRealtime = true;
static constexpr ui32 DefaultSpinThreshold = 1'000'000;
- static constexpr ui32 TotalEventsAmountPerThread = 10'000;
+ static constexpr ui32 TotalEventsAmountPerThread = 1'000;
static constexpr auto MailboxTypes = {
TMailboxType::Simple,
@@ -726,13 +726,13 @@ struct TActorBenchmark {
}
}
- static void RunSendActivateReceiveCSV(const std::vector<ui32> &threadsList, const std::vector<ui32> &actorPairsList, const std::vector<ui32> &inFlights) {
+ static void RunSendActivateReceiveCSV(const std::vector<ui32> &threadsList, const std::vector<ui32> &actorPairsList, const std::vector<ui32> &inFlights, TDuration subtestDuration) {
Cout << "threads,actorPairs,in_flight,msgs_per_sec,elapsed_seconds,min_pair_sent_msgs,max_pair_sent_msgs" << Endl;
for (ui32 threads : threadsList) {
for (ui32 actorPairs : actorPairsList) {
for (ui32 inFlight : inFlights) {
- auto stats = CountStats([threads, actorPairs, inFlight] {
- return BenchContentedThreads(threads, actorPairs, EPoolType::Basic, ESendingType::Common, TDuration::Seconds(1), inFlight);
+ auto stats = CountStats([threads, actorPairs, inFlight, subtestDuration] {
+ return BenchContentedThreads(threads, actorPairs, EPoolType::Basic, ESendingType::Common, subtestDuration, inFlight);
}, 3);
double elapsedSeconds = stats.ElapsedTime.Mean / 1e9;
ui64 eventsPerSecond = stats.SentEvents.Mean / elapsedSeconds;
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index b1e681c1ec..52d3cbad9c 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -294,7 +294,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
for (ui32 actorPairs = 1; actorPairs <= 2 * 32; actorPairs *= 2) {
actorPairsList.push_back(actorPairs);
}
- TActorBenchmark::RunSendActivateReceiveCSV(threadsList, actorPairsList, {1});
+ TActorBenchmark::RunSendActivateReceiveCSV(threadsList, actorPairsList, {1}, TDuration::MilliSeconds(100));
}
Y_UNIT_TEST(SendActivateReceiveWithMailboxNeighbours) {
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 1b18abc787..df5de39090 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -8,7 +8,6 @@
#include "executor_pool.h"
#include "log_settings.h"
#include "scheduler_cookie.h"
-#include "mon_stats.h"
#include "cpu_manager.h"
#include "executor_thread.h"
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 7c843d0847..04bdd6aebe 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -28,6 +28,12 @@ namespace NActors {
ui64 PeriodUs = 15000000; // Time between balancer steps
};
+ enum class EASProfile {
+ Default,
+ LowCpuConsumption,
+ LowLatency,
+ };
+
struct TBasicExecutorPoolConfig {
static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10);
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100;
@@ -46,6 +52,7 @@ namespace NActors {
i16 Priority = 0;
i16 SharedExecutorsCount = 0;
i16 SoftProcessingDurationTs = 0;
+ EASProfile ActorSystemProfile = EASProfile::Default;
};
struct TIOExecutorPoolConfig {
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index ae251c14bb..5498a6403e 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -1,13 +1,14 @@
#pragma once
#include "event.h"
-#include "mon_stats.h"
#include "scheduler_queue.h"
namespace NActors {
class TActorSystem;
struct TMailboxHeader;
struct TWorkerContext;
+ struct TExecutorPoolStats;
+ struct TExecutorThreadStats;
class ISchedulerCookie;
struct TCpuConsumption {
@@ -40,6 +41,9 @@ namespace NActors {
}
// for workers
+ virtual void Initialize(TWorkerContext& wctx) {
+ Y_UNUSED(wctx);
+ }
virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0;
virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0;
virtual TMailboxHeader *ResolveMailbox(ui32 hint) = 0;
@@ -121,6 +125,10 @@ namespace NActors {
Y_UNUSED(threads);
}
+ virtual void SetSpinThresholdCycles(ui32 cycles) {
+ Y_UNUSED(cycles);
+ }
+
virtual i16 GetBlockingThreadCount() const {
return 0;
}
@@ -135,7 +143,6 @@ namespace NActors {
virtual i16 GetMaxThreadCount() const {
return 1;
-
}
virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index 059cac48d2..1c9a536b9b 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -94,6 +94,9 @@ namespace NActors {
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast());
#endif
+ if (TlsThreadContext) {
+ TlsThreadContext->IsCurrentRecipientAService = ev->Recipient.IsService();
+ }
return MailboxTable->SendTo(ev, this);
}
@@ -102,6 +105,9 @@ namespace NActors {
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast());
#endif
+ if (TlsThreadContext) {
+ TlsThreadContext->IsCurrentRecipientAService = ev->Recipient.IsService();
+ }
return MailboxTable->SpecificSendTo(ev, this);
}
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index e94ffdbad9..6bfabb527f 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -2,6 +2,7 @@
#include "executor_pool.h"
#include "executor_thread.h"
+#include "mon_stats.h"
#include "scheduler_queue.h"
#include <library/cpp/actors/util/affinity.h>
#include <library/cpp/actors/util/unordered_cache.h>
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index ccd813bb2e..4a9019e26d 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -13,6 +13,10 @@
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
+
+ const double TWaitingStatsConstants::HistogramResolutionUs = MaxSpinThersholdUs / BucketCount;
+ const ui64 TWaitingStatsConstants::HistogramResolution = NHPTimer::GetCyclesPerSecond() * 0.000001 * HistogramResolutionUs;
+
constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX;
TBasicExecutorPool::TBasicExecutorPool(
@@ -31,9 +35,11 @@ namespace NActors {
i16 defaultThreadCount,
i16 priority)
: TExecutorPoolBase(poolId, threads, affinity)
- , SpinThreshold(spinThreshold)
- , SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
- , Threads(new TThreadCtx[threads])
+ , DefaultSpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
+ , SpinThresholdCycles(DefaultSpinThresholdCycles)
+ , SpinThresholdCyclesPerThread(new NThreading::TPadded<std::atomic<ui64>>[threads])
+ , Threads(new NThreading::TPadded<TThreadCtx>[threads])
+ , WaitingStats(new TWaitingStats<ui64>[threads])
, PoolName(poolName)
, TimePerMailbox(timePerMailbox)
, EventsPerMailbox(eventsPerMailbox)
@@ -57,6 +63,14 @@ namespace NActors {
LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
}
}
+ if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
+ for (ui32 idx = 0; idx < threads; ++idx) {
+ SpinThresholdCyclesPerThread[idx].store(0);
+ }
+ }
+ if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
+ MovingWaitingStats.Reset(new TWaitingStats<double>[threads]);
+ }
Y_UNUSED(maxActivityType);
i16 limit = Min(threads, (ui32)Max<i16>());
@@ -99,6 +113,7 @@ namespace NActors {
{
SetSharedExecutorsCount(cfg.SharedExecutorsCount);
SoftProcessingDurationTs = cfg.SoftProcessingDurationTs;
+ ActorSystemProfile = cfg.ActorSystemProfile;
}
TBasicExecutorPool::~TBasicExecutorPool() {
@@ -109,7 +124,7 @@ namespace NActors {
do {
timers.HPNow = GetCycleCountFast();
timers.Elapsed += timers.HPNow - timers.HPStart;
- if (threadCtx.Pad.Park()) // interrupted
+ if (threadCtx.WaitingPad.Park()) // interrupted
return true;
timers.HPStart = GetCycleCountFast();
timers.Parked += timers.HPStart - timers.HPNow;
@@ -117,31 +132,25 @@ namespace NActors {
return false;
}
- void TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx) {
- ui64 start = GetCycleCountFast();
- bool doSpin = true;
- while (true) {
- for (ui32 j = 0; doSpin && j < 12; ++j) {
- if (GetCycleCountFast() >= (start + SpinThresholdCycles)) {
- doSpin = false;
- break;
- }
- for (ui32 i = 0; i < 12; ++i) {
- if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
- SpinLockPause();
- } else {
- doSpin = false;
- break;
- }
- }
- }
- if (!doSpin) {
- break;
- }
- if (RelaxedLoad(&StopFlag)) {
- break;
- }
+ ui32 TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx, i64 start, i64 &end) {
+ ui32 spinPauseCount = 0;
+ i64 spinThresholdCycles = 0;
+ if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
+ spinThresholdCycles = SpinThresholdCyclesPerThread[TlsThreadContext->WorkerId].load();
+ } else {
+ spinThresholdCycles = SpinThresholdCycles.load();
}
+ do {
+ end = GetCycleCountFast();
+ if (end >= (start + spinThresholdCycles) || AtomicLoad(&threadCtx.WaitingFlag) != TThreadCtx::WS_ACTIVE) {
+ return spinPauseCount;
+ }
+
+ SpinLockPause();
+ spinPauseCount++;
+ } while (!RelaxedLoad(&StopFlag));
+
+ return spinPauseCount;
}
bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) {
@@ -161,26 +170,55 @@ namespace NActors {
}
#endif
+ i64 startWaiting = GetCycleCountFast();
+ i64 endSpinning = 0;
TAtomic state = AtomicLoad(&threadCtx.WaitingFlag);
+ bool wasSleeping = false;
Y_ABORT_UNLESS(state == TThreadCtx::WS_NONE, "WaitingFlag# %d", int(state));
- if (SpinThreshold > 0 && !needToBlock) {
+ if (SpinThresholdCycles > 0 && !needToBlock) {
// spin configured period
AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE);
- GoToSpin(threadCtx);
+ ui32 spinPauseCount = GoToSpin(threadCtx, startWaiting, endSpinning);
+ SpinningTimeUs += endSpinning - startWaiting;
// then - sleep
if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) {
+ if (NFeatures::TCommonFeatureFlags::ProbeSpinCycles) {
+ LWPROBE(SpinCycles, PoolId, PoolName, spinPauseCount, true);
+ }
+
+ wasSleeping = true;
if (GoToSleep(threadCtx, timers)) { // interrupted
return true;
}
+ AllThreadsSleep.store(false);
}
}
+ if (NFeatures::TCommonFeatureFlags::ProbeSpinCycles && !wasSleeping) {
+ LWPROBE(SpinCycles, PoolId, PoolName, spinPauseCount, false);
+ }
} else {
AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED);
+ wasSleeping = true;
if (GoToSleep(threadCtx, timers)) { // interrupted
return true;
}
+ AllThreadsSleep.store(false);
+ }
+
+ i64 needTimeTs = threadCtx.StartWakingTs.exchange(0);
+ if (wasSleeping && needTimeTs) {
+ ui64 waitingDuration = std::max<i64>(0, needTimeTs - startWaiting);
+ ui64 awakingDuration = std::max<i64>(0, GetCycleCountFast() - needTimeTs);
+ WaitingStats[TlsThreadContext->WorkerId].AddAwakening(waitingDuration, awakingDuration);
+ } else {
+ ui64 waitingDuration = std::max<i64>(0, endSpinning - startWaiting);
+ if (wasSleeping) {
+ WaitingStats[TlsThreadContext->WorkerId].AddFastAwakening(waitingDuration);
+ } else {
+ WaitingStats[TlsThreadContext->WorkerId].Add(waitingDuration);
+ }
}
Y_DEBUG_ABORT_UNLESS(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
@@ -225,6 +263,9 @@ namespace NActors {
if (semaphore.OldSemaphore == 0) {
semaphore.CurrentSleepThreadCount++;
+ if (semaphore.CurrentSleepThreadCount == AtomicLoad(&ThreadCount)) {
+ AllThreadsSleep.store(true);
+ }
x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
if (x == oldX) {
*needToWait = true;
@@ -327,6 +368,25 @@ namespace NActors {
}
inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
+ if (AllThreadsSleep) {
+ TThreadCtx& hotThreadCtx = Threads[0];
+ if (AtomicCas(&hotThreadCtx.WaitingFlag, TThreadCtx::WS_NONE, TThreadCtx::WS_ACTIVE)) {
+ return;
+ }
+
+ TThreadCtx& coldThreadCtx = Threads[AtomicLoad(&ThreadCount) - 1];
+ if (AtomicCas(&coldThreadCtx.WaitingFlag, TThreadCtx::WS_NONE, TThreadCtx::WS_BLOCKED)) {
+ if (TlsThreadContext && TlsThreadContext->WaitingStats) {
+ ui64 beforeUnpark = GetCycleCountFast();
+ coldThreadCtx.StartWakingTs = beforeUnpark;
+ coldThreadCtx.WaitingPad.Unpark();
+ TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
+ } else {
+ coldThreadCtx.WaitingPad.Unpark();
+ }
+ return;
+ }
+ }
for (i16 i = 0;;) {
TThreadCtx& threadCtx = Threads[i];
TThreadCtx::EWaitState state = static_cast<TThreadCtx::EWaitState>(AtomicLoad(&threadCtx.WaitingFlag));
@@ -341,7 +401,14 @@ namespace NActors {
case TThreadCtx::WS_BLOCKED:
if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_NONE, state)) {
if (state == TThreadCtx::WS_BLOCKED) {
- threadCtx.Pad.Unpark();
+ ui64 beforeUnpark = GetCycleCountFast();
+ threadCtx.StartWakingTs = beforeUnpark;
+ if (TlsThreadContext && TlsThreadContext->WaitingStats) {
+ threadCtx.WaitingPad.Unpark();
+ TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
+ } else {
+ threadCtx.WaitingPad.Unpark();
+ }
}
if (i >= currentThreadCount) {
AtomicIncrement(WrongWakenedThreadCount);
@@ -355,12 +422,12 @@ namespace NActors {
}
}
- void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter) {
+ void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter, TAtomic x) {
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+
Activations.Push(activation, revolvingCounter);
bool needToWakeUp = false;
- TAtomic x = AtomicGet(Semaphore);
- TSemaphore semaphore = TSemaphore::GetSemaphore(x);
do {
needToWakeUp = semaphore.CurrentSleepThreadCount > SharedExecutorsCount;
i64 oldX = semaphore.ConverToI64();
@@ -386,15 +453,39 @@ namespace NActors {
LocalQueues[TlsThreadContext->WorkerId].push(activation);
return;
}
+ if (ActorSystemProfile != EASProfile::Default) {
+ TAtomic x = AtomicGet(Semaphore);
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+ if constexpr (NFeatures::TLocalQueuesFeatureFlags::UseIfAllOtherThreadsAreSleeping) {
+ if (semaphore.CurrentSleepThreadCount == semaphore.CurrentThreadCount - 1 && semaphore.OldSemaphore == 0) {
+ if (LocalQueues[TlsThreadContext->WorkerId].empty()) {
+ LocalQueues[TlsThreadContext->WorkerId].push(activation);
+ return;
+ }
+ }
+ }
+
+ if constexpr (NFeatures::TLocalQueuesFeatureFlags::UseOnMicroburst) {
+ if (semaphore.OldSemaphore >= semaphore.CurrentThreadCount) {
+ if (LocalQueues[TlsThreadContext->WorkerId].empty() && TlsThreadContext->WriteTurn < 1) {
+ TlsThreadContext->WriteTurn++;
+ LocalQueues[TlsThreadContext->WorkerId].push(activation);
+ return;
+ }
+ }
+ }
+ ScheduleActivationExCommon(activation, revolvingWriteCounter, x);
+ return;
+ }
}
- ScheduleActivationExCommon(activation, revolvingWriteCounter);
+ ScheduleActivationExCommon(activation, revolvingWriteCounter, AtomicGet(Semaphore));
}
void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
if constexpr (NFeatures::IsLocalQueues()) {
ScheduleActivationExLocalQueue(activation, revolvingCounter);
} else {
- ScheduleActivationExCommon(activation, revolvingCounter);
+ ScheduleActivationExCommon(activation, revolvingCounter, AtomicGet(Semaphore));
}
}
@@ -404,6 +495,8 @@ namespace NActors {
poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount);
poolStats.DefaultThreadCount = DefaultThreadCount;
poolStats.MaxThreadCount = MaxThreadCount;
+ poolStats.SpinningTimeUs = Ts2Us(SpinningTimeUs);
+ poolStats.SpinThresholdUs = Ts2Us(SpinThresholdCycles);
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolStats.IsNeedy = stats.IsNeedy;
@@ -487,7 +580,7 @@ namespace NActors {
AtomicStore(&StopFlag, true);
for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Thread->StopFlag = true;
- Threads[i].Pad.Interrupt();
+ Threads[i].WaitingPad.Interrupt();
}
}
@@ -598,4 +691,59 @@ namespace NActors {
LocalQueueSize.store(std::max(size, NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE), std::memory_order_relaxed);
}
}
+
+ void TBasicExecutorPool::Initialize(TWorkerContext& wctx) {
+ if (wctx.WorkerId >= 0) {
+ TlsThreadContext->WaitingStats = &WaitingStats[wctx.WorkerId];
+ }
+ }
+
+ void TBasicExecutorPool::SetSpinThresholdCycles(ui32 cycles) {
+ if (ActorSystemProfile == EASProfile::LowLatency) {
+ if (DefaultSpinThresholdCycles > cycles) {
+ cycles = DefaultSpinThresholdCycles;
+ }
+ }
+ SpinThresholdCycles = cycles;
+ double resolutionUs = TWaitingStatsConstants::HistogramResolutionUs;
+ ui32 bucketIdx = cycles / TWaitingStatsConstants::HistogramResolution;
+ LWPROBE(ChangeSpinThreshold, PoolId, PoolName, cycles, resolutionUs * bucketIdx, bucketIdx);
+ }
+
+ void TBasicExecutorPool::GetWaitingStats(TWaitingStats<ui64> &acc) const {
+ acc.Clear();
+ double resolutionUs = TWaitingStatsConstants::HistogramResolutionUs;
+ for (ui32 idx = 0; idx < ThreadCount; ++idx) {
+ for (ui32 bucketIdx = 0; bucketIdx < TWaitingStatsConstants::BucketCount; ++bucketIdx) {
+ LWPROBE(WaitingHistogramPerThread, PoolId, PoolName, idx, resolutionUs * bucketIdx, resolutionUs * (bucketIdx + 1), WaitingStats[idx].WaitingUntilNeedsTimeHist[bucketIdx].load());
+ }
+ acc.Add(WaitingStats[idx]);
+ }
+ for (ui32 bucketIdx = 0; bucketIdx < TWaitingStatsConstants::BucketCount; ++bucketIdx) {
+ LWPROBE(WaitingHistogram, PoolId, PoolName, resolutionUs * bucketIdx, resolutionUs * (bucketIdx + 1), acc.WaitingUntilNeedsTimeHist[bucketIdx].load());
+ }
+ }
+
+ void TBasicExecutorPool::ClearWaitingStats() const {
+ for (ui32 idx = 0; idx < ThreadCount; ++idx) {
+ WaitingStats[idx].Clear();
+ }
+ }
+
+ void TBasicExecutorPool::CalcSpinPerThread(ui64 wakingUpConsumption) {
+ for (i16 threadIdx = 0; threadIdx < PoolThreads; ++threadIdx) {
+ ui64 newSpinThreshold = 0;
+ if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
+ MovingWaitingStats[threadIdx].Add(WaitingStats[threadIdx], 0.8, 0.2);
+ newSpinThreshold = MovingWaitingStats[threadIdx].CalculateGoodSpinThresholdCycles(wakingUpConsumption);
+ } else {
+ newSpinThreshold = WaitingStats[threadIdx].CalculateGoodSpinThresholdCycles(wakingUpConsumption);
+ }
+ SpinThresholdCyclesPerThread[threadIdx].store(newSpinThreshold);
+
+ double resolutionUs = TWaitingStatsConstants::HistogramResolutionUs;
+ ui32 bucketIdx = newSpinThreshold / TWaitingStatsConstants::HistogramResolution;
+ LWPROBE(ChangeSpinThresholdPerThread, PoolId, PoolName, threadIdx, newSpinThreshold, resolutionUs * bucketIdx, bucketIdx);
+ }
+ }
}
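
The waiting/awakening split recorded in GoToWaiting above depends on the waker publishing a
timestamp through StartWakingTs just before Unpark(): the woken thread then divides its elapsed
time around that mark into idle waiting (which feeds the histogram) and pure wake-up latency.
A standalone sketch of that bookkeeping, with plain integers standing in for
GetCycleCountFast() values:

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>

std::atomic<int64_t> StartWakingTs{0};

int main() {
    int64_t startWaiting = 1000;  // woken thread began waiting at this cycle
    StartWakingTs.store(1900);    // the waker decided to wake it here
    int64_t now = 2050;           // the woken thread is running again

    int64_t needTimeTs = StartWakingTs.exchange(0);
    // 900 cycles during which no work existed (histogram material) ...
    int64_t waiting = std::max<int64_t>(0, needTimeTs - startWaiting);
    // ... and 150 cycles of pure wake-up latency (awakening material).
    int64_t awaking = std::max<int64_t>(0, now - needTimeTs);
    std::printf("waiting=%lld awaking=%lld\n", (long long)waiting, (long long)awaking);
    return 0;
}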
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index dcbcaeb3db..d1a4c9fd27 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -2,6 +2,7 @@
#include "actorsystem.h"
#include "executor_thread.h"
+#include "executor_pool_basic_feature_flags.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
#include "harmonizer.h"
@@ -17,17 +18,117 @@
#include <queue>
namespace NActors {
+
+ struct TWaitingStatsConstants {
+ static constexpr ui64 BucketCount = 128;
+ static constexpr double MaxSpinThersholdUs = 12.8;
+
+ static constexpr ui64 KnownAvgWakingUpTime = 4250;
+ static constexpr ui64 KnownAvgAwakeningTime = 7000;
+
+ static const double HistogramResolutionUs;
+ static const ui64 HistogramResolution;
+ };
+
+ template <typename T>
+ struct TWaitingStats : TWaitingStatsConstants {
+ std::array<std::atomic<T>, BucketCount> WaitingUntilNeedsTimeHist;
+
+ std::atomic<T> WakingUpTotalTime;
+ std::atomic<T> WakingUpCount;
+ std::atomic<T> AwakingTotalTime;
+ std::atomic<T> AwakingCount;
+
+ TWaitingStats()
+ {
+ Clear();
+ }
+
+ void Clear() {
+ std::fill(WaitingUntilNeedsTimeHist.begin(), WaitingUntilNeedsTimeHist.end(), 0);
+ WakingUpTotalTime = 0;
+ WakingUpCount = 0;
+ AwakingTotalTime = 0;
+ AwakingCount = 0;
+ }
+
+ void Add(ui64 waitingUntilNeedsTime) {
+ ui64 waitIdx = std::min(waitingUntilNeedsTime / HistogramResolution, BucketCount - 1);
+ WaitingUntilNeedsTimeHist[waitIdx]++;
+ }
+
+ void AddAwakening(ui64 waitingUntilNeedsTime, ui64 awakingTime) {
+ Add(waitingUntilNeedsTime);
+ AwakingTotalTime += awakingTime;
+ AwakingCount++;
+ }
+
+ void AddFastAwakening(ui64 waitingUntilNeedsTime) {
+ Add(waitingUntilNeedsTime - HistogramResolution);
+ }
+
+ void AddWakingUp(ui64 wakingUpTime) {
+ WakingUpTotalTime += wakingUpTime;
+ WakingUpCount++;
+ }
+
+ void Add(const TWaitingStats<T> &stats) {
+ for (ui32 idx = 0; idx < BucketCount; ++idx) {
+ WaitingUntilNeedsTimeHist[idx] += stats.WaitingUntilNeedsTimeHist[idx];
+ }
+ WakingUpTotalTime += stats.WakingUpTotalTime;
+ WakingUpCount += stats.WakingUpCount;
+ AwakingTotalTime += stats.AwakingTotalTime;
+ AwakingCount += stats.AwakingCount;
+ }
+
+ template <typename T2>
+ void Add(const TWaitingStats<T2> &stats, double oldK, double newK) {
+ for (ui32 idx = 0; idx < BucketCount; ++idx) {
+ WaitingUntilNeedsTimeHist[idx] = oldK * WaitingUntilNeedsTimeHist[idx] + newK * stats.WaitingUntilNeedsTimeHist[idx];
+ }
+ WakingUpTotalTime = oldK * WakingUpTotalTime + newK * stats.WakingUpTotalTime;
+ WakingUpCount = oldK * WakingUpCount + newK * stats.WakingUpCount;
+ AwakingTotalTime = oldK * AwakingTotalTime + newK * stats.AwakingTotalTime;
+ AwakingCount = oldK * AwakingCount + newK * stats.AwakingCount;
+ }
+
+ ui32 CalculateGoodSpinThresholdCycles(ui64 avgWakingUpConsumption) {
+ auto &bucketCount = TWaitingStatsConstants::BucketCount;
+ auto &resolution = TWaitingStatsConstants::HistogramResolution;
+
+ T waitingsCount = std::accumulate(WaitingUntilNeedsTimeHist.begin(), WaitingUntilNeedsTimeHist.end(), 0);
+
+ ui32 bestBucketIdx = 0;
+ T bestCpuConsumption = Max<T>();
+
+ T spinTime = 0;
+ T spinCount = 0;
+
+ for (ui32 bucketIdx = 0; bucketIdx < bucketCount; ++bucketIdx) {
+ auto &bucket = WaitingUntilNeedsTimeHist[bucketIdx];
+ ui64 imaginarySpingThreshold = resolution * bucketIdx;
+ T cpuConsumption = spinTime + (waitingsCount - spinCount) * (avgWakingUpConsumption + imaginarySpingThreshold);
+ if (bestCpuConsumption > cpuConsumption) {
+ bestCpuConsumption = cpuConsumption;
+ bestBucketIdx = bucketIdx;
+ }
+ spinTime += (2 * imaginarySpingThreshold + resolution) * bucket / 2;
+ spinCount += bucket;
+ // LWPROBE(WaitingHistogram, Pool->PoolId, Pool->GetName(), resolutionUs * bucketIdx, resolutionUs * (bucketIdx + 1), bucket);
+ }
+ ui64 result = resolution * bestBucketIdx;
+ return result;
+ }
+ };
+
class TBasicExecutorPool: public TExecutorPoolBase {
struct TThreadCtx {
TAutoPtr<TExecutorThread> Thread;
- TThreadParkPad Pad;
+ TThreadParkPad WaitingPad;
TAtomic WaitingFlag;
-
- // different threads must spin/block on different cache-lines.
- // we add some padding bytes to enforce this rule
- static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + sizeof(TThreadParkPad) + sizeof(TAtomic);
- ui8 Padding[64 - SizeWithoutPadding];
- static_assert(64 >= SizeWithoutPadding);
+ std::atomic<i64> StartWakingTs;
+ std::atomic<i64> EndWakingTs;
enum EWaitState {
WS_NONE,
@@ -49,11 +150,16 @@ namespace NActors {
NHPTimer::STime HPNow;
};
- const ui64 SpinThreshold;
- const ui64 SpinThresholdCycles;
+ NThreading::TPadded<std::atomic_bool> AllThreadsSleep = true;
+ const ui64 DefaultSpinThresholdCycles;
+ std::atomic<ui64> SpinThresholdCycles;
+ std::unique_ptr<NThreading::TPadded< std::atomic<ui64>>[]> SpinThresholdCyclesPerThread;
- TArrayHolder<TThreadCtx> Threads;
+ TArrayHolder<NThreading::TPadded<TThreadCtx>> Threads;
+ static_assert(sizeof(std::decay_t<decltype(Threads[0])>) == PLATFORM_CACHE_LINE);
TArrayHolder<NThreading::TPadded<std::queue<ui32>>> LocalQueues;
+ TArrayHolder<TWaitingStats<ui64>> WaitingStats;
+ TArrayHolder<TWaitingStats<double>> MovingWaitingStats;
std::atomic<ui16> LocalQueueSize;
@@ -63,6 +169,7 @@ namespace NActors {
const TString PoolName;
const TDuration TimePerMailbox;
const ui32 EventsPerMailbox;
+ EASProfile ActorSystemProfile;
const int RealtimePriority;
@@ -70,6 +177,7 @@ namespace NActors {
TAtomic MaxUtilizationCounter;
TAtomic MaxUtilizationAccumulator;
TAtomic WrongWakenedThreadCount;
+ std::atomic<ui64> SpinningTimeUs;
TAtomic ThreadCount;
TMutex ChangeThreadsLock;
@@ -129,6 +237,7 @@ namespace NActors {
void SetSharedExecutorsCount(i16 count);
+ void Initialize(TWorkerContext& wctx) override;
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
ui32 GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingReadCounter);
ui32 GetReadyActivationLocalQueue(TWorkerContext& wctx, ui64 revolvingReadCounter);
@@ -138,7 +247,7 @@ namespace NActors {
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override;
- void ScheduleActivationExCommon(ui32 activation, ui64 revolvingWriteCounter);
+ void ScheduleActivationExCommon(ui32 activation, ui64 revolvingWriteCounter, TAtomic semaphoreValue);
void ScheduleActivationExLocalQueue(ui32 activation, ui64 revolvingWriteCounter);
void SetLocalQueueSize(ui16 size);
@@ -164,12 +273,18 @@ namespace NActors {
i16 GetBlockingThreadCount() const override;
i16 GetPriority() const override;
+ void SetSpinThresholdCycles(ui32 cycles) override;
+
+ void GetWaitingStats(TWaitingStats<ui64> &acc) const;
+ void CalcSpinPerThread(ui64 wakingUpConsumption);
+ void ClearWaitingStats() const;
+
private:
void AskToGoToSleep(bool *needToWait, bool *needToBlock);
void WakeUpLoop(i16 currentThreadCount);
bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
- void GoToSpin(TThreadCtx& threadCtx);
+ ui32 GoToSpin(TThreadCtx& threadCtx, i64 start, i64 &end);
bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers);
bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers);
};
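
The templated Add(stats, oldK, newK) above is what makes MovingWaitingStats a "pseudo moving
window": every harmonizer pass blends the freshly pulled histogram into a decayed double-valued
copy as new = 0.8 * old + 0.2 * fresh, so the chosen threshold tracks recent behaviour without
forgetting it instantly. A standalone sketch with illustrative bucket values:

#include <array>
#include <cstdio>

int main() {
    std::array<double, 4> moving{};              // decayed histogram copy
    std::array<unsigned, 4> fresh{0, 50, 10, 0}; // one pass worth of waits

    for (int pass = 0; pass < 3; ++pass) {
        for (size_t i = 0; i < moving.size(); ++i)
            moving[i] = 0.8 * moving[i] + 0.2 * fresh[i];
    }
    // After three identical passes each bucket is 0.2 + 0.16 + 0.128 = 0.488
    // of the way to its steady-state value: prints 0.0 24.4 4.9 0.0
    for (double v : moving) std::printf("%.1f ", v);
    std::printf("\n");
    return 0;
}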
diff --git a/library/cpp/actors/core/executor_pool_basic_feature_flags.h b/library/cpp/actors/core/executor_pool_basic_feature_flags.h
index d87b713a07..17f2a81df4 100644
--- a/library/cpp/actors/core/executor_pool_basic_feature_flags.h
+++ b/library/cpp/actors/core/executor_pool_basic_feature_flags.h
@@ -14,6 +14,8 @@ namespace NActors::NFeatures {
struct TCommonFeatureFlags {
static constexpr EActorSystemOptimizationType OptimizationType = EActorSystemOptimizationType::Common;
+
+ static constexpr bool ProbeSpinCycles = false;
};
struct TLocalQueuesFeatureFlags {
@@ -22,6 +24,17 @@ namespace NActors::NFeatures {
static constexpr ui16 MIN_LOCAL_QUEUE_SIZE = 0;
static constexpr ui16 MAX_LOCAL_QUEUE_SIZE = 16;
static constexpr std::optional<ui16> FIXED_LOCAL_QUEUE_SIZE = std::nullopt;
+
+ static constexpr bool UseIfAllOtherThreadsAreSleeping = false;
+ static constexpr bool UseOnMicroburst = false;
+ };
+
+ struct TSpinFeatureFlags {
+ static constexpr bool DoNotSpinLower = false;
+ static constexpr bool UsePseudoMovingWindow = true;
+
+ static constexpr bool HotColdThreads = false;
+ static constexpr bool CalcPerThread = false;
};
using TFeatureFlags = TCommonFeatureFlags;
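
All of these flags are compile-time constants, so the code paths they guard are selected with
if constexpr and the disabled branches are compiled out entirely. A sketch of the pattern
GoToSpin uses to choose between the per-thread and the pool-wide threshold (TSpinFeatureFlags
is the real struct from this header, the surrounding names are illustrative):

#include <atomic>
#include <cstdint>
#include <cstdio>

struct TSpinFeatureFlags { static constexpr bool CalcPerThread = false; };

std::atomic<uint64_t> SpinThresholdCycles{1000}; // pool-wide value

uint64_t LoadSpinThreshold(std::atomic<uint64_t>* perThread, int workerId) {
    if constexpr (TSpinFeatureFlags::CalcPerThread) {
        // Per-thread thresholds, refreshed by CalcSpinPerThread.
        return perThread[workerId].load();
    } else {
        // Single pool-wide threshold, set by SetSpinThresholdCycles.
        return SpinThresholdCycles.load();
    }
}

int main() {
    std::atomic<uint64_t> perThread[4]{};
    std::printf("%llu\n", (unsigned long long)LoadSpinThreshold(perThread, 0)); // prints 1000
    return 0;
}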
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 516eeda2c6..df8df52635 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -91,6 +91,134 @@ THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool)
return setup;
}
+Y_UNIT_TEST_SUITE(WaitingBenchs) {
+
+ Y_UNIT_TEST(SpinPause) {
+ const ui32 count = 1'000'000;
+ ui64 startTs = GetCycleCountFast();
+ for (ui32 idx = 0; idx < count; ++idx) {
+ SpinLockPause();
+ }
+ ui64 stopTs = GetCycleCountFast();
+ Cerr << Ts2Us(stopTs - startTs) / count << Endl;
+ Cerr << double(stopTs - startTs) / count << Endl;
+ }
+
+ struct TThread : public ISimpleThread {
+ static const ui64 CyclesInMicroSecond;
+ std::array<ui64, 128> Hist;
+ ui64 WakingTime = 0;
+ ui64 AwakeningTime = 0;
+ ui64 SleepTime = 0;
+ ui64 IterationCount = 0;
+
+ std::atomic<ui64> Awakens = 0;
+ std::atomic<ui64> *OtherAwaken;
+
+ TThreadParkPad OwnPad;
+ TThreadParkPad *OtherPad;
+
+ bool IsWaiting = false;
+
+ void GoToWait() {
+ ui64 start = GetCycleCountFast();
+ OwnPad.Park();
+ ui64 elapsed = GetCycleCountFast() - start;
+ AwakeningTime += elapsed;
+ ui64 idx = std::min(Hist.size() - 1, (elapsed - 20 * CyclesInMicroSecond) / CyclesInMicroSecond);
+ Hist[idx]++;
+ Awakens++;
+ }
+
+ void GoToWakeUp() {
+ ui64 start = GetCycleCountFast();
+ OtherPad->Unpark();
+ ui64 elapsed = GetCycleCountFast() - start;
+ WakingTime += elapsed;
+ ui64 idx = std::min(Hist.size() - 1, elapsed / CyclesInMicroSecond);
+ Hist[idx]++;
+ }
+
+ void GoToSleep() {
+ ui64 start = GetCycleCountFast();
+ ui64 stop = start;
+ while (stop - start < 20 * CyclesInMicroSecond) {
+ SpinLockPause();
+ stop = GetCycleCountFast();
+ }
+ SleepTime += stop - start;
+ }
+
+ void* ThreadProc() {
+ for (ui32 idx = 0; idx < IterationCount; ++idx) {
+ if (IsWaiting) {
+ GoToWait();
+ } else {
+ GoToSleep();
+ GoToWakeUp();
+ while(OtherAwaken->load() == idx) {
+ SpinLockPause();
+ }
+ }
+ }
+ return nullptr;
+ }
+ };
+
+ const ui64 TThread::CyclesInMicroSecond = NHPTimer::GetCyclesPerSecond() * 0.000001;
+
+ Y_UNIT_TEST(WakingUpTest) {
+ TThread a, b;
+ constexpr ui64 iterations = 100'000;
+ std::fill(a.Hist.begin(), a.Hist.end(), 0);
+ std::fill(b.Hist.begin(), b.Hist.end(), 0);
+ a.IterationCount = iterations;
+ b.IterationCount = iterations;
+ a.IsWaiting = true;
+ b.IsWaiting = false;
+ b.OtherAwaken = &a.Awakens;
+ a.OtherPad = &b.OwnPad;
+ b.OtherPad = &a.OwnPad;
+ a.Start();
+ b.Start();
+ a.Join();
+ b.Join();
+
+ ui64 awakeningTime = a.AwakeningTime + b.AwakeningTime - a.SleepTime - b.SleepTime;
+ ui64 wakingUpTime = a.WakingTime + b.WakingTime;
+
+ Cerr << "AvgAwakeningCycles: " << double(awakeningTime) / iterations << Endl;
+ Cerr << "AvgAwakeningUs: " << Ts2Us(awakeningTime) / iterations << Endl;
+ Cerr << "AvgSleep20usCycles:" << double(b.SleepTime) / iterations << Endl;
+ Cerr << "AvgSleep20usUs:" << Ts2Us(b.SleepTime) / iterations << Endl;
+ Cerr << "AvgWakingUpCycles: " << double(wakingUpTime) / iterations << Endl;
+ Cerr << "AvgWakingUpUs: " << Ts2Us(wakingUpTime) / iterations << Endl;
+
+ Cerr << "AwakeningHist:\n";
+ for (ui32 idx = 0; idx < a.Hist.size(); ++idx) {
+ if (a.Hist[idx]) {
+ if (idx + 1 != a.Hist.size()) {
+ Cerr << " [" << idx << "us - " << idx + 1 << "us] " << a.Hist[idx] << Endl;
+ } else {
+ Cerr << " [" << idx << "us - ...] " << a.Hist[idx] << Endl;
+ }
+ }
+ }
+
+ Cerr << "WakingUpHist:\n";
+ for (ui32 idx = 0; idx < b.Hist.size(); ++idx) {
+ if (b.Hist[idx]) {
+ if (idx + 1 != b.Hist.size()) {
+ Cerr << " [" << idx << "us - " << idx + 1 << "us] " << b.Hist[idx] << Endl;
+ } else {
+ Cerr << " [" << idx << "us - ...] " << b.Hist[idx] << Endl;
+ }
+ }
+ }
+ }
+
+}
+
Y_UNIT_TEST_SUITE(BasicExecutorPool) {
Y_UNIT_TEST(Semaphore) {
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index eed1794ab1..4aab7b3e31 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -6,6 +6,7 @@
#include "event.h"
#include "events.h"
#include "executor_pool_base.h"
+#include "probes.h"
#include <library/cpp/actors/prof/tag.h>
#include <library/cpp/actors/util/affinity.h>
@@ -43,6 +44,8 @@ namespace NActors {
, Ctx(workerId, cpuId)
, ThreadName(threadName)
, IsUnitedWorker(true)
+ , TimePerMailbox(timePerMailbox)
+ , EventsPerMailbox(eventsPerMailbox)
{
Ctx.Switch(
ExecutorPool,
@@ -347,12 +350,17 @@ namespace NActors {
return ThreadId;
}
+ TWorkerId TExecutorThread::GetWorkerId() const {
+ return Ctx.WorkerId;
+ }
+
void TExecutorThread::ProcessExecutorPool(IExecutorPool *pool, bool isSharedThread) {
ExecutorPool = pool;
TThreadContext threadCtx;
TlsThreadContext = &threadCtx;
TlsThreadContext->Pool = static_cast<IExecutorPool*>(ExecutorPool);
TlsThreadContext->WorkerId = Ctx.WorkerId;
+ pool->Initialize(Ctx);
ExecutorPool->SetRealTimeMode();
TAffinityGuard affinity(ExecutorPool->Affinity());
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index f66ffcb02d..b466ad2423 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -69,7 +69,7 @@ namespace NActors {
void GetCurrentStats(TExecutorThreadStats& statsCopy) const;
TThreadId GetThreadId() const; // blocks, must be called after Start()
- TWorkerId GetWorkerId() const { return Ctx.WorkerId; }
+ TWorkerId GetWorkerId() const;
private:
void* ThreadProc();
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index ced855c809..4464603dc8 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -194,6 +194,9 @@ struct TPoolInfo {
TAtomic MaxBookedCpu = 0;
TAtomic MinBookedCpu = 0;
+ std::unique_ptr<TWaitingStats<ui64>> WaitingStats;
+ std::unique_ptr<TWaitingStats<double>> MovingWaitingStats;
+
double GetBooked(i16 threadIdx);
double GetlastSecondPoolBooked(i16 threadIdx);
double GetConsumed(i16 threadIdx);
@@ -252,6 +255,13 @@ TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
+ if (WaitingStats && BasicPool) {
+ WaitingStats->Clear();
+ BasicPool->GetWaitingStats(*WaitingStats);
+ if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) {
+ MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2);
+ }
+ }
return acc;
}
#undef UNROLL_HISTORY
@@ -291,6 +301,9 @@ private:
TAtomic MaxBookedCpu = 0;
TAtomic MinBookedCpu = 0;
+ std::atomic<double> AvgAwakeningTimeUs = 0;
+ std::atomic<double> AvgWakingUpTimeUs = 0;
+
void PullStats(ui64 ts);
void HarmonizeImpl(ui64 ts);
void CalculatePriorityOrder();
@@ -352,6 +365,58 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
size_t sumOfAdditionalThreads = 0;
+
+ ui64 TotalWakingUpTime = 0;
+ ui64 TotalWakingUps = 0;
+ ui64 TotalAwakeningTime = 0;
+ ui64 TotalAwakenings = 0;
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = Pools[poolIdx];
+ if (pool.WaitingStats) {
+ TotalWakingUpTime += pool.WaitingStats->WakingUpTotalTime;
+ TotalWakingUps += pool.WaitingStats->WakingUpCount;
+ TotalAwakeningTime += pool.WaitingStats->AwakingTotalTime;
+ TotalAwakenings += pool.WaitingStats->AwakingCount;
+ }
+ }
+
+ constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime;
+ constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime;
+
+ ui64 realAvgWakingUpTime = (TotalWakingUps ? TotalWakingUpTime / TotalWakingUps : knownAvgWakingUpTime);
+ ui64 avgWakingUpTime = realAvgWakingUpTime;
+ if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) {
+ avgWakingUpTime = knownAvgWakingUpTime;
+ }
+ AvgWakingUpTimeUs = Ts2Us(avgWakingUpTime);
+
+ ui64 realAvgAwakeningTime = (TotalAwakenings ? TotalAwakeningTime / TotalAwakenings : knownAvgAwakeningUpTime);
+ ui64 avgAwakeningTime = realAvgAwakeningTime;
+ if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) {
+ avgAwakeningTime = knownAvgAwakeningUpTime;
+ }
+ AvgAwakeningTimeUs = Ts2Us(avgAwakeningTime);
+
+ ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime;
+    LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(realAvgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption));
+
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = Pools[poolIdx];
+ if (!pool.BasicPool) {
+ continue;
+ }
+ if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
+ pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption);
+ } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
+ ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
+ pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
+ } else {
+ ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
+ pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
+ }
+ pool.BasicPool->ClearWaitingStats();
+ }
+
for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
TPoolInfo& pool = Pools[poolIdx];
total += pool.DefaultThreadCount;
@@ -584,7 +649,11 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
}
- Pools.push_back(poolInfo);
+ if (poolInfo.BasicPool) {
+ poolInfo.WaitingStats.reset(new TWaitingStats<ui64>());
+ poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>());
+ }
+ Pools.push_back(std::move(poolInfo));
PriorityOrder.clear();
}
@@ -623,6 +692,8 @@ THarmonizerStats THarmonizer::GetStats() const {
.MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)),
.MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)),
.MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)),
+ .AvgAwakeningTimeUs = AvgAwakeningTimeUs,
+ .AvgWakingUpTimeUs = AvgWakingUpTimeUs,
};
}
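
The avgWakingUpTime / avgAwakeningTime computation in HarmonizeImpl above only trusts a
measured average when it is plausible, and otherwise falls back to the precompiled constants
(KnownAvgWakingUpTime = 4250 cycles, KnownAvgAwakeningTime = 7000 cycles). A condensed sketch
of that guard, assuming the same clamping rule:

#include <cstdint>
#include <cstdio>

// Use the measured average if it is nonzero and at most twice the known-good
// value; otherwise fall back to the constant. All values are in cycles.
uint64_t ClampedAvg(uint64_t totalCycles, uint64_t count, uint64_t knownAvg) {
    uint64_t real = count ? totalCycles / count : knownAvg;
    if (real == 0 || real > 2 * knownAvg)
        return knownAvg;
    return real;
}

int main() {
    // One harmonizer pass: 120 wake-ups costing 600000 cycles in total.
    uint64_t avgWakingUp  = ClampedAvg(600000, 120, 4250); // 5000, plausible
    uint64_t avgAwakening = ClampedAvg(0, 0, 7000);        // no samples -> 7000
    std::printf("%llu\n", (unsigned long long)(avgWakingUp + avgAwakening)); // 12000
    return 0;
}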
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index 04eb616b15..ba98048e49 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -6,6 +6,9 @@
namespace NActors {
class IExecutorPool;
+ template <typename T>
+ struct TWaitingStats;
+
struct TPoolHarmonizerStats {
ui64 IncreasingThreadsByNeedyState = 0;
ui64 IncreasingThreadsByExchange = 0;
@@ -27,6 +30,9 @@ namespace NActors {
i64 MinConsumedCpu = 0.0;
i64 MaxBookedCpu = 0.0;
i64 MinBookedCpu = 0.0;
+
+ double AvgAwakeningTimeUs = 0;
+ double AvgWakingUpTimeUs = 0;
};
// Pool cpu harmonizer
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 30db3ac787..4fb49d70c0 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -70,6 +70,8 @@ namespace NActors {
i64 MinConsumedCpuUs = 0;
i64 MaxBookedCpuUs = 0;
i64 MinBookedCpuUs = 0;
+ double SpinningTimeUs = 0;
+ double SpinThresholdUs = 0;
i16 WrongWakenedThreadCount = 0;
i16 CurrentThreadCount = 0;
i16 PotentialMaxThreadCount = 0;
diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h
index aa8dd7bcdb..531923b5ad 100644
--- a/library/cpp/actors/core/probes.h
+++ b/library/cpp/actors/core/probes.h
@@ -175,6 +175,15 @@
PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \
NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \
+ PROBE(WakingUpConsumption, GROUPS("Harmonizer"), \
+ TYPES(double, double, double, double, double), \
+ NAMES("avgWakingUpUs", "realAvgWakingUpUs", "avgAwakeningUs", "realAvgAwakeningUs", "total")) \
+ PROBE(ChangeSpinThreshold, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, ui64, double, ui64), \
+ NAMES("poolId", "pool", "spinThreshold", "spinThresholdUs", "bucketIdx")) \
+ PROBE(WaitingHistogram, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, double, double, ui64), \
+ NAMES("poolId", "pool", "fromUs", "toUs", "count")) \
PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \
TYPES(ui32, TString, TString, ui32, ui32, ui32), \
NAMES("poolId", "pool", "operation", "newCount", "minCount", "maxCount")) \
@@ -193,6 +202,15 @@
PROBE(TryToHarmonizeSuccess, GROUPS("Harmonizer"), \
TYPES(ui64, ui64, ui64), \
NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \
+ PROBE(SpinCycles, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, ui64, bool), \
+ NAMES("poolId", "pool", "spinPauseCount", "IsInterrupted")) \
+ PROBE(WaitingHistogramPerThread, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, ui32, double, double, ui64), \
+ NAMES("poolId", "pool", "threadIdx", "fromUs", "toUs", "count")) \
+ PROBE(ChangeSpinThresholdPerThread, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, ui32, ui64, double, ui64), \
+ NAMES("poolId", "pool", "threadIdx", "spinThreshold", "spinThresholdUs", "bucketIdx")) \
/**/
LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER)
diff --git a/library/cpp/actors/core/thread_context.h b/library/cpp/actors/core/thread_context.h
index 2c70ccd566..13e493f855 100644
--- a/library/cpp/actors/core/thread_context.h
+++ b/library/cpp/actors/core/thread_context.h
@@ -9,6 +9,9 @@ namespace NActors {
class IExecutorPool;
+ template <typename T>
+ struct TWaitingStats;
+
struct TThreadContext {
IExecutorPool *Pool = nullptr;
ui32 CapturedActivation = 0;
@@ -18,6 +21,8 @@ namespace NActors {
ui32 WriteTurn = 0;
TWorkerId WorkerId;
ui16 LocalQueueSize = 0;
+ TWaitingStats<ui64> *WaitingStats = nullptr;
+ bool IsCurrentRecipientAService = false;
};
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
diff --git a/library/cpp/actors/core/ut_fat/actor_benchmark.cpp b/library/cpp/actors/core/ut_fat/actor_benchmark.cpp
index 9512ee1ada..d47cae6ebb 100644
--- a/library/cpp/actors/core/ut_fat/actor_benchmark.cpp
+++ b/library/cpp/actors/core/ut_fat/actor_benchmark.cpp
@@ -30,7 +30,7 @@ Y_UNIT_TEST_SUITE(HeavyActorBenchmark) {
threadsList.push_back(threads);
}
std::vector<ui32> actorPairsList = {512};
- TActorBenchmark::RunSendActivateReceiveCSV(threadsList, actorPairsList, {1,100, 200});
+ TActorBenchmark::RunSendActivateReceiveCSV(threadsList, actorPairsList, {1,100, 200}, TDuration::Seconds(1));
}
Y_UNIT_TEST(StarSendActivateReceiveCSV) {
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index 1f9a6a1ebb..e8decd9300 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -158,6 +158,8 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs;
+ NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs;
THistogramCounters LegacyActivationTimeHistogram;
@@ -220,6 +222,9 @@ private:
MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false);
MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false);
MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false);
+ SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true);
+ SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false);
+
LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
ActivationTimeHistogram = PoolGroup->GetHistogram(
@@ -271,6 +276,9 @@ private:
*DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange;
*NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;
+ *SpinningTimeUs = poolStats.SpinningTimeUs;
+ *SpinThresholdUs = poolStats.SpinThresholdUs;
+
LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
ActivationTimeHistogram->Reset();
ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
@@ -322,6 +330,10 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeUs;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeUs;
+
+
void Init(NMonitoring::TDynamicCounters* group) {
Group = group;
@@ -329,6 +341,8 @@ private:
MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false);
MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false);
MinBookedCpu = Group->GetCounter("MinBookedCpu", false);
+ AvgAwakeningTimeUs = Group->GetCounter("AvgAwakeningTimeUs", false);
+ AvgWakingUpTimeUs = Group->GetCounter("AvgWakingUpTimeUs", false);
}
void Set(const THarmonizerStats& harmonizerStats) {
@@ -337,10 +351,11 @@ private:
*MinConsumedCpu = harmonizerStats.MinConsumedCpu;
*MaxBookedCpu = harmonizerStats.MaxBookedCpu;
*MinBookedCpu = harmonizerStats.MinBookedCpu;
+
+ *AvgAwakeningTimeUs = harmonizerStats.AvgAwakeningTimeUs;
+ *AvgWakingUpTimeUs = harmonizerStats.AvgWakingUpTimeUs;
#else
- Y_UNUSED(poolStats);
- Y_UNUSED(stats);
- Y_UNUSED(numThreads);
+ Y_UNUSED(harmonizerStats);
#endif
}
diff --git a/ydb/core/driver_lib/run/auto_config_initializer.cpp b/ydb/core/driver_lib/run/auto_config_initializer.cpp
index 17ea71ae22..a605b3b686 100644
--- a/ydb/core/driver_lib/run/auto_config_initializer.cpp
+++ b/ydb/core/driver_lib/run/auto_config_initializer.cpp
@@ -257,6 +257,14 @@ namespace NKikimr::NAutoConfigInitializer {
config->SetIoExecutor(pools.IOPoolId);
serviceExecutor->SetExecutorId(pools.ICPoolId);
+ if (!config->HasActorSystemProfile()) {
+ if (cpuCount <= 4) {
+ config->SetActorSystemProfile(NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION);
+ } else {
+ config->SetActorSystemProfile(NKikimrConfig::TActorSystemConfig::LOW_LATENCY);
+ }
+ }
+
TVector<NKikimrConfig::TActorSystemConfig::TExecutor *> executors;
for (ui32 poolIdx = 0; poolIdx < poolCount; ++poolIdx) {
executors.push_back(config->AddExecutor());
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 14a6365dbd..27448450c4 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -269,6 +269,18 @@ TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemCon
: TDuration::MilliSeconds(10);
}
+
+NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) {
+ switch (profile) {
+ case NKikimrConfig::TActorSystemConfig::DEFAULT:
+ return NActors::EASProfile::Default;
+ case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION:
+ return NActors::EASProfile::LowCpuConsumption;
+ case NKikimrConfig::TActorSystemConfig::LOW_LATENCY:
+ return NActors::EASProfile::LowLatency;
+ }
+}
+
void AddExecutorPool(
TCpuManagerConfig& cpuManager,
const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig,
@@ -305,6 +317,7 @@ void AddExecutorPool(
} else if (systemConfig.HasEventsPerMailbox()) {
basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox();
}
+ basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile());
Y_ABORT_UNLESS(basic.EventsPerMailbox != 0);
basic.MinThreadCount = poolConfig.GetMinThreads();
basic.MaxThreadCount = poolConfig.GetMaxThreads();
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 5e5e874d98..4b70952175 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -51,6 +51,12 @@ message TActorSystemConfig {
HYBRID = 3;
}
+ enum EActorSystemProfile {
+ DEFAULT = 1;
+ LOW_CPU_CONSUMPTION = 2;
+ LOW_LATENCY = 3;
+ }
+
message TExecutor {
enum EType {
BASIC = 1;
@@ -132,6 +138,7 @@ message TActorSystemConfig {
optional ENodeType NodeType = 14 [default = COMPUTE];
optional bool MonitorStuckActors = 15;
+ optional EActorSystemProfile ActorSystemProfile = 16;
}
message TStaticNameserviceConfig {