author | kruall <kruall@ydb.tech> | 2023-11-15 13:04:33 +0300
---|---|---
committer | kruall <kruall@ydb.tech> | 2023-11-15 14:08:26 +0300
commit | f112fa3b9076c88724b55487367f536710f0dd81 (patch)
tree | 899d94b6a0f462a5648be8325294a4e1aaff36b7
parent | 2b0bf66043ecfee310e1dbf78b54932a949c7726 (diff)
download | ydb-f112fa3b9076c88724b55487367f536710f0dd81.tar.gz
Add auto spin threshold, KIKIMR-19343
23 files changed, 640 insertions, 63 deletions
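In short, this commit makes the basic executor pool's spin threshold adaptive: each worker records a histogram of how long it waited before new work arrived, the harmonizer measures the real cost of a sleep/wake round trip, and the threshold is moved to the histogram bucket that minimizes expected CPU cost. Below is a condensed, self-contained sketch of that selection logic, mirroring `CalculateGoodSpinThresholdCycles` from `executor_pool_basic.h` in the diff; names are simplified and this is an illustration, not the exact library code.

```cpp
#include <array>
#include <cstdint>
#include <limits>
#include <numeric>

// Hypothetical helper mirroring CalculateGoodSpinThresholdCycles: pick the
// spin threshold (in cycles) that minimizes expected CPU cost, given a
// histogram of observed wait durations. hist[i] counts waits that fell into
// [i * resolution, (i + 1) * resolution) cycles; wakeCost is the measured
// average cost of a sleep/wake round trip, in cycles.
uint64_t GoodSpinThreshold(const std::array<uint64_t, 128>& hist,
                           uint64_t resolution, uint64_t wakeCost) {
    const uint64_t total = std::accumulate(hist.begin(), hist.end(), uint64_t(0));

    uint64_t bestIdx = 0;
    uint64_t bestCost = std::numeric_limits<uint64_t>::max();
    uint64_t spinTime = 0;   // cycles spent spinning by waits shorter than the candidate
    uint64_t spinCount = 0;  // how many waits were that short

    for (uint64_t idx = 0; idx < hist.size(); ++idx) {
        const uint64_t threshold = resolution * idx;
        // Short waits pay only their own spin time; the remaining waits spin
        // the full candidate threshold and still pay the sleep/wake cost.
        const uint64_t cost = spinTime + (total - spinCount) * (wakeCost + threshold);
        if (cost < bestCost) {
            bestCost = cost;
            bestIdx = idx;
        }
        // Fold bucket idx into the "short wait" side; its average duration is
        // threshold + resolution / 2, i.e. (2 * threshold + resolution) / 2.
        spinTime += (2 * threshold + resolution) * hist[idx] / 2;
        spinCount += hist[idx];
    }
    return resolution * bestIdx;
}
```

The trade-off this encodes: raising the threshold only pays off while most waits end sooner than the threshold, because every wait that outlasts it burns the whole spin budget and then pays the wake-up cost anyway.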
diff --git a/library/cpp/actors/core/actor_benchmark_helper.h b/library/cpp/actors/core/actor_benchmark_helper.h
index c5dd09efe1..7e9651eb35 100644
--- a/library/cpp/actors/core/actor_benchmark_helper.h
+++ b/library/cpp/actors/core/actor_benchmark_helper.h
@@ -41,7 +41,7 @@ struct TTestEndDecorator : TDecorator {
 struct TActorBenchmarkSettings {
     static constexpr bool DefaultNoRealtime = true;
     static constexpr ui32 DefaultSpinThreshold = 1'000'000;
-    static constexpr ui32 TotalEventsAmountPerThread = 10'000;
+    static constexpr ui32 TotalEventsAmountPerThread = 1'000;
 
     static constexpr auto MailboxTypes = {
         TMailboxType::Simple,
@@ -726,13 +726,13 @@
         }
     }
 
-    static void RunSendActivateReceiveCSV(const std::vector<ui32> &threadsList, const std::vector<ui32> &actorPairsList, const std::vector<ui32> &inFlights) {
+    static void RunSendActivateReceiveCSV(const std::vector<ui32> &threadsList, const std::vector<ui32> &actorPairsList, const std::vector<ui32> &inFlights, TDuration subtestDuration) {
         Cout << "threads,actorPairs,in_flight,msgs_per_sec,elapsed_seconds,min_pair_sent_msgs,max_pair_sent_msgs" << Endl;
         for (ui32 threads : threadsList) {
             for (ui32 actorPairs : actorPairsList) {
                 for (ui32 inFlight : inFlights) {
-                    auto stats = CountStats([threads, actorPairs, inFlight] {
-                        return BenchContentedThreads(threads, actorPairs, EPoolType::Basic, ESendingType::Common, TDuration::Seconds(1), inFlight);
+                    auto stats = CountStats([threads, actorPairs, inFlight, subtestDuration] {
+                        return BenchContentedThreads(threads, actorPairs, EPoolType::Basic, ESendingType::Common, subtestDuration, inFlight);
                     }, 3);
                     double elapsedSeconds = stats.ElapsedTime.Mean / 1e9;
                     ui64 eventsPerSecond = stats.SentEvents.Mean / elapsedSeconds;
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index b1e681c1ec..52d3cbad9c 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -294,7 +294,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
         for (ui32 actorPairs = 1; actorPairs <= 2 * 32; actorPairs *= 2) {
             actorPairsList.push_back(actorPairs);
         }
-        TActorBenchmark::RunSendActivateReceiveCSV(threadsList, actorPairsList, {1});
+        TActorBenchmark::RunSendActivateReceiveCSV(threadsList, actorPairsList, {1}, TDuration::MilliSeconds(100));
     }
 
     Y_UNIT_TEST(SendActivateReceiveWithMailboxNeighbours) {
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 1b18abc787..df5de39090 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -8,7 +8,6 @@
 #include "executor_pool.h"
 #include "log_settings.h"
 #include "scheduler_cookie.h"
-#include "mon_stats.h"
 #include "cpu_manager.h"
 #include "executor_thread.h"
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 7c843d0847..04bdd6aebe 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -28,6 +28,12 @@ namespace NActors {
         ui64 PeriodUs = 15000000; // Time between balancer steps
     };
 
+    enum class EASProfile {
+        Default,
+        LowCpuConsumption,
+        LowLatency,
+    };
+
     struct TBasicExecutorPoolConfig {
         static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10);
         static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100;
@@ -46,6 +52,7 @@
         i16 Priority = 0;
         i16 SharedExecutorsCount = 0;
         i16 SoftProcessingDurationTs = 0;
+        EASProfile ActorSystemProfile = EASProfile::Default;
     };
 
     struct TIOExecutorPoolConfig {
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index ae251c14bb..5498a6403e 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -1,13 +1,14 @@
 #pragma once
 
 #include "event.h"
-#include "mon_stats.h"
 #include "scheduler_queue.h"
 
 namespace NActors {
     class TActorSystem;
     struct TMailboxHeader;
     struct TWorkerContext;
+    struct TExecutorPoolStats;
+    struct TExecutorThreadStats;
     class ISchedulerCookie;
 
     struct TCpuConsumption {
@@ -40,6 +41,9 @@
         }
 
         // for workers
+        virtual void Initialize(TWorkerContext& wctx) {
+            Y_UNUSED(wctx);
+        }
         virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0;
         virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0;
         virtual TMailboxHeader *ResolveMailbox(ui32 hint) = 0;
@@ -121,6 +125,10 @@
             Y_UNUSED(threads);
         }
 
+        virtual void SetSpinThresholdCycles(ui32 cycles) {
+            Y_UNUSED(cycles);
+        }
+
         virtual i16 GetBlockingThreadCount() const {
             return 0;
         }
@@ -135,7 +143,6 @@
 
         virtual i16 GetMaxThreadCount() const {
             return 1;
-
         }
 
         virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index 059cac48d2..1c9a536b9b 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -94,6 +94,9 @@ namespace NActors {
 #ifdef ACTORSLIB_COLLECT_EXEC_STATS
         RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast());
 #endif
+        if (TlsThreadContext) {
+            TlsThreadContext->IsCurrentRecipientAService = ev->Recipient.IsService();
+        }
         return MailboxTable->SendTo(ev, this);
     }
 
@@ -102,6 +105,9 @@
 #ifdef ACTORSLIB_COLLECT_EXEC_STATS
         RelaxedStore(&ev->SendTime, (::NHPTimer::STime)GetCycleCountFast());
 #endif
+        if (TlsThreadContext) {
+            TlsThreadContext->IsCurrentRecipientAService = ev->Recipient.IsService();
+        }
         return MailboxTable->SpecificSendTo(ev, this);
     }
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index e94ffdbad9..6bfabb527f 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -2,6 +2,7 @@
 
 #include "executor_pool.h"
 #include "executor_thread.h"
+#include "mon_stats.h"
 #include "scheduler_queue.h"
 
 #include <library/cpp/actors/util/affinity.h>
 #include <library/cpp/actors/util/unordered_cache.h>
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index ccd813bb2e..4a9019e26d 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -13,6 +13,10 @@
 namespace NActors {
     LWTRACE_USING(ACTORLIB_PROVIDER);
 
+
+    const double TWaitingStatsConstants::HistogramResolutionUs = MaxSpinThersholdUs / BucketCount;
+    const ui64 TWaitingStatsConstants::HistogramResolution = NHPTimer::GetCyclesPerSecond() * 0.000001 * HistogramResolutionUs;
+
     constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX;
 
     TBasicExecutorPool::TBasicExecutorPool(
@@ -31,9 +35,11 @@
         i16 defaultThreadCount,
         i16 priority)
         : TExecutorPoolBase(poolId, threads, affinity)
-        , SpinThreshold(spinThreshold)
-        , SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
-        , Threads(new TThreadCtx[threads])
+        , DefaultSpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
+        , SpinThresholdCycles(DefaultSpinThresholdCycles)
+        , SpinThresholdCyclesPerThread(new NThreading::TPadded<std::atomic<ui64>>[threads])
+        , Threads(new NThreading::TPadded<TThreadCtx>[threads])
+        , WaitingStats(new TWaitingStats<ui64>[threads])
         , PoolName(poolName)
         , TimePerMailbox(timePerMailbox)
         , EventsPerMailbox(eventsPerMailbox)
@@ -57,6 +63,14 @@
                 LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
             }
         }
+        if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
+            for (ui32 idx = 0; idx < threads; ++idx) {
+                SpinThresholdCyclesPerThread[idx].store(0);
+            }
+        }
+        if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
+            MovingWaitingStats.Reset(new TWaitingStats<double>[threads]);
+        }
         Y_UNUSED(maxActivityType);
         i16 limit = Min(threads, (ui32)Max<i16>());
@@ -99,6 +113,7 @@
     {
         SetSharedExecutorsCount(cfg.SharedExecutorsCount);
         SoftProcessingDurationTs = cfg.SoftProcessingDurationTs;
+        ActorSystemProfile = cfg.ActorSystemProfile;
     }
 
     TBasicExecutorPool::~TBasicExecutorPool() {
@@ -109,7 +124,7 @@
         do {
             timers.HPNow = GetCycleCountFast();
             timers.Elapsed += timers.HPNow - timers.HPStart;
-            if (threadCtx.Pad.Park()) // interrupted
+            if (threadCtx.WaitingPad.Park()) // interrupted
                 return true;
            timers.HPStart = GetCycleCountFast();
            timers.Parked += timers.HPStart - timers.HPNow;
@@ -117,31 +132,25 @@
         return false;
     }
 
-    void TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx) {
-        ui64 start = GetCycleCountFast();
-        bool doSpin = true;
-        while (true) {
-            for (ui32 j = 0; doSpin && j < 12; ++j) {
-                if (GetCycleCountFast() >= (start + SpinThresholdCycles)) {
-                    doSpin = false;
-                    break;
-                }
-                for (ui32 i = 0; i < 12; ++i) {
-                    if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
-                        SpinLockPause();
-                    } else {
-                        doSpin = false;
-                        break;
-                    }
-                }
-            }
-            if (!doSpin) {
-                break;
-            }
-            if (RelaxedLoad(&StopFlag)) {
-                break;
-            }
+    ui32 TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx, i64 start, i64 &end) {
+        ui32 spinPauseCount = 0;
+        i64 spinThresholdCycles = 0;
+        if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
+            spinThresholdCycles = SpinThresholdCyclesPerThread[TlsThreadContext->WorkerId].load();
+        } else {
+            spinThresholdCycles = SpinThresholdCycles.load();
         }
+        do {
+            end = GetCycleCountFast();
+            if (end >= (start + spinThresholdCycles) || AtomicLoad(&threadCtx.WaitingFlag) != TThreadCtx::WS_ACTIVE) {
+                return spinPauseCount;
+            }
+
+            SpinLockPause();
+            spinPauseCount++;
+        } while (!RelaxedLoad(&StopFlag));
+
+        return spinPauseCount;
     }
 
     bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) {
@@ -161,26 +170,55 @@
         }
 #endif
 
+        i64 startWaiting = GetCycleCountFast();
+        i64 endSpinning = 0;
         TAtomic state = AtomicLoad(&threadCtx.WaitingFlag);
+        bool wasSleeping = false;
         Y_ABORT_UNLESS(state == TThreadCtx::WS_NONE, "WaitingFlag# %d", int(state));
 
-        if (SpinThreshold > 0 && !needToBlock) { // spin configured period
+        if (SpinThresholdCycles > 0 && !needToBlock) { // spin configured period
             AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE);
-            GoToSpin(threadCtx);
+            ui32 spinPauseCount = GoToSpin(threadCtx, startWaiting, endSpinning);
+            SpinningTimeUs += endSpinning - startWaiting;
             // then - sleep
             if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
                 if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) {
+                    if (NFeatures::TCommonFeatureFlags::ProbeSpinCycles) {
+                        LWPROBE(SpinCycles, PoolId, PoolName, spinPauseCount, true);
+                    }
+
+                    wasSleeping = true;
                     if (GoToSleep(threadCtx, timers)) {  // interrupted
                         return true;
                     }
+                    AllThreadsSleep.store(false);
                 }
             }
+            if (NFeatures::TCommonFeatureFlags::ProbeSpinCycles && !wasSleeping) {
+                LWPROBE(SpinCycles, PoolId, PoolName, spinPauseCount, false);
+            }
         } else {
             AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED);
+            wasSleeping = true;
             if (GoToSleep(threadCtx, timers)) {  // interrupted
                 return true;
             }
+            AllThreadsSleep.store(false);
+        }
+
+        i64 needTimeTs = threadCtx.StartWakingTs.exchange(0);
+        if (wasSleeping && needTimeTs) {
+            ui64 waitingDuration = std::max<i64>(0, needTimeTs - startWaiting);
+            ui64 awakingDuration = std::max<i64>(0, GetCycleCountFast() - needTimeTs);
+            WaitingStats[TlsThreadContext->WorkerId].AddAwakening(waitingDuration, awakingDuration);
+        } else {
+            ui64 waitingDuration = std::max<i64>(0, endSpinning - startWaiting);
+            if (wasSleeping) {
+                WaitingStats[TlsThreadContext->WorkerId].AddFastAwakening(waitingDuration);
+            } else {
+                WaitingStats[TlsThreadContext->WorkerId].Add(waitingDuration);
+            }
         }
 
         Y_DEBUG_ABORT_UNLESS(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
@@ -225,6 +263,9 @@
             if (semaphore.OldSemaphore == 0) {
                 semaphore.CurrentSleepThreadCount++;
+                if (semaphore.CurrentSleepThreadCount == AtomicLoad(&ThreadCount)) {
+                    AllThreadsSleep.store(true);
+                }
                 x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
                 if (x == oldX) {
                     *needToWait = true;
@@ -327,6 +368,25 @@
     inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
+        if (AllThreadsSleep) {
+            TThreadCtx& hotThreadCtx = Threads[0];
+            if (AtomicCas(&hotThreadCtx.WaitingFlag, TThreadCtx::WS_NONE, TThreadCtx::WS_ACTIVE)) {
+                return;
+            }
+
+            TThreadCtx& coldThreadCtx = Threads[AtomicLoad(&ThreadCount) - 1];
+            if (AtomicCas(&coldThreadCtx.WaitingFlag, TThreadCtx::WS_NONE, TThreadCtx::WS_BLOCKED)) {
+                if (TlsThreadContext && TlsThreadContext->WaitingStats) {
+                    ui64 beforeUnpark = GetCycleCountFast();
+                    coldThreadCtx.StartWakingTs = beforeUnpark;
+                    coldThreadCtx.WaitingPad.Unpark();
+                    TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
+                } else {
+                    coldThreadCtx.WaitingPad.Unpark();
+                }
+                return;
+            }
+        }
         for (i16 i = 0;;) {
             TThreadCtx& threadCtx = Threads[i];
             TThreadCtx::EWaitState state = static_cast<TThreadCtx::EWaitState>(AtomicLoad(&threadCtx.WaitingFlag));
@@ -341,7 +401,14 @@
                 case TThreadCtx::WS_BLOCKED:
                     if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_NONE, state)) {
                         if (state == TThreadCtx::WS_BLOCKED) {
-                            threadCtx.Pad.Unpark();
+                            ui64 beforeUnpark = GetCycleCountFast();
+                            threadCtx.StartWakingTs = beforeUnpark;
+                            if (TlsThreadContext && TlsThreadContext->WaitingStats) {
+                                threadCtx.WaitingPad.Unpark();
+                                TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
+                            } else {
+                                threadCtx.WaitingPad.Unpark();
+                            }
                         }
                         if (i >= currentThreadCount) {
                             AtomicIncrement(WrongWakenedThreadCount);
@@ -355,12 +422,12 @@
         }
     }
 
-    void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter) {
+    void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter, TAtomic x) {
+        TSemaphore semaphore = TSemaphore::GetSemaphore(x);
         Activations.Push(activation, revolvingCounter);
         bool needToWakeUp = false;
-        TAtomic x = AtomicGet(Semaphore);
-        TSemaphore semaphore = TSemaphore::GetSemaphore(x);
         do {
             needToWakeUp = semaphore.CurrentSleepThreadCount > SharedExecutorsCount;
             i64 oldX = semaphore.ConverToI64();
@@ -386,15 +453,39 @@
             LocalQueues[TlsThreadContext->WorkerId].push(activation);
             return;
         }
+        if (ActorSystemProfile != EASProfile::Default) {
+            TAtomic x = AtomicGet(Semaphore);
+            TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+            if constexpr (NFeatures::TLocalQueuesFeatureFlags::UseIfAllOtherThreadsAreSleeping) {
+                if (semaphore.CurrentSleepThreadCount == semaphore.CurrentThreadCount - 1 && semaphore.OldSemaphore == 0) {
+                    if (LocalQueues[TlsThreadContext->WorkerId].empty()) {
+                        LocalQueues[TlsThreadContext->WorkerId].push(activation);
+                        return;
+                    }
+                }
+            }
+
+            if constexpr (NFeatures::TLocalQueuesFeatureFlags::UseOnMicroburst) {
+                if (semaphore.OldSemaphore >= semaphore.CurrentThreadCount) {
+                    if (LocalQueues[TlsThreadContext->WorkerId].empty() && TlsThreadContext->WriteTurn < 1) {
+                        TlsThreadContext->WriteTurn++;
+                        LocalQueues[TlsThreadContext->WorkerId].push(activation);
+                        return;
+                    }
+                }
+            }
+            ScheduleActivationExCommon(activation, revolvingWriteCounter, x);
+            return;
+        }
         }
-        ScheduleActivationExCommon(activation, revolvingWriteCounter);
+        ScheduleActivationExCommon(activation, revolvingWriteCounter, AtomicGet(Semaphore));
     }
 
     void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
         if constexpr (NFeatures::IsLocalQueues()) {
             ScheduleActivationExLocalQueue(activation, revolvingCounter);
         } else {
-            ScheduleActivationExCommon(activation, revolvingCounter);
+            ScheduleActivationExCommon(activation, revolvingCounter, AtomicGet(Semaphore));
         }
     }
 
@@ -404,6 +495,8 @@
         poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount);
         poolStats.DefaultThreadCount = DefaultThreadCount;
         poolStats.MaxThreadCount = MaxThreadCount;
+        poolStats.SpinningTimeUs = Ts2Us(SpinningTimeUs);
+        poolStats.SpinThresholdUs = Ts2Us(SpinThresholdCycles);
         if (Harmonizer) {
             TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
             poolStats.IsNeedy = stats.IsNeedy;
@@ -487,7 +580,7 @@
         AtomicStore(&StopFlag, true);
         for (i16 i = 0; i != PoolThreads; ++i) {
             Threads[i].Thread->StopFlag = true;
-            Threads[i].Pad.Interrupt();
+            Threads[i].WaitingPad.Interrupt();
         }
     }
@@ -598,4 +691,59 @@
             LocalQueueSize.store(std::max(size, NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE), std::memory_order_relaxed);
         }
     }
+
+    void TBasicExecutorPool::Initialize(TWorkerContext& wctx) {
+        if (wctx.WorkerId >= 0) {
+            TlsThreadContext->WaitingStats = &WaitingStats[wctx.WorkerId];
+        }
+    }
+
+    void TBasicExecutorPool::SetSpinThresholdCycles(ui32 cycles) {
+        if (ActorSystemProfile == EASProfile::LowLatency) {
+            if (DefaultSpinThresholdCycles > cycles) {
+                cycles = DefaultSpinThresholdCycles;
+            }
+        }
+        SpinThresholdCycles = cycles;
+        double resolutionUs = TWaitingStatsConstants::HistogramResolutionUs;
+        ui32 bucketIdx = cycles / TWaitingStatsConstants::HistogramResolution;
+        LWPROBE(ChangeSpinThreshold, PoolId, PoolName, cycles, resolutionUs * bucketIdx, bucketIdx);
+    }
+
+    void TBasicExecutorPool::GetWaitingStats(TWaitingStats<ui64> &acc) const {
+        acc.Clear();
+        double resolutionUs = TWaitingStatsConstants::HistogramResolutionUs;
+        for (ui32 idx = 0; idx < ThreadCount; ++idx) {
+            for (ui32 bucketIdx = 0; bucketIdx < TWaitingStatsConstants::BucketCount; ++bucketIdx) {
+                LWPROBE(WaitingHistogramPerThread, PoolId, PoolName, idx, resolutionUs * bucketIdx, resolutionUs * (bucketIdx + 1), WaitingStats[idx].WaitingUntilNeedsTimeHist[bucketIdx].load());
+            }
+            acc.Add(WaitingStats[idx]);
+        }
+        for (ui32 bucketIdx = 0; bucketIdx < TWaitingStatsConstants::BucketCount; ++bucketIdx) {
+            LWPROBE(WaitingHistogram, PoolId, PoolName, resolutionUs * bucketIdx, resolutionUs * (bucketIdx + 1), acc.WaitingUntilNeedsTimeHist[bucketIdx].load());
+        }
+    }
+
+    void TBasicExecutorPool::ClearWaitingStats() const {
+        for (ui32 idx = 0; idx < ThreadCount; ++idx) {
+            WaitingStats[idx].Clear();
+        }
+    }
+
+    void TBasicExecutorPool::CalcSpinPerThread(ui64 wakingUpConsumption) {
+        for (i16 threadIdx = 0; threadIdx < PoolThreads; ++threadIdx) {
+            ui64 newSpinThreshold = 0;
+            if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
+                MovingWaitingStats[threadIdx].Add(WaitingStats[threadIdx], 0.8, 0.2);
+                newSpinThreshold = MovingWaitingStats[threadIdx].CalculateGoodSpinThresholdCycles(wakingUpConsumption);
+            } else {
+                newSpinThreshold = WaitingStats[threadIdx].CalculateGoodSpinThresholdCycles(wakingUpConsumption);
+            }
+            SpinThresholdCyclesPerThread[threadIdx].store(newSpinThreshold);
+
+            double resolutionUs = TWaitingStatsConstants::HistogramResolutionUs;
+            ui32 bucketIdx = newSpinThreshold / TWaitingStatsConstants::HistogramResolution;
+            LWPROBE(ChangeSpinThresholdPerThread, PoolId, PoolName, threadIdx, newSpinThreshold, resolutionUs * bucketIdx, bucketIdx);
+        }
+    }
 }
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index dcbcaeb3db..d1a4c9fd27 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -2,6 +2,7 @@
 
 #include "actorsystem.h"
 #include "executor_thread.h"
+#include "executor_pool_basic_feature_flags.h"
 #include "scheduler_queue.h"
 #include "executor_pool_base.h"
 #include "harmonizer.h"
@@ -17,17 +18,117 @@
 #include <queue>
 
 namespace NActors {
+
+    struct TWaitingStatsConstants {
+        static constexpr ui64 BucketCount = 128;
+        static constexpr double MaxSpinThersholdUs = 12.8;
+
+        static constexpr ui64 KnownAvgWakingUpTime = 4250;
+        static constexpr ui64 KnownAvgAwakeningTime = 7000;
+
+        static const double HistogramResolutionUs;
+        static const ui64 HistogramResolution;
+    };
+
+    template <typename T>
+    struct TWaitingStats : TWaitingStatsConstants {
+        std::array<std::atomic<T>, BucketCount> WaitingUntilNeedsTimeHist;
+
+        std::atomic<T> WakingUpTotalTime;
+        std::atomic<T> WakingUpCount;
+        std::atomic<T> AwakingTotalTime;
+        std::atomic<T> AwakingCount;
+
+        TWaitingStats()
+        {
+            Clear();
+        }
+
+        void Clear() {
+            std::fill(WaitingUntilNeedsTimeHist.begin(), WaitingUntilNeedsTimeHist.end(), 0);
+            WakingUpTotalTime = 0;
+            WakingUpCount = 0;
+            AwakingTotalTime = 0;
+            AwakingCount = 0;
+        }
+
+        void Add(ui64 waitingUntilNeedsTime) {
+            ui64 waitIdx = std::min(waitingUntilNeedsTime / HistogramResolution, BucketCount - 1);
+            WaitingUntilNeedsTimeHist[waitIdx]++;
+        }
+
+        void AddAwakening(ui64 waitingUntilNeedsTime, ui64 awakingTime) {
+            Add(waitingUntilNeedsTime);
+            AwakingTotalTime += awakingTime;
+            AwakingCount++;
+        }
+
+        void AddFastAwakening(ui64 waitingUntilNeedsTime) {
+            Add(waitingUntilNeedsTime - HistogramResolution);
+        }
+
+        void AddWakingUp(ui64 wakingUpTime) {
+            WakingUpTotalTime += wakingUpTime;
+            WakingUpCount++;
+        }
+
+        void Add(const TWaitingStats<T> &stats) {
+            for (ui32 idx = 0; idx < BucketCount; ++idx) {
+                WaitingUntilNeedsTimeHist[idx] += stats.WaitingUntilNeedsTimeHist[idx];
+            }
+            WakingUpTotalTime += stats.WakingUpTotalTime;
+            WakingUpCount += stats.WakingUpCount;
+            AwakingTotalTime += stats.AwakingTotalTime;
+            AwakingCount += stats.AwakingCount;
+        }
+
+        template <typename T2>
+        void Add(const TWaitingStats<T2> &stats, double oldK, double newK) {
+            for (ui32 idx = 0; idx < BucketCount; ++idx) {
+                WaitingUntilNeedsTimeHist[idx] = oldK * WaitingUntilNeedsTimeHist[idx] + newK * stats.WaitingUntilNeedsTimeHist[idx];
+            }
+            WakingUpTotalTime = oldK * WakingUpTotalTime + newK * stats.WakingUpTotalTime;
+            WakingUpCount = oldK * WakingUpCount + newK * stats.WakingUpCount;
+            AwakingTotalTime = oldK * AwakingTotalTime + newK * stats.AwakingTotalTime;
+            AwakingCount = oldK * AwakingCount + newK * stats.AwakingCount;
+        }
+
+        ui32 CalculateGoodSpinThresholdCycles(ui64 avgWakingUpConsumption) {
+            auto &bucketCount = TWaitingStatsConstants::BucketCount;
+            auto &resolution = TWaitingStatsConstants::HistogramResolution;
+
+            T waitingsCount = std::accumulate(WaitingUntilNeedsTimeHist.begin(), WaitingUntilNeedsTimeHist.end(), 0);
+
+            ui32 bestBucketIdx = 0;
+            T bestCpuConsumption = Max<T>();
+
+            T spinTime = 0;
+            T spinCount = 0;
+
+            for (ui32 bucketIdx = 0; bucketIdx < bucketCount; ++bucketIdx) {
+                auto &bucket = WaitingUntilNeedsTimeHist[bucketIdx];
+                ui64 imaginarySpingThreshold = resolution * bucketIdx;
+                T cpuConsumption = spinTime + (waitingsCount - spinCount) * (avgWakingUpConsumption + imaginarySpingThreshold);
+                if (bestCpuConsumption > cpuConsumption) {
+                    bestCpuConsumption = cpuConsumption;
+                    bestBucketIdx = bucketIdx;
+                }
+                spinTime += (2 * imaginarySpingThreshold + resolution) * bucket / 2;
+                spinCount += bucket;
+                // LWPROBE(WaitingHistogram, Pool->PoolId, Pool->GetName(), resolutionUs * bucketIdx, resolutionUs * (bucketIdx + 1), bucket);
+            }
+            ui64 result = resolution * bestBucketIdx;
+            return result;
+        }
+    };
+
     class TBasicExecutorPool: public TExecutorPoolBase {
         struct TThreadCtx {
             TAutoPtr<TExecutorThread> Thread;
-            TThreadParkPad Pad;
+            TThreadParkPad WaitingPad;
             TAtomic WaitingFlag;
-
-            // different threads must spin/block on different cache-lines.
-            // we add some padding bytes to enforce this rule
-            static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + sizeof(TThreadParkPad) + sizeof(TAtomic);
-            ui8 Padding[64 - SizeWithoutPadding];
-            static_assert(64 >= SizeWithoutPadding);
+            std::atomic<i64> StartWakingTs;
+            std::atomic<i64> EndWakingTs;
 
             enum EWaitState {
                 WS_NONE,
@@ -49,11 +150,16 @@
             NHPTimer::STime HPNow;
         };
 
-        const ui64 SpinThreshold;
-        const ui64 SpinThresholdCycles;
+        NThreading::TPadded<std::atomic_bool> AllThreadsSleep = true;
+        const ui64 DefaultSpinThresholdCycles;
+        std::atomic<ui64> SpinThresholdCycles;
+        std::unique_ptr<NThreading::TPadded<std::atomic<ui64>>[]> SpinThresholdCyclesPerThread;
 
-        TArrayHolder<TThreadCtx> Threads;
+        TArrayHolder<NThreading::TPadded<TThreadCtx>> Threads;
+        static_assert(sizeof(std::decay_t<decltype(Threads[0])>) == PLATFORM_CACHE_LINE);
         TArrayHolder<NThreading::TPadded<std::queue<ui32>>> LocalQueues;
+        TArrayHolder<TWaitingStats<ui64>> WaitingStats;
+        TArrayHolder<TWaitingStats<double>> MovingWaitingStats;
 
         std::atomic<ui16> LocalQueueSize;
@@ -63,6 +169,7 @@
         const TString PoolName;
         const TDuration TimePerMailbox;
         const ui32 EventsPerMailbox;
+        EASProfile ActorSystemProfile;
 
         const int RealtimePriority;
@@ -70,6 +177,7 @@
         TAtomic MaxUtilizationCounter;
         TAtomic MaxUtilizationAccumulator;
         TAtomic WrongWakenedThreadCount;
+        std::atomic<ui64> SpinningTimeUs;
 
         TAtomic ThreadCount;
         TMutex ChangeThreadsLock;
@@ -129,6 +237,7 @@
         void SetSharedExecutorsCount(i16 count);
 
+        void Initialize(TWorkerContext& wctx) override;
         ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
         ui32 GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingReadCounter);
         ui32 GetReadyActivationLocalQueue(TWorkerContext& wctx, ui64 revolvingReadCounter);
@@ -138,7 +247,7 @@
         void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
 
         void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override;
-        void ScheduleActivationExCommon(ui32 activation, ui64 revolvingWriteCounter);
+        void ScheduleActivationExCommon(ui32 activation, ui64 revolvingWriteCounter, TAtomic semaphoreValue);
         void ScheduleActivationExLocalQueue(ui32 activation, ui64 revolvingWriteCounter);
 
         void SetLocalQueueSize(ui16 size);
@@ -164,12 +273,18 @@
         i16 GetBlockingThreadCount() const override;
         i16 GetPriority() const override;
 
+        void SetSpinThresholdCycles(ui32 cycles) override;
+
+        void GetWaitingStats(TWaitingStats<ui64> &acc) const;
+        void CalcSpinPerThread(ui64 wakingUpConsumption);
+        void ClearWaitingStats() const;
+
     private:
         void AskToGoToSleep(bool *needToWait, bool *needToBlock);
 
         void WakeUpLoop(i16 currentThreadCount);
         bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
-        void GoToSpin(TThreadCtx& threadCtx);
+        ui32 GoToSpin(TThreadCtx& threadCtx, i64 start, i64 &end);
         bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers);
         bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers);
     };
diff --git a/library/cpp/actors/core/executor_pool_basic_feature_flags.h b/library/cpp/actors/core/executor_pool_basic_feature_flags.h
index d87b713a07..17f2a81df4 100644
--- a/library/cpp/actors/core/executor_pool_basic_feature_flags.h
+++ b/library/cpp/actors/core/executor_pool_basic_feature_flags.h
@@ -14,6 +14,8 @@ namespace NActors::NFeatures {
     struct TCommonFeatureFlags {
         static constexpr EActorSystemOptimizationType OptimizationType = EActorSystemOptimizationType::Common;
+
+        static constexpr bool ProbeSpinCycles = false;
     };
 
     struct TLocalQueuesFeatureFlags {
@@ -22,6 +24,17 @@
         static constexpr ui16 MIN_LOCAL_QUEUE_SIZE = 0;
         static constexpr ui16 MAX_LOCAL_QUEUE_SIZE = 16;
         static constexpr std::optional<ui16> FIXED_LOCAL_QUEUE_SIZE = std::nullopt;
+
+        static constexpr bool UseIfAllOtherThreadsAreSleeping = false;
+        static constexpr bool UseOnMicroburst = false;
+    };
+
+    struct TSpinFeatureFlags {
+        static constexpr bool DoNotSpinLower = false;
+        static constexpr bool UsePseudoMovingWindow = true;
+
+        static constexpr bool HotColdThreads = false;
+        static constexpr bool CalcPerThread = false;
     };
 
     using TFeatureFlags = TCommonFeatureFlags;
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 516eeda2c6..df8df52635 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -91,6 +91,134 @@ THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool)
     return setup;
 }
 
+Y_UNIT_TEST_SUITE(WaitingBenchs) {
+
+    Y_UNIT_TEST(SpinPause) {
+        const ui32 count = 1'000'000;
+        ui64 startTs = GetCycleCountFast();
+        for (ui32 idx = 0; idx < count; ++idx) {
+            SpinLockPause();
+        }
+        ui64 stopTs = GetCycleCountFast();
+        Cerr << Ts2Us(stopTs - startTs) / count << Endl;
+        Cerr << double(stopTs - startTs) / count << Endl;
+    }
+
+    struct TThread : public ISimpleThread {
+        static const ui64 CyclesInMicroSecond;
+        std::array<ui64, 128> Hist;
+        ui64 WakingTime = 0;
+        ui64 AwakeningTime = 0;
+        ui64 SleepTime = 0;
+        ui64 IterationCount = 0;
+
+        std::atomic<ui64> Awakens = 0;
+        std::atomic<ui64> *OtherAwaken;
+
+        TThreadParkPad OwnPad;
+        TThreadParkPad *OtherPad;
+
+        bool IsWaiting = false;
+
+        void GoToWait() {
+            ui64 start = GetCycleCountFast();
+            OwnPad.Park();
+            ui64 elapsed = GetCycleCountFast() - start;
+            AwakeningTime += elapsed;
+            ui64 idx = std::min(Hist.size() - 1, (elapsed - 20 * CyclesInMicroSecond) / CyclesInMicroSecond);
+            Hist[idx]++;
+            Awakens++;
+        }
+
+        void GoToWakeUp() {
+            ui64 start = GetCycleCountFast();
+            OtherPad->Unpark();
+            ui64 elapsed = GetCycleCountFast() - start;
+            WakingTime += elapsed;
+            ui64 idx = std::min(Hist.size() - 1, elapsed / CyclesInMicroSecond);
+            Hist[idx]++;
+        }
+
+        void GoToSleep() {
+            ui64 start = GetCycleCountFast();
+            ui64 stop = start;
+            while (stop - start < 20 * CyclesInMicroSecond) {
+                SpinLockPause();
+                stop = GetCycleCountFast();
+            }
+            SleepTime += stop - start;
+        }
+
+        void* ThreadProc() {
+            for (ui32 idx = 0; idx < IterationCount; ++idx) {
+                if (IsWaiting) {
+                    GoToWait();
+                } else {
+                    GoToSleep();
+                    GoToWakeUp();
+                    while(OtherAwaken->load() == idx) {
+                        SpinLockPause();
+                    }
+                }
+            }
+            return nullptr;
+        }
+    };
+
+    const ui64 TThread::CyclesInMicroSecond = NHPTimer::GetCyclesPerSecond() * 0.000001;
+
+    Y_UNIT_TEST(WakingUpTest) {
+        TThread a, b;
+        constexpr ui64 iterations = 100'000;
+        std::fill(a.Hist.begin(), a.Hist.end(), 0);
+        std::fill(b.Hist.begin(), b.Hist.end(), 0);
+        a.IterationCount = iterations;
+        b.IterationCount = iterations;
+        a.IsWaiting = true;
+        b.IsWaiting = false;
+        b.OtherAwaken = &a.Awakens;
+        a.OtherPad = &b.OwnPad;
+        b.OtherPad = &a.OwnPad;
+        a.Start();
+        b.Start();
+        a.Join();
+        b.Join();
+
+        ui64 awakeningTime = a.AwakeningTime + b.AwakeningTime - a.SleepTime - b.SleepTime;
+        ui64 wakingUpTime = a.WakingTime + b.WakingTime;
+
+        Cerr << "AvgAwakeningCycles: " << double(awakeningTime) / iterations << Endl;
+        Cerr << "AvgAwakeningUs: " << Ts2Us(awakeningTime) / iterations << Endl;
+        Cerr << "AvgSleep20usCycles:" << double(b.SleepTime) / iterations << Endl;
+        Cerr << "AvgSleep20usUs:" << Ts2Us(b.SleepTime) / iterations << Endl;
+        Cerr << "AvgWakingUpCycles: " << double(wakingUpTime) / iterations << Endl;
+        Cerr << "AvgWakingUpUs: " << Ts2Us(wakingUpTime) / iterations << Endl;
+
+        Cerr << "AwakeningHist:\n";
+        for (ui32 idx = 0; idx < a.Hist.size(); ++idx) {
+            if (a.Hist[idx]) {
+                if (idx + 1 != a.Hist.size()) {
+                    Cerr << "  [" << idx << "us - " << idx + 1 << "us] " << a.Hist[idx] << Endl;
+                } else {
+                    Cerr << "  [" << idx << "us - ...] " << a.Hist[idx] << Endl;
+                }
+            }
+        }
+
+        Cerr << "WakingUpHist:\n";
+        for (ui32 idx = 0; idx < b.Hist.size(); ++idx) {
+            if (b.Hist[idx]) {
+                if (idx + 1 != b.Hist.size()) {
+                    Cerr << "  [" << idx << "us - " << idx + 1 << "us] " << b.Hist[idx] << Endl;
+                } else {
+                    Cerr << "  [" << idx << "us - ...] " << b.Hist[idx] << Endl;
+                }
+            }
+        }
+    }
+
+}
+
 Y_UNIT_TEST_SUITE(BasicExecutorPool) {
 
     Y_UNIT_TEST(Semaphore) {
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index eed1794ab1..4aab7b3e31 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -6,6 +6,7 @@
 #include "event.h"
 #include "events.h"
 #include "executor_pool_base.h"
+#include "probes.h"
 
 #include <library/cpp/actors/prof/tag.h>
 #include <library/cpp/actors/util/affinity.h>
@@ -43,6 +44,8 @@ namespace NActors {
         , Ctx(workerId, cpuId)
         , ThreadName(threadName)
         , IsUnitedWorker(true)
+        , TimePerMailbox(timePerMailbox)
+        , EventsPerMailbox(eventsPerMailbox)
     {
         Ctx.Switch(
             ExecutorPool,
@@ -347,12 +350,17 @@
         return ThreadId;
     }
 
+    TWorkerId TExecutorThread::GetWorkerId() const {
+        return Ctx.WorkerId;
+    }
+
     void TExecutorThread::ProcessExecutorPool(IExecutorPool *pool, bool isSharedThread) {
         ExecutorPool = pool;
         TThreadContext threadCtx;
         TlsThreadContext = &threadCtx;
         TlsThreadContext->Pool = static_cast<IExecutorPool*>(ExecutorPool);
         TlsThreadContext->WorkerId = Ctx.WorkerId;
+        pool->Initialize(Ctx);
 
         ExecutorPool->SetRealTimeMode();
         TAffinityGuard affinity(ExecutorPool->Affinity());
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index f66ffcb02d..b466ad2423 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -69,7 +69,7 @@ namespace NActors {
         void GetCurrentStats(TExecutorThreadStats& statsCopy) const;
 
         TThreadId GetThreadId() const; // blocks, must be called after Start()
-        TWorkerId GetWorkerId() const { return Ctx.WorkerId; }
+        TWorkerId GetWorkerId() const;
 
     private:
         void* ThreadProc();
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index ced855c809..4464603dc8 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -194,6 +194,9 @@ struct TPoolInfo {
     TAtomic MaxBookedCpu = 0;
     TAtomic MinBookedCpu = 0;
 
+    std::unique_ptr<TWaitingStats<ui64>> WaitingStats;
+    std::unique_ptr<TWaitingStats<double>> MovingWaitingStats;
+
     double GetBooked(i16 threadIdx);
     double GetlastSecondPoolBooked(i16 threadIdx);
     double GetConsumed(i16 threadIdx);
@@ -252,6 +255,13 @@ TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
     RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
     NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
     NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
+    if (WaitingStats && BasicPool) {
+        WaitingStats->Clear();
+        BasicPool->GetWaitingStats(*WaitingStats);
+        if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) {
+            MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2);
+        }
+    }
     return acc;
 }
 #undef UNROLL_HISTORY
@@ -291,6 +301,9 @@
     TAtomic MaxBookedCpu = 0;
     TAtomic MinBookedCpu = 0;
 
+    std::atomic<double> AvgAwakeningTimeUs = 0;
+    std::atomic<double> AvgWakingUpTimeUs = 0;
+
     void PullStats(ui64 ts);
     void HarmonizeImpl(ui64 ts);
     void CalculatePriorityOrder();
@@ -352,6 +365,58 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
 
     size_t sumOfAdditionalThreads = 0;
 
+
+    ui64 TotalWakingUpTime = 0;
+    ui64 TotalWakingUps = 0;
+    ui64 TotalAwakeningTime = 0;
+    ui64 TotalAwakenings = 0;
+    for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+        TPoolInfo& pool = Pools[poolIdx];
+        if (pool.WaitingStats) {
+            TotalWakingUpTime += pool.WaitingStats->WakingUpTotalTime;
+            TotalWakingUps += pool.WaitingStats->WakingUpCount;
+            TotalAwakeningTime += pool.WaitingStats->AwakingTotalTime;
+            TotalAwakenings += pool.WaitingStats->AwakingCount;
+        }
+    }
+
+    constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime;
+    constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime;
+
+    ui64 realAvgWakingUpTime = (TotalWakingUps ? TotalWakingUpTime / TotalWakingUps : knownAvgWakingUpTime);
+    ui64 avgWakingUpTime = realAvgWakingUpTime;
+    if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) {
+        avgWakingUpTime = knownAvgWakingUpTime;
+    }
+    AvgWakingUpTimeUs = Ts2Us(avgWakingUpTime);
+
+    ui64 realAvgAwakeningTime = (TotalAwakenings ? TotalAwakeningTime / TotalAwakenings : knownAvgAwakeningUpTime);
+    ui64 avgAwakeningTime = realAvgAwakeningTime;
+    if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) {
+        avgAwakeningTime = knownAvgAwakeningUpTime;
+    }
+    AvgAwakeningTimeUs = Ts2Us(avgAwakeningTime);
+
+    ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime;
+    LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(avgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption));
+
+    for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+        TPoolInfo& pool = Pools[poolIdx];
+        if (!pool.BasicPool) {
+            continue;
+        }
+        if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
+            pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption);
+        } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
+            ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
+            pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
+        } else {
+            ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
+            pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
+        }
+        pool.BasicPool->ClearWaitingStats();
+    }
+
     for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
         TPoolInfo& pool = Pools[poolIdx];
         total += pool.DefaultThreadCount;
@@ -584,7 +649,11 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
         poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
         poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
     }
-    Pools.push_back(poolInfo);
+    if (poolInfo.BasicPool) {
+        poolInfo.WaitingStats.reset(new TWaitingStats<ui64>());
+        poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>());
+    }
+    Pools.push_back(std::move(poolInfo));
     PriorityOrder.clear();
 }
@@ -623,6 +692,8 @@ THarmonizerStats THarmonizer::GetStats() const {
         .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)),
         .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)),
         .MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)),
+        .AvgAwakeningTimeUs = AvgAwakeningTimeUs,
+        .AvgWakingUpTimeUs = AvgWakingUpTimeUs,
     };
 }
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index 04eb616b15..ba98048e49 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -6,6 +6,9 @@
 namespace NActors {
     class IExecutorPool;
 
+    template <typename T>
+    struct TWaitingStats;
+
     struct TPoolHarmonizerStats {
         ui64 IncreasingThreadsByNeedyState = 0;
         ui64 IncreasingThreadsByExchange = 0;
@@ -27,6 +30,9 @@
         i64 MinConsumedCpu = 0.0;
         i64 MaxBookedCpu = 0.0;
         i64 MinBookedCpu = 0.0;
+
+        double AvgAwakeningTimeUs = 0;
+        double AvgWakingUpTimeUs = 0;
     };
 
     // Pool cpu harmonizer
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 30db3ac787..4fb49d70c0 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -70,6 +70,8 @@ namespace NActors {
         i64 MinConsumedCpuUs = 0;
         i64 MaxBookedCpuUs = 0;
         i64 MinBookedCpuUs = 0;
+        double SpinningTimeUs = 0;
+        double SpinThresholdUs = 0;
         i16 WrongWakenedThreadCount = 0;
         i16 CurrentThreadCount = 0;
         i16 PotentialMaxThreadCount = 0;
diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h
index aa8dd7bcdb..531923b5ad 100644
--- a/library/cpp/actors/core/probes.h
+++ b/library/cpp/actors/core/probes.h
@@ -175,6 +175,15 @@
     PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
         TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \
         NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \
+    PROBE(WakingUpConsumption, GROUPS("Harmonizer"), \
+        TYPES(double, double, double, double, double), \
+        NAMES("avgWakingUpUs", "realAvgWakingUpUs", "avgAwakeningUs", "realAvgAwakeningUs", "total")) \
+    PROBE(ChangeSpinThreshold, GROUPS("Harmonizer"), \
+        TYPES(ui32, TString, ui64, double, ui64), \
+        NAMES("poolId", "pool", "spinThreshold", "spinThresholdUs", "bucketIdx")) \
+    PROBE(WaitingHistogram, GROUPS("Harmonizer"), \
+        TYPES(ui32, TString, double, double, ui64), \
+        NAMES("poolId", "pool", "fromUs", "toUs", "count")) \
     PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \
         TYPES(ui32, TString, TString, ui32, ui32, ui32), \
         NAMES("poolId", "pool", "operation", "newCount", "minCount", "maxCount")) \
@@ -193,6 +202,15 @@
     PROBE(TryToHarmonizeSuccess, GROUPS("Harmonizer"), \
         TYPES(ui64, ui64, ui64), \
         NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \
+    PROBE(SpinCycles, GROUPS("Harmonizer"), \
+        TYPES(ui32, TString, ui64, bool), \
+        NAMES("poolId", "pool", "spinPauseCount", "IsInterrupted")) \
+    PROBE(WaitingHistogramPerThread, GROUPS("Harmonizer"), \
+        TYPES(ui32, TString, ui32, double, double, ui64), \
+        NAMES("poolId", "pool", "threadIdx", "fromUs", "toUs", "count")) \
+    PROBE(ChangeSpinThresholdPerThread, GROUPS("Harmonizer"), \
+        TYPES(ui32, TString, ui32, ui64, double, ui64), \
+        NAMES("poolId", "pool", "threadIdx", "spinThreshold", "spinThresholdUs", "bucketIdx")) \
 /**/
 
 LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER)
diff --git a/library/cpp/actors/core/thread_context.h b/library/cpp/actors/core/thread_context.h
index 2c70ccd566..13e493f855 100644
--- a/library/cpp/actors/core/thread_context.h
+++ b/library/cpp/actors/core/thread_context.h
@@ -9,6 +9,9 @@
 namespace NActors {
 
     class IExecutorPool;
 
+    template <typename T>
+    struct TWaitingStats;
+
     struct TThreadContext {
         IExecutorPool *Pool = nullptr;
         ui32 CapturedActivation = 0;
@@ -18,6 +21,8 @@
         ui32 WriteTurn = 0;
         TWorkerId WorkerId;
         ui16 LocalQueueSize = 0;
+        TWaitingStats<ui64> *WaitingStats = nullptr;
+        bool IsCurrentRecipientAService = false;
     };
 
     extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
diff --git a/library/cpp/actors/core/ut_fat/actor_benchmark.cpp b/library/cpp/actors/core/ut_fat/actor_benchmark.cpp
index 9512ee1ada..d47cae6ebb 100644
--- a/library/cpp/actors/core/ut_fat/actor_benchmark.cpp
+++ b/library/cpp/actors/core/ut_fat/actor_benchmark.cpp
@@ -30,7 +30,7 @@ Y_UNIT_TEST_SUITE(HeavyActorBenchmark) {
             threadsList.push_back(threads);
         }
         std::vector<ui32> actorPairsList = {512};
-        TActorBenchmark::RunSendActivateReceiveCSV(threadsList, actorPairsList, {1,100, 200});
+        TActorBenchmark::RunSendActivateReceiveCSV(threadsList, actorPairsList, {1,100, 200}, TDuration::Seconds(1));
     }
 
     Y_UNIT_TEST(StarSendActivateReceiveCSV) {
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index 1f9a6a1ebb..e8decd9300 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -158,6 +158,8 @@ private:
         NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
         NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
         NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
+        NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs;
+        NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs;
 
         THistogramCounters LegacyActivationTimeHistogram;
@@ -220,6 +222,9 @@
             MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false);
             MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false);
             MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false);
+            SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true);
+            SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false);
+
             LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
             ActivationTimeHistogram = PoolGroup->GetHistogram(
@@ -271,6 +276,9 @@
             *DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange;
             *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;
 
+            *SpinningTimeUs = poolStats.SpinningTimeUs;
+            *SpinThresholdUs = poolStats.SpinThresholdUs;
+
             LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
             ActivationTimeHistogram->Reset();
             ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
@@ -322,6 +330,10 @@
         NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
         NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
 
+        NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeUs;
+        NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeUs;
+
+
         void Init(NMonitoring::TDynamicCounters* group) {
             Group = group;
 
@@ -329,6 +341,8 @@
             MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false);
             MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false);
             MinBookedCpu = Group->GetCounter("MinBookedCpu", false);
+            AvgAwakeningTimeUs = Group->GetCounter("AvgAwakeningTimeUs", false);
+            AvgWakingUpTimeUs = Group->GetCounter("AvgWakingUpTimeUs", false);
         }
 
         void Set(const THarmonizerStats& harmonizerStats) {
@@ -337,10 +351,11 @@
             *MinConsumedCpu = harmonizerStats.MinConsumedCpu;
             *MaxBookedCpu = harmonizerStats.MaxBookedCpu;
             *MinBookedCpu = harmonizerStats.MinBookedCpu;
+
+            *AvgAwakeningTimeUs = harmonizerStats.AvgAwakeningTimeUs;
+            *AvgWakingUpTimeUs = harmonizerStats.AvgWakingUpTimeUs;
 #else
-            Y_UNUSED(poolStats);
-            Y_UNUSED(stats);
-            Y_UNUSED(numThreads);
+            Y_UNUSED(harmonizerStats);
 #endif
         }
diff --git a/ydb/core/driver_lib/run/auto_config_initializer.cpp b/ydb/core/driver_lib/run/auto_config_initializer.cpp
index 17ea71ae22..a605b3b686 100644
--- a/ydb/core/driver_lib/run/auto_config_initializer.cpp
+++ b/ydb/core/driver_lib/run/auto_config_initializer.cpp
@@ -257,6 +257,14 @@ namespace NKikimr::NAutoConfigInitializer {
         config->SetIoExecutor(pools.IOPoolId);
         serviceExecutor->SetExecutorId(pools.ICPoolId);
 
+        if (!config->HasActorSystemProfile()) {
+            if (cpuCount <= 4) {
+                config->SetActorSystemProfile(NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION);
+            } else {
+                config->SetActorSystemProfile(NKikimrConfig::TActorSystemConfig::LOW_LATENCY);
+            }
+        }
+
         TVector<NKikimrConfig::TActorSystemConfig::TExecutor *> executors;
         for (ui32 poolIdx = 0; poolIdx < poolCount; ++poolIdx) {
             executors.push_back(config->AddExecutor());
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 14a6365dbd..27448450c4 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -269,6 +269,18 @@ TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemCon
         : TDuration::MilliSeconds(10);
 }
 
+
+NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) {
+    switch (profile) {
+    case NKikimrConfig::TActorSystemConfig::DEFAULT:
+        return NActors::EASProfile::Default;
+    case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION:
+        return NActors::EASProfile::LowCpuConsumption;
+    case NKikimrConfig::TActorSystemConfig::LOW_LATENCY:
+        return NActors::EASProfile::LowLatency;
+    }
+}
+
 void AddExecutorPool(
     TCpuManagerConfig& cpuManager,
     const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig,
@@ -305,6 +317,7 @@
     } else if (systemConfig.HasEventsPerMailbox()) {
        basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox();
     }
+    basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile());
     Y_ABORT_UNLESS(basic.EventsPerMailbox != 0);
     basic.MinThreadCount = poolConfig.GetMinThreads();
     basic.MaxThreadCount = poolConfig.GetMaxThreads();
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 5e5e874d98..4b70952175 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -51,6 +51,12 @@ message TActorSystemConfig {
         HYBRID = 3;
     }
 
+    enum EActorSystemProfile {
+        DEFAULT = 1;
+        LOW_CPU_CONSUMPTION = 2;
+        LOW_LATENCY = 3;
+    }
+
     message TExecutor {
         enum EType {
             BASIC = 1;
@@ -132,6 +138,7 @@
     optional ENodeType NodeType = 14 [default = COMPUTE];
 
     optional bool MonitorStuckActors = 15;
+    optional EActorSystemProfile ActorSystemProfile = 16;
 }
 
 message TStaticNameserviceConfig {
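For reference, the harmonizer side of the change guards against noisy measurements before feeding the wake-up cost into the bucket selection above: a pool's measured waking-up and awakening averages are trusted only when they exist and stay within 2x of the known-good constants (`KnownAvgWakingUpTime`, `KnownAvgAwakeningTime`). A hedged sketch of that clamping rule, as a hypothetical standalone helper rather than the committed code from `harmonizer.cpp`:

```cpp
#include <cstdint>

// Hypothetical helper mirroring the clamping in THarmonizer::HarmonizeImpl:
// compute an average in cycles from accumulated totals, but fall back to the
// known-good constant when no samples exist or the measurement is more than
// twice the expected value.
uint64_t ClampedAvgCycles(uint64_t totalTime, uint64_t count, uint64_t knownAvg) {
    const uint64_t real = count ? totalTime / count : knownAvg;
    return (real == 0 || real > 2 * knownAvg) ? knownAvg : real;
}
```

The sum of the two clamped averages is the `avgWakingUpConsumption` passed to `CalculateGoodSpinThresholdCycles` (or `CalcSpinPerThread`), so a single outlier measurement cannot swing every pool's spin threshold in one harmonizer step.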