author    | kruall <kruall@ydb.tech> | 2023-12-07 13:37:20 +0300
committer | kruall <kruall@ydb.tech> | 2023-12-07 14:16:36 +0300
commit    | d014e0f31449a0b13f21906969251d9624c3a62f (patch)
tree      | 8cbd978397437f27df333539a6bac6852ba7fe70
parent    | b02752271b118467bfccb968d4ef887635c3d357 (diff)
download  | ydb-d014e0f31449a0b13f21906969251d9624c3a62f.tar.gz
Add executor thread ctx, KIKIMR-18440
-rw-r--r-- | ydb/library/actors/core/executor_pool_base.h       |   2
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.cpp    | 214
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.h      |  34
-rw-r--r-- | ydb/library/actors/core/executor_pool_io.cpp       |  14
-rw-r--r-- | ydb/library/actors/core/executor_pool_io.h         |   8
-rw-r--r-- | ydb/library/actors/core/executor_thread_ctx.h      | 136
-rw-r--r-- | ydb/library/actors/core/thread_context.h           |  19
-rw-r--r-- | ydb/library/actors/core/ut_fat/actor_benchmark.cpp |   2
8 files changed, 221 insertions, 208 deletions
diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h
index acae4f0aea..56e74a025f 100644
--- a/ydb/library/actors/core/executor_pool_base.h
+++ b/ydb/library/actors/core/executor_pool_base.h
@@ -48,7 +48,7 @@ namespace NActors {
         TAtomic Semaphore = 0;
         TUnorderedCache<ui32, 512, 4> Activations;
         TAtomic ActivationsRevolvingCounter = 0;
-        volatile bool StopFlag = false;
+        std::atomic_bool StopFlag = false;
     public:
         TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity);
         ~TExecutorPoolBase();
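The hunk above swaps a volatile bool for std::atomic_bool. volatile only prevents the compiler from caching the value in a register; it provides neither atomicity nor inter-thread ordering, which std::atomic does. A minimal standalone sketch of the release/acquire handshake the pools rely on (the names below are illustrative, not from this patch):

    #include <atomic>
    #include <thread>

    // Release/acquire pairing: everything written before store() is
    // visible to the thread that observes load() == true.
    std::atomic<bool> stopFlag{false};

    void WorkerLoop() {
        while (!stopFlag.load(std::memory_order_acquire)) {
            // ... process activations ...
        }
    }

    int main() {
        std::thread worker(WorkerLoop);
        stopFlag.store(true, std::memory_order_release); // PrepareStop() analogue
        worker.join();
    }

This is the same load/store pair the patch introduces in GetReadyActivation() and PrepareStop() below.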
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index 149e704c94..624af1c1c2 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -38,7 +38,7 @@ namespace NActors {
         , DefaultSpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
         , SpinThresholdCycles(DefaultSpinThresholdCycles)
         , SpinThresholdCyclesPerThread(new NThreading::TPadded<std::atomic<ui64>>[threads])
-        , Threads(new NThreading::TPadded<TThreadCtx>[threads])
+        , Threads(new NThreading::TPadded<TExecutorThreadCtx>[threads])
         , WaitingStats(new TWaitingStats<ui64>[threads])
         , PoolName(poolName)
         , TimePerMailbox(timePerMailbox)
@@ -120,131 +120,6 @@ namespace NActors {
         Threads.Destroy();
     }

-    bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) {
-        do {
-            timers.HPNow = GetCycleCountFast();
-            timers.Elapsed += timers.HPNow - timers.HPStart;
-            if (threadCtx.WaitingPad.Park()) // interrupted
-                return true;
-            timers.HPStart = GetCycleCountFast();
-            timers.Parked += timers.HPStart - timers.HPNow;
-        } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED && !RelaxedLoad(&StopFlag));
-        return false;
-    }
-
-    ui32 TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx, i64 start, i64 &end) {
-        ui32 spinPauseCount = 0;
-        i64 spinThresholdCycles = 0;
-        if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
-            spinThresholdCycles = SpinThresholdCyclesPerThread[TlsThreadContext->WorkerId].load();
-        } else {
-            spinThresholdCycles = SpinThresholdCycles.load();
-        }
-        do {
-            end = GetCycleCountFast();
-            if (end >= (start + spinThresholdCycles) || AtomicLoad(&threadCtx.WaitingFlag) != TThreadCtx::WS_ACTIVE) {
-                return spinPauseCount;
-            }
-
-            SpinLockPause();
-            spinPauseCount++;
-        } while (!RelaxedLoad(&StopFlag));
-
-        return spinPauseCount;
-    }
-
-    bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) {
-#if defined ACTORSLIB_COLLECT_EXEC_STATS
-        if (AtomicGetAndIncrement(ThreadUtilization) == 0) {
-            // Initially counter contains -t0, the pool start timestamp
-            // When the first thread goes to sleep we add t1, so the counter
-            // becomes t1-t0 >= 0, or the duration of max utilization so far.
-            // If the counter was negative and becomes positive, that means
-            // counter just turned into a duration and we should store that
-            // duration. Otherwise another thread raced with us and
-            // subtracted some other timestamp t2.
-            const i64 t = GetCycleCountFast();
-            const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t);
-            if (x < 0 && x + t > 0)
-                AtomicStore(&MaxUtilizationAccumulator, x + t);
-        }
-#endif
-
-        i64 startWaiting = GetCycleCountFast();
-        i64 endSpinning = 0;
-        TAtomic state = AtomicLoad(&threadCtx.WaitingFlag);
-        bool wasSleeping = false;
-        Y_ABORT_UNLESS(state == TThreadCtx::WS_NONE, "WaitingFlag# %d", int(state));
-
-        if (SpinThresholdCycles > 0 && !needToBlock) {
-            // spin configured period
-            AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE);
-            ui32 spinPauseCount = GoToSpin(threadCtx, startWaiting, endSpinning);
-            SpinningTimeUs += endSpinning - startWaiting;
-            // then - sleep
-            if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
-                if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) {
-                    if (NFeatures::TCommonFeatureFlags::ProbeSpinCycles) {
-                        LWPROBE(SpinCycles, PoolId, PoolName, spinPauseCount, true);
-                    }
-
-                    wasSleeping = true;
-                    if (GoToSleep(threadCtx, timers)) { // interrupted
-                        return true;
-                    }
-                    AllThreadsSleep.store(false);
-                }
-            }
-            if (NFeatures::TCommonFeatureFlags::ProbeSpinCycles && !wasSleeping) {
-                LWPROBE(SpinCycles, PoolId, PoolName, spinPauseCount, false);
-            }
-        } else {
-            AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED);
-            wasSleeping = true;
-            if (GoToSleep(threadCtx, timers)) { // interrupted
-                return true;
-            }
-            AllThreadsSleep.store(false);
-        }
-
-        i64 needTimeTs = threadCtx.StartWakingTs.exchange(0);
-        if (wasSleeping && needTimeTs) {
-            ui64 waitingDuration = std::max<i64>(0, needTimeTs - startWaiting);
-            ui64 awakingDuration = std::max<i64>(0, GetCycleCountFast() - needTimeTs);
-            WaitingStats[TlsThreadContext->WorkerId].AddAwakening(waitingDuration, awakingDuration);
-        } else {
-            ui64 waitingDuration = std::max<i64>(0, endSpinning - startWaiting);
-            if (wasSleeping) {
-                WaitingStats[TlsThreadContext->WorkerId].AddFastAwakening(waitingDuration);
-            } else {
-                WaitingStats[TlsThreadContext->WorkerId].Add(waitingDuration);
-            }
-        }
-
-        Y_DEBUG_ABORT_UNLESS(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
-
-#if defined ACTORSLIB_COLLECT_EXEC_STATS
-        if (AtomicDecrement(ThreadUtilization) == 0) {
-            // When we started sleeping counter contained t1-t0, or the
-            // last duration of max utilization. Now we subtract t2 >= t1,
-            // which turns counter negative again, and the next sleep cycle
-            // at timestamp t3 would be adding some new duration t3-t2.
-            // If the counter was positive and becomes negative that means
-            // there are no current races with other threads and we should
-            // store the last positive duration we observed. Multiple
-            // threads may be adding and subtracting values in potentially
-            // arbitrary order, which would cause counter to oscillate
-            // around zero. When it crosses zero is a good indication of a
-            // correct value.
-            const i64 t = GetCycleCountFast();
-            const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t);
-            if (x > 0 && x - t < 0)
-                AtomicStore(&MaxUtilizationAccumulator, x);
-        }
-#endif
-        return false;
-    }
-
     void TBasicExecutorPool::AskToGoToSleep(bool *needToWait, bool *needToBlock) {
         TAtomic x = AtomicGet(Semaphore);
         do {
@@ -293,12 +168,12 @@ namespace NActors {
         }

         if (workerId >= 0) {
-            AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_NONE);
+            Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_NONE);
         }

         TAtomic x = AtomicGet(Semaphore);
         TSemaphore semaphore = TSemaphore::GetSemaphore(x);
-        while (!RelaxedLoad(&StopFlag)) {
+        while (!StopFlag.load(std::memory_order_acquire)) {
             if (!semaphore.OldSemaphore || semaphore.CurrentSleepThreadCount < 0) {
                 if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
                     timers.HPNow = GetCycleCountFast();
@@ -310,14 +185,14 @@ namespace NActors {
                 bool needToBlock = false;
                 AskToGoToSleep(&needToWait, &needToBlock);
                 if (needToWait) {
-                    if (GoToWaiting(Threads[workerId], timers, needToBlock)) { // interrupted
+                    if (Threads[workerId].Wait(SpinThresholdCycles, &StopFlag)) {
                         return 0;
                     }
                 }
             } else {
                 if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
                     if (workerId >= 0) {
-                        AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_RUNNING);
+                        Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_RUNNING);
                     }
                     AtomicDecrement(Semaphore);
                     timers.HPNow = GetCycleCountFast();
@@ -368,39 +243,20 @@ namespace NActors {
     }

     inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
-        if (AllThreadsSleep) {
-            TThreadCtx& hotThreadCtx = Threads[0];
-            if (AtomicCas(&hotThreadCtx.WaitingFlag, TThreadCtx::WS_NONE, TThreadCtx::WS_ACTIVE)) {
-                return;
-            }
-
-            TThreadCtx& coldThreadCtx = Threads[AtomicLoad(&ThreadCount) - 1];
-            if (AtomicCas(&coldThreadCtx.WaitingFlag, TThreadCtx::WS_NONE, TThreadCtx::WS_BLOCKED)) {
-                if (TlsThreadContext && TlsThreadContext->WaitingStats) {
-                    ui64 beforeUnpark = GetCycleCountFast();
-                    coldThreadCtx.StartWakingTs = beforeUnpark;
-                    coldThreadCtx.WaitingPad.Unpark();
-                    TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
-                } else {
-                    coldThreadCtx.WaitingPad.Unpark();
-                }
-                return;
-            }
-        }
         for (i16 i = 0;;) {
-            TThreadCtx& threadCtx = Threads[i];
-            TThreadCtx::EWaitState state = static_cast<TThreadCtx::EWaitState>(AtomicLoad(&threadCtx.WaitingFlag));
-            switch (state) {
-                case TThreadCtx::WS_NONE:
-                case TThreadCtx::WS_RUNNING:
+            TExecutorThreadCtx& threadCtx = Threads[i];
+            TExecutorThreadCtx::TWaitState state = threadCtx.GetState();
+            switch (state.Flag) {
+                case TExecutorThreadCtx::WS_NONE:
+                case TExecutorThreadCtx::WS_RUNNING:
                     if (++i >= MaxThreadCount - SharedExecutorsCount) {
                         i = 0;
                     }
                     break;
-                case TThreadCtx::WS_ACTIVE:
-                case TThreadCtx::WS_BLOCKED:
-                    if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_NONE, state)) {
-                        if (state == TThreadCtx::WS_BLOCKED) {
+                case TExecutorThreadCtx::WS_ACTIVE:
+                case TExecutorThreadCtx::WS_BLOCKED:
+                    if (threadCtx.ReplaceState(state, TExecutorThreadCtx::WS_NONE)) {
+                        if (state.Flag == TExecutorThreadCtx::WS_BLOCKED) {
                             ui64 beforeUnpark = GetCycleCountFast();
                             threadCtx.StartWakingTs = beforeUnpark;
                             if (TlsThreadContext && TlsThreadContext->WaitingStats) {
@@ -577,9 +433,9 @@ namespace NActors {
     }

     void TBasicExecutorPool::PrepareStop() {
-        AtomicStore(&StopFlag, true);
+        StopFlag.store(true, std::memory_order_release);
         for (i16 i = 0; i != PoolThreads; ++i) {
-            Threads[i].Thread->StopFlag = true;
+            Threads[i].Thread->StopFlag.store(true, std::memory_order_release);
             Threads[i].WaitingPad.Interrupt();
         }
     }
@@ -666,7 +522,7 @@ namespace NActors {
         if (threadIdx >= PoolThreads) {
             return {0.0, 0.0};
         }
-        TThreadCtx& threadCtx = Threads[threadIdx];
+        TExecutorThreadCtx& threadCtx = Threads[threadIdx];
         TExecutorThreadStats stats;
         threadCtx.Thread->GetCurrentStats(stats);
         return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
@@ -746,4 +602,40 @@ namespace NActors {
             LWPROBE(ChangeSpinThresholdPerThread, PoolId, PoolName, threadIdx, newSpinThreshold, resolutionUs * bucketIdx, bucketIdx);
         }
     }
+
+    bool TExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
+        TWaitState state = ExchangeState(WS_ACTIVE);
+        Y_ABORT_UNLESS(state.Flag == WS_NONE, "WaitingFlag# %d", int(state.Flag));
+        if (OwnerExecutorPool) {
+            // if (!OwnerExecutorPool->SetSleepOwnSharedThread()) {
+            //     return false;
+            // }
+            // if (TBasicExecutorPool *pool = OtherExecutorPool; pool) {
+            //     if (!pool->SetSleepBorrowedSharedThread()) {
+            //         return false;
+            //     }
+            //}
+        }
+        if (spinThresholdCycles > 0) {
+            // spin configured period
+            Spin(spinThresholdCycles, stopFlag);
+            // then - sleep
+            state = GetState();
+            if (state.Flag == WS_ACTIVE) {
+                if (ReplaceState(state, WS_BLOCKED)) {
+                    if (Sleep(stopFlag)) { // interrupted
+                        return true;
+                    }
+                } else {
+                    NextPool = state.NextPool;
+                }
+            }
+        } else {
+            Block(stopFlag);
+        }
+
+        Y_DEBUG_ABORT_UNLESS(stopFlag->load() || GetState().Flag == WS_NONE);
+        return false;
+    }
+
 }
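The pool-level helpers removed above (GoToSpin, GoToSleep, GoToWaiting) collapse into the single TExecutorThreadCtx::Wait() added at the end of this file: spin for a bounded number of cycles hoping work arrives, then CAS the state to WS_BLOCKED and park. A simplified model of that two-phase wait, with TThreadParkPad stubbed by a timed condition-variable wait (all names below are illustrative, not from this patch):

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    enum EState : int { None, Active, Blocked };

    struct TParkPadStub {
        std::mutex Lock;
        std::condition_variable CondVar;
        void Park() {
            std::unique_lock<std::mutex> guard(Lock);
            // timed wait so a missed notify cannot hang the sketch
            CondVar.wait_for(guard, std::chrono::milliseconds(1));
        }
        void Unpark() { CondVar.notify_one(); }
    };

    struct TWaiterStub {
        std::atomic<int> State{None};
        TParkPadStub Pad;

        void Wait(long spinIterations) {
            State.store(Active);
            // Phase 1: bounded spin; a waker flipping State back to None
            // means work arrived and we skip the park/unpark cost.
            for (long i = 0; i < spinIterations; ++i) {
                if (State.load(std::memory_order_relaxed) != Active)
                    return;
            }
            // Phase 2: publish the intent to sleep, then park until the
            // waker replaces Blocked with None and unparks us.
            int expected = Active;
            if (State.compare_exchange_strong(expected, Blocked)) {
                while (State.load() == Blocked)
                    Pad.Park();
            }
        }

        void WakeUp() { // waker side, as in WakeUpLoop()
            int expected = State.load();
            while (expected == Active || expected == Blocked) {
                if (State.compare_exchange_strong(expected, None)) {
                    if (expected == Blocked)
                        Pad.Unpark();
                    return;
                }
            }
        }
    };

The CAS on the waker side is what makes spinning cheap to cancel: waking a spinner costs one atomic operation, while waking a sleeper additionally pays for Unpark().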
diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h
index 171b303519..99a41801af 100644
--- a/ydb/library/actors/core/executor_pool_basic.h
+++ b/ydb/library/actors/core/executor_pool_basic.h
@@ -2,6 +2,7 @@

 #include "actorsystem.h"
 #include "executor_thread.h"
+#include "executor_thread_ctx.h"
 #include "executor_pool_basic_feature_flags.h"
 #include "scheduler_queue.h"
 #include "executor_pool_base.h"
@@ -123,39 +124,12 @@ namespace NActors {
     };

     class TBasicExecutorPool: public TExecutorPoolBase {
-        struct TThreadCtx {
-            TAutoPtr<TExecutorThread> Thread;
-            TThreadParkPad WaitingPad;
-            TAtomic WaitingFlag;
-            std::atomic<i64> StartWakingTs;
-            std::atomic<i64> EndWakingTs;
-
-            enum EWaitState {
-                WS_NONE,
-                WS_ACTIVE,
-                WS_BLOCKED,
-                WS_RUNNING
-            };
-
-            TThreadCtx()
-                : WaitingFlag(WS_NONE)
-            {
-            }
-        };
-
-        struct TTimers {
-            NHPTimer::STime Elapsed = 0;
-            NHPTimer::STime Parked = 0;
-            NHPTimer::STime HPStart = GetCycleCountFast();
-            NHPTimer::STime HPNow;
-        };
-
         NThreading::TPadded<std::atomic_bool> AllThreadsSleep = true;
         const ui64 DefaultSpinThresholdCycles;
         std::atomic<ui64> SpinThresholdCycles;
         std::unique_ptr<NThreading::TPadded<std::atomic<ui64>>[]> SpinThresholdCyclesPerThread;
-        TArrayHolder<NThreading::TPadded<TThreadCtx>> Threads;
+        TArrayHolder<NThreading::TPadded<TExecutorThreadCtx>> Threads;
         static_assert(sizeof(std::decay_t<decltype(Threads[0])>) == PLATFORM_CACHE_LINE);
         TArrayHolder<NThreading::TPadded<std::queue<ui32>>> LocalQueues;
         TArrayHolder<TWaitingStats<ui64>> WaitingStats;
@@ -283,9 +257,5 @@ namespace NActors {

         void AskToGoToSleep(bool *needToWait, bool *needToBlock);
         void WakeUpLoop(i16 currentThreadCount);
-        bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
-        ui32 GoToSpin(TThreadCtx& threadCtx, i64 start, i64 &end);
-        bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers);
-        bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers);
     };
 }
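The static_assert kept above pins each padded slot to exactly one cache line; the new TExecutorThreadCtx carries a padding comment for the same reason. The goal is false-sharing avoidance: a thread spinning on its own WaitingFlag must not share a line with a neighbour's flag, or every state change would invalidate the spinners' caches. A sketch of the same rule with plain alignas (the 64-byte constant is an assumption; PLATFORM_CACHE_LINE is platform-defined):

    #include <atomic>
    #include <cstddef>

    constexpr std::size_t CacheLine = 64; // assumed x86-64 line size

    // alignas rounds sizeof up to the alignment, so each slot occupies
    // a full line and spinning on one never invalidates another.
    struct alignas(CacheLine) TPaddedSlot {
        std::atomic<unsigned long long> WaitingFlag{0};
    };

    static_assert(sizeof(TPaddedSlot) == CacheLine, "one slot per cache line");
    static_assert(alignof(TPaddedSlot) == CacheLine);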
diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp
index 974049d508..1046d6ea66 100644
--- a/ydb/library/actors/core/executor_pool_io.cpp
+++ b/ydb/library/actors/core/executor_pool_io.cpp
@@ -7,7 +7,7 @@ namespace NActors {
     TIOExecutorPool::TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName, TAffinity* affinity)
         : TExecutorPoolBase(poolId, threads, affinity)
-        , Threads(new TThreadCtx[threads])
+        , Threads(new TExecutorThreadCtx[threads])
         , PoolName(poolName)
     {}
@@ -37,17 +37,17 @@ namespace NActors {
         const TAtomic x = AtomicDecrement(Semaphore);
         if (x < 0) {
-            TThreadCtx& threadCtx = Threads[workerId];
+            TExecutorThreadCtx& threadCtx = Threads[workerId];
             ThreadQueue.Push(workerId + 1, revolvingCounter);
             hpnow = GetCycleCountFast();
             elapsed += hpnow - hpstart;
-            if (threadCtx.Pad.Park())
+            if (threadCtx.WaitingPad.Park())
                 return 0;
             hpstart = GetCycleCountFast();
             parked += hpstart - hpnow;
         }

-        while (!RelaxedLoad(&StopFlag)) {
+        while (!StopFlag.load(std::memory_order_acquire)) {
             if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
                 hpnow = GetCycleCountFast();
                 elapsed += hpnow - hpstart;
@@ -93,7 +93,7 @@ namespace NActors {
         for (;; ++revolvingWriteCounter) {
             if (const ui32 x = ThreadQueue.Pop(revolvingWriteCounter)) {
                 const ui32 threadIdx = x - 1;
-                Threads[threadIdx].Pad.Unpark();
+                Threads[threadIdx].WaitingPad.Unpark();
                 return;
             }
             SpinLockPause();
@@ -124,10 +124,10 @@ namespace NActors {
     }

     void TIOExecutorPool::PrepareStop() {
-        AtomicStore(&StopFlag, true);
+        StopFlag.store(true, std::memory_order_release);
         for (i16 i = 0; i != PoolThreads; ++i) {
             Threads[i].Thread->StopFlag = true;
-            Threads[i].Pad.Interrupt();
+            Threads[i].WaitingPad.Interrupt();
         }
     }
diff --git a/ydb/library/actors/core/executor_pool_io.h b/ydb/library/actors/core/executor_pool_io.h
index fe4d45774a..8385f2b28f 100644
--- a/ydb/library/actors/core/executor_pool_io.h
+++ b/ydb/library/actors/core/executor_pool_io.h
@@ -2,6 +2,7 @@

 #include "actorsystem.h"
 #include "executor_thread.h"
+#include "executor_thread_ctx.h"
 #include "scheduler_queue.h"
 #include "executor_pool_base.h"
 #include <ydb/library/actors/actor_type/indexes.h>
@@ -12,12 +13,7 @@ namespace NActors {

     class TIOExecutorPool: public TExecutorPoolBase {
-        struct TThreadCtx {
-            TAutoPtr<TExecutorThread> Thread;
-            TThreadParkPad Pad;
-        };
-
-        TArrayHolder<TThreadCtx> Threads;
+        TArrayHolder<TExecutorThreadCtx> Threads;
         TUnorderedCache<ui32, 512, 4> ThreadQueue;
         THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h
new file mode 100644
index 0000000000..dab10b65bd
--- /dev/null
+++ b/ydb/library/actors/core/executor_thread_ctx.h
@@ -0,0 +1,136 @@
+#pragma once
+
+#include "defs.h"
+#include "thread_context.h"
+
+#include <ydb/library/actors/util/datetime.h>
+#include <ydb/library/actors/util/threadparkpad.h>
+
+
+namespace NActors {
+    class TExecutorThread;
+    class TBasicExecutorPool;
+
+    struct TExecutorThreadCtx {
+        enum EWaitState : ui64 {
+            WS_NONE,
+            WS_ACTIVE,
+            WS_BLOCKED,
+            WS_RUNNING
+        };
+
+        struct TWaitState {
+            EWaitState Flag = WS_NONE;
+            ui32 NextPool = Max<ui32>();
+
+            TWaitState() = default;
+
+            explicit TWaitState(ui64 state)
+                : Flag(static_cast<EWaitState>(state & 0x7))
+                , NextPool(state >> 3)
+            {}
+
+            explicit TWaitState(EWaitState flag, ui32 nextPool = Max<ui32>())
+                : Flag(flag)
+                , NextPool(nextPool)
+            {}
+
+            explicit operator ui64() {
+                return Flag | ui64(NextPool << 3);
+            }
+        };
+
+        TAutoPtr<TExecutorThread> Thread;
+        TThreadParkPad WaitingPad;
+
+    private:
+        std::atomic<ui64> WaitingFlag = WS_NONE;
+
+    public:
+        TBasicExecutorPool *OwnerExecutorPool = nullptr;
+        std::atomic<TBasicExecutorPool*> OtherExecutorPool = nullptr;
+        // std::atomic<ui64> FullCycleCheckCount = 0;
+        ui64 StartWakingTs = 0;
+        ui32 NextPool = 0;
+        bool IsShared;
+
+        // different threads must spin/block on different cache-lines.
+        // we add some padding bytes to enforce this rule;
+
+        TWaitState GetState() {
+            return TWaitState(WaitingFlag.load());
+        }
+
+        TWaitState ExchangeState(EWaitState flag, ui32 nextPool = Max<ui32>()) {
+            return TWaitState(WaitingFlag.exchange(static_cast<ui64>(TWaitState(flag, nextPool))));
+        }
+
+        bool ReplaceState(TWaitState &expected, EWaitState flag, ui32 nextPool = Max<ui32>()) {
+            ui64 expectedInt = static_cast<ui64>(expected);
+            bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(TWaitState(flag, nextPool)));
+            expected = TWaitState(expectedInt);
+            return result;
+        }
+
+        void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
+            ui64 start = GetCycleCountFast();
+            bool doSpin = true;
+            while (true) {
+                for (ui32 j = 0; doSpin && j < 12; ++j) {
+                    if (GetCycleCountFast() >= (start + spinThresholdCycles)) {
+                        doSpin = false;
+                        break;
+                    }
+                    for (ui32 i = 0; i < 12; ++i) {
+                        TWaitState state = GetState();
+                        if (state.Flag == WS_ACTIVE) {
+                            SpinLockPause();
+                        } else {
+                            NextPool = state.NextPool;
+                            doSpin = false;
+                            break;
+                        }
+                    }
+                }
+                if (!doSpin) {
+                    break;
+                }
+                if (stopFlag->load(std::memory_order_relaxed)) {
+                    break;
+                }
+            }
+        }
+
+        bool Sleep(std::atomic<bool> *stopFlag) {
+            Y_DEBUG_ABORT_UNLESS(TlsThreadContext);
+
+            TWaitState state;
+            do {
+                TlsThreadContext->Timers.HPNow = GetCycleCountFast();
+                TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
+                if (WaitingPad.Park()) // interrupted
+                    return true;
+                TlsThreadContext->Timers.HPStart = GetCycleCountFast();
+                TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
+                state = GetState();
+            } while (state.Flag == WS_BLOCKED && !stopFlag->load(std::memory_order_relaxed));
+            NextPool = state.NextPool;
+            return false;
+        }
+
+        bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp
+
+        bool Block(std::atomic<bool> *stopFlag) {
+            TWaitState state{WS_ACTIVE};
+            if (ReplaceState(state, WS_BLOCKED)) {
+                Y_ABORT_UNLESS(state.Flag == WS_ACTIVE, "WaitingFlag# %d", int(state.Flag));
+                return Sleep(stopFlag);
+            } else {
+                return false;
+            }
+        }
+
+        TExecutorThreadCtx() = default;
+    };
+
+}
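TWaitState in the new header packs the wait flag into bits 0-2 of one ui64 and the next-pool hint into the bits above, so both travel through a single atomic load, exchange, or CAS. Below is a standalone round-trip of that encoding, with plain <cstdint> types standing in for the library's ui32/ui64. One caveat worth noting: the committed operator ui64 evaluates NextPool << 3 in 32-bit arithmetic before widening, so pool ids of 2^29 and above (including the Max<ui32>() default) do not survive the round trip; the sketch widens before shifting:

    #include <cassert>
    #include <cstdint>

    enum EWaitState : uint64_t { WS_NONE, WS_ACTIVE, WS_BLOCKED, WS_RUNNING };

    // Flag occupies bits 0..2 (four values fit in 3 bits), pool id the rest.
    uint64_t Pack(EWaitState flag, uint32_t nextPool) {
        return static_cast<uint64_t>(flag) | (static_cast<uint64_t>(nextPool) << 3);
    }

    EWaitState UnpackFlag(uint64_t state) { return static_cast<EWaitState>(state & 0x7); }
    uint32_t UnpackPool(uint64_t state) { return static_cast<uint32_t>(state >> 3); }

    int main() {
        uint64_t s = Pack(WS_BLOCKED, 5);
        assert(UnpackFlag(s) == WS_BLOCKED);
        assert(UnpackPool(s) == 5);
        return 0;
    }

Packing both fields into one word is what lets ReplaceState() transition the flag and the pool hint together without a lock.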
diff --git a/ydb/library/actors/core/thread_context.h b/ydb/library/actors/core/thread_context.h
index 13e493f855..5345d31d9b 100644
--- a/ydb/library/actors/core/thread_context.h
+++ b/ydb/library/actors/core/thread_context.h
@@ -2,6 +2,8 @@

 #include "defs.h"

+#include <ydb/library/actors/util/datetime.h>
+
 #include <util/system/tls.h>

@@ -12,6 +14,22 @@ namespace NActors {

     template <typename T>
     struct TWaitingStats;

+    struct TTimers {
+        NHPTimer::STime Elapsed = 0;
+        NHPTimer::STime Parked = 0;
+        NHPTimer::STime Blocked = 0;
+        NHPTimer::STime HPStart = GetCycleCountFast();
+        NHPTimer::STime HPNow;
+
+        void Reset() {
+            Elapsed = 0;
+            Parked = 0;
+            Blocked = 0;
+            HPStart = GetCycleCountFast();
+            HPNow = HPStart;
+        }
+    };
+
     struct TThreadContext {
         IExecutorPool *Pool = nullptr;
         ui32 CapturedActivation = 0;
@@ -23,6 +41,7 @@ namespace NActors {
         ui16 LocalQueueSize = 0;
         TWaitingStats<ui64> *WaitingStats = nullptr;
         bool IsCurrentRecipientAService = false;
+        TTimers Timers;
     };

     extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
diff --git a/ydb/library/actors/core/ut_fat/actor_benchmark.cpp b/ydb/library/actors/core/ut_fat/actor_benchmark.cpp
index 2a0d51efed..ed092603ed 100644
--- a/ydb/library/actors/core/ut_fat/actor_benchmark.cpp
+++ b/ydb/library/actors/core/ut_fat/actor_benchmark.cpp
@@ -10,7 +10,7 @@ using namespace NActors::NTests;

 struct THeavyActorBenchmarkSettings : TActorBenchmarkSettings {
-    static constexpr ui32 TotalEventsAmountPerThread = 1'000'000;
+    static constexpr ui32 TotalEventsAmountPerThread = 1'000;

     static constexpr auto MailboxTypes = {
         TMailboxType::HTSwap,
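The TTimers struct added to thread_context.h slices wall time at every park/unpark boundary: Elapsed accumulates cycles spent runnable, Parked the cycles spent asleep (Sleep() in the new executor_thread_ctx.h updates both through TlsThreadContext->Timers). A compact model of that bookkeeping, with GetCycleCountFast() stubbed by steady_clock (a sketch, not the library API):

    #include <chrono>
    #include <cstdint>

    // Stand-in for GetCycleCountFast(); the real code reads the TSC.
    uint64_t NowTicks() {
        return std::chrono::steady_clock::now().time_since_epoch().count();
    }

    struct TTimersSketch {
        uint64_t Elapsed = 0;          // runnable span total
        uint64_t Parked = 0;           // sleeping span total
        uint64_t HPStart = NowTicks(); // start of the current span
        uint64_t HPNow = 0;

        void BeforePark() {            // mirrors the top of Sleep()
            HPNow = NowTicks();
            Elapsed += HPNow - HPStart;
        }
        void AfterPark() {             // mirrors the code after Park()
            HPStart = NowTicks();
            Parked += HPStart - HPNow;
        }
    };

Moving the struct into the thread-local TThreadContext lets the wait code account time without threading a TTimers reference through every call, at the cost of requiring TlsThreadContext to be set on every pool thread.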