author    kruall <kruall@ydb.tech>    2023-12-07 13:37:20 +0300
committer kruall <kruall@ydb.tech>    2023-12-07 14:16:36 +0300
commit    d014e0f31449a0b13f21906969251d9624c3a62f (patch)
tree      8cbd978397437f27df333539a6bac6852ba7fe70
parent    b02752271b118467bfccb968d4ef887635c3d357 (diff)
download  ydb-d014e0f31449a0b13f21906969251d9624c3a62f.tar.gz
Add executor thread ctx, KIKIMR-18440
-rw-r--r--  ydb/library/actors/core/executor_pool_base.h        |   2
-rw-r--r--  ydb/library/actors/core/executor_pool_basic.cpp     | 214
-rw-r--r--  ydb/library/actors/core/executor_pool_basic.h       |  34
-rw-r--r--  ydb/library/actors/core/executor_pool_io.cpp        |  14
-rw-r--r--  ydb/library/actors/core/executor_pool_io.h          |   8
-rw-r--r--  ydb/library/actors/core/executor_thread_ctx.h       | 136
-rw-r--r--  ydb/library/actors/core/thread_context.h            |  19
-rw-r--r--  ydb/library/actors/core/ut_fat/actor_benchmark.cpp  |   2
8 files changed, 221 insertions(+), 208 deletions(-)
diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h
index acae4f0aea..56e74a025f 100644
--- a/ydb/library/actors/core/executor_pool_base.h
+++ b/ydb/library/actors/core/executor_pool_base.h
@@ -48,7 +48,7 @@ namespace NActors {
TAtomic Semaphore = 0;
TUnorderedCache<ui32, 512, 4> Activations;
TAtomic ActivationsRevolvingCounter = 0;
- volatile bool StopFlag = false;
+ std::atomic_bool StopFlag = false;
public:
TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity);
~TExecutorPoolBase();
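
Note: the volatile-to-atomic change above is more than cosmetic. In C++, volatile gives no atomicity or inter-thread ordering guarantees, while std::atomic_bool lets the pool pair a release store in PrepareStop() with acquire loads in the worker loops. A minimal standalone sketch of that handshake (local names, not the production types):

    #include <atomic>

    std::atomic<bool> stopFlag{false};

    void WorkerLoop() {
        // acquire: writes made before the release store are visible here
        while (!stopFlag.load(std::memory_order_acquire)) {
            // ... pop activations, execute mailboxes ...
        }
    }

    void PrepareStop() {
        // release: publishes prior writes to any worker that observes the flag
        stopFlag.store(true, std::memory_order_release);
    }
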
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index 149e704c94..624af1c1c2 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -38,7 +38,7 @@ namespace NActors {
, DefaultSpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
, SpinThresholdCycles(DefaultSpinThresholdCycles)
, SpinThresholdCyclesPerThread(new NThreading::TPadded<std::atomic<ui64>>[threads])
- , Threads(new NThreading::TPadded<TThreadCtx>[threads])
+ , Threads(new NThreading::TPadded<TExecutorThreadCtx>[threads])
, WaitingStats(new TWaitingStats<ui64>[threads])
, PoolName(poolName)
, TimePerMailbox(timePerMailbox)
@@ -120,131 +120,6 @@ namespace NActors {
Threads.Destroy();
}
- bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) {
- do {
- timers.HPNow = GetCycleCountFast();
- timers.Elapsed += timers.HPNow - timers.HPStart;
- if (threadCtx.WaitingPad.Park()) // interrupted
- return true;
- timers.HPStart = GetCycleCountFast();
- timers.Parked += timers.HPStart - timers.HPNow;
- } while (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_BLOCKED && !RelaxedLoad(&StopFlag));
- return false;
- }
-
- ui32 TBasicExecutorPool::GoToSpin(TThreadCtx& threadCtx, i64 start, i64 &end) {
- ui32 spinPauseCount = 0;
- i64 spinThresholdCycles = 0;
- if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
- spinThresholdCycles = SpinThresholdCyclesPerThread[TlsThreadContext->WorkerId].load();
- } else {
- spinThresholdCycles = SpinThresholdCycles.load();
- }
- do {
- end = GetCycleCountFast();
- if (end >= (start + spinThresholdCycles) || AtomicLoad(&threadCtx.WaitingFlag) != TThreadCtx::WS_ACTIVE) {
- return spinPauseCount;
- }
-
- SpinLockPause();
- spinPauseCount++;
- } while (!RelaxedLoad(&StopFlag));
-
- return spinPauseCount;
- }
-
- bool TBasicExecutorPool::GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock) {
-#if defined ACTORSLIB_COLLECT_EXEC_STATS
- if (AtomicGetAndIncrement(ThreadUtilization) == 0) {
- // Initially counter contains -t0, the pool start timestamp
- // When the first thread goes to sleep we add t1, so the counter
- // becomes t1-t0 >= 0, or the duration of max utilization so far.
- // If the counter was negative and becomes positive, that means
- // counter just turned into a duration and we should store that
- // duration. Otherwise another thread raced with us and
- // subtracted some other timestamp t2.
- const i64 t = GetCycleCountFast();
- const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, t);
- if (x < 0 && x + t > 0)
- AtomicStore(&MaxUtilizationAccumulator, x + t);
- }
-#endif
-
- i64 startWaiting = GetCycleCountFast();
- i64 endSpinning = 0;
- TAtomic state = AtomicLoad(&threadCtx.WaitingFlag);
- bool wasSleeping = false;
- Y_ABORT_UNLESS(state == TThreadCtx::WS_NONE, "WaitingFlag# %d", int(state));
-
- if (SpinThresholdCycles > 0 && !needToBlock) {
- // spin configured period
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_ACTIVE);
- ui32 spinPauseCount = GoToSpin(threadCtx, startWaiting, endSpinning);
- SpinningTimeUs += endSpinning - startWaiting;
- // then - sleep
- if (AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_ACTIVE) {
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED, TThreadCtx::WS_ACTIVE)) {
- if (NFeatures::TCommonFeatureFlags::ProbeSpinCycles) {
- LWPROBE(SpinCycles, PoolId, PoolName, spinPauseCount, true);
- }
-
- wasSleeping = true;
- if (GoToSleep(threadCtx, timers)) { // interrupted
- return true;
- }
- AllThreadsSleep.store(false);
- }
- }
- if (NFeatures::TCommonFeatureFlags::ProbeSpinCycles && !wasSleeping) {
- LWPROBE(SpinCycles, PoolId, PoolName, spinPauseCount, false);
- }
- } else {
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_BLOCKED);
- wasSleeping = true;
- if (GoToSleep(threadCtx, timers)) { // interrupted
- return true;
- }
- AllThreadsSleep.store(false);
- }
-
- i64 needTimeTs = threadCtx.StartWakingTs.exchange(0);
- if (wasSleeping && needTimeTs) {
- ui64 waitingDuration = std::max<i64>(0, needTimeTs - startWaiting);
- ui64 awakingDuration = std::max<i64>(0, GetCycleCountFast() - needTimeTs);
- WaitingStats[TlsThreadContext->WorkerId].AddAwakening(waitingDuration, awakingDuration);
- } else {
- ui64 waitingDuration = std::max<i64>(0, endSpinning - startWaiting);
- if (wasSleeping) {
- WaitingStats[TlsThreadContext->WorkerId].AddFastAwakening(waitingDuration);
- } else {
- WaitingStats[TlsThreadContext->WorkerId].Add(waitingDuration);
- }
- }
-
- Y_DEBUG_ABORT_UNLESS(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
-
-#if defined ACTORSLIB_COLLECT_EXEC_STATS
- if (AtomicDecrement(ThreadUtilization) == 0) {
- // When we started sleeping counter contained t1-t0, or the
- // last duration of max utilization. Now we subtract t2 >= t1,
- // which turns counter negative again, and the next sleep cycle
- // at timestamp t3 would be adding some new duration t3-t2.
- // If the counter was positive and becomes negative that means
- // there are no current races with other threads and we should
- // store the last positive duration we observed. Multiple
- // threads may be adding and subtracting values in potentially
- // arbitrary order, which would cause counter to oscillate
- // around zero. When it crosses zero is a good indication of a
- // correct value.
- const i64 t = GetCycleCountFast();
- const i64 x = AtomicGetAndAdd(MaxUtilizationCounter, -t);
- if (x > 0 && x - t < 0)
- AtomicStore(&MaxUtilizationAccumulator, x);
- }
-#endif
- return false;
- }
-
void TBasicExecutorPool::AskToGoToSleep(bool *needToWait, bool *needToBlock) {
TAtomic x = AtomicGet(Semaphore);
do {
@@ -293,12 +168,12 @@ namespace NActors {
}
if (workerId >= 0) {
- AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_NONE);
+ Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_NONE);
}
TAtomic x = AtomicGet(Semaphore);
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
- while (!RelaxedLoad(&StopFlag)) {
+ while (!StopFlag.load(std::memory_order_acquire)) {
if (!semaphore.OldSemaphore || semaphore.CurrentSleepThreadCount < 0) {
if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
timers.HPNow = GetCycleCountFast();
@@ -310,14 +185,14 @@ namespace NActors {
bool needToBlock = false;
AskToGoToSleep(&needToWait, &needToBlock);
if (needToWait) {
- if (GoToWaiting(Threads[workerId], timers, needToBlock)) { // interrupted
+ if (Threads[workerId].Wait(SpinThresholdCycles, &StopFlag)) {
return 0;
}
}
} else {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
- AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_RUNNING);
+ Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_RUNNING);
}
AtomicDecrement(Semaphore);
timers.HPNow = GetCycleCountFast();
@@ -368,39 +243,20 @@ namespace NActors {
}
inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
- if (AllThreadsSleep) {
- TThreadCtx& hotThreadCtx = Threads[0];
- if (AtomicCas(&hotThreadCtx.WaitingFlag, TThreadCtx::WS_NONE, TThreadCtx::WS_ACTIVE)) {
- return;
- }
-
- TThreadCtx& coldThreadCtx = Threads[AtomicLoad(&ThreadCount) - 1];
- if (AtomicCas(&coldThreadCtx.WaitingFlag, TThreadCtx::WS_NONE, TThreadCtx::WS_BLOCKED)) {
- if (TlsThreadContext && TlsThreadContext->WaitingStats) {
- ui64 beforeUnpark = GetCycleCountFast();
- coldThreadCtx.StartWakingTs = beforeUnpark;
- coldThreadCtx.WaitingPad.Unpark();
- TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
- } else {
- coldThreadCtx.WaitingPad.Unpark();
- }
- return;
- }
- }
for (i16 i = 0;;) {
- TThreadCtx& threadCtx = Threads[i];
- TThreadCtx::EWaitState state = static_cast<TThreadCtx::EWaitState>(AtomicLoad(&threadCtx.WaitingFlag));
- switch (state) {
- case TThreadCtx::WS_NONE:
- case TThreadCtx::WS_RUNNING:
+ TExecutorThreadCtx& threadCtx = Threads[i];
+ TExecutorThreadCtx::TWaitState state = threadCtx.GetState();
+ switch (state.Flag) {
+ case TExecutorThreadCtx::WS_NONE:
+ case TExecutorThreadCtx::WS_RUNNING:
if (++i >= MaxThreadCount - SharedExecutorsCount) {
i = 0;
}
break;
- case TThreadCtx::WS_ACTIVE:
- case TThreadCtx::WS_BLOCKED:
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_NONE, state)) {
- if (state == TThreadCtx::WS_BLOCKED) {
+ case TExecutorThreadCtx::WS_ACTIVE:
+ case TExecutorThreadCtx::WS_BLOCKED:
+ if (threadCtx.ReplaceState(state, TExecutorThreadCtx::WS_NONE)) {
+ if (state.Flag == TExecutorThreadCtx::WS_BLOCKED) {
ui64 beforeUnpark = GetCycleCountFast();
threadCtx.StartWakingTs = beforeUnpark;
if (TlsThreadContext && TlsThreadContext->WaitingStats) {
@@ -577,9 +433,9 @@ namespace NActors {
}
void TBasicExecutorPool::PrepareStop() {
- AtomicStore(&StopFlag, true);
+ StopFlag.store(true, std::memory_order_release);
for (i16 i = 0; i != PoolThreads; ++i) {
- Threads[i].Thread->StopFlag = true;
+ Threads[i].Thread->StopFlag.store(true, std::memory_order_release);
Threads[i].WaitingPad.Interrupt();
}
}
@@ -666,7 +522,7 @@ namespace NActors {
if (threadIdx >= PoolThreads) {
return {0.0, 0.0};
}
- TThreadCtx& threadCtx = Threads[threadIdx];
+ TExecutorThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStats(stats);
return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
@@ -746,4 +602,40 @@ namespace NActors {
LWPROBE(ChangeSpinThresholdPerThread, PoolId, PoolName, threadIdx, newSpinThreshold, resolutionUs * bucketIdx, bucketIdx);
}
}
+
+ bool TExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
+ TWaitState state = ExchangeState(WS_ACTIVE);
+ Y_ABORT_UNLESS(state.Flag == WS_NONE, "WaitingFlag# %d", int(state.Flag));
+ if (OwnerExecutorPool) {
+ // if (!OwnerExecutorPool->SetSleepOwnSharedThread()) {
+ // return false;
+ // }
+ // if (TBasicExecutorPool *pool = OtherExecutorPool; pool) {
+ // if (!pool->SetSleepBorrowedSharedThread()) {
+ // return false;
+ // }
+ //}
+ }
+ if (spinThresholdCycles > 0) {
+ // spin configured period
+ Spin(spinThresholdCycles, stopFlag);
+ // then - sleep
+ state = GetState();
+ if (state.Flag == WS_ACTIVE) {
+ if (ReplaceState(state, WS_BLOCKED)) {
+ if (Sleep(stopFlag)) { // interrupted
+ return true;
+ }
+ } else {
+ NextPool = state.NextPool;
+ }
+ }
+ } else {
+ Block(stopFlag);
+ }
+
+ Y_DEBUG_ABORT_UNLESS(stopFlag->load() || GetState().Flag == WS_NONE);
+ return false;
+ }
+
}
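
Note: the long comment deleted with GoToWaiting() describes the max-utilization accounting the pool performs: the counter starts at -t0 (pool start), the first thread to park adds its timestamp t1 so the counter becomes the duration t1 - t0, and the last parked thread to wake subtracts t2, driving the counter negative again; the accumulator is updated only when the counter crosses zero. A compressed sketch of that scheme (local names; the real members live on TBasicExecutorPool):

    #include <atomic>

    std::atomic<long long> sleepers{0};     // threads currently parked
    std::atomic<long long> counter{-100};   // starts at -t0, the pool start time
    std::atomic<long long> accumulator{0};  // last recorded full-utilization duration

    void OnGoToSleep(long long now) {
        if (sleepers.fetch_add(1) == 0) {          // first thread to park
            long long x = counter.fetch_add(now);  // -t0 + t1: now a duration
            if (x < 0 && x + now > 0)
                accumulator.store(x + now);        // crossed zero: record it
        }
    }

    void OnWakeUp(long long now) {
        if (sleepers.fetch_sub(1) == 1) {          // last parked thread woke
            long long x = counter.fetch_add(-now); // drive it negative again
            if (x > 0 && x - now < 0)
                accumulator.store(x);              // keep last positive value seen
        }
    }

For example, with t0 = 100 and a first park at t1 = 250, the counter moves from -100 to 150 and 150 cycles of full utilization are recorded; a last wake at t2 = 300 takes it back to -150, ready for the next cycle.
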
diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h
index 171b303519..99a41801af 100644
--- a/ydb/library/actors/core/executor_pool_basic.h
+++ b/ydb/library/actors/core/executor_pool_basic.h
@@ -2,6 +2,7 @@
#include "actorsystem.h"
#include "executor_thread.h"
+#include "executor_thread_ctx.h"
#include "executor_pool_basic_feature_flags.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
@@ -123,39 +124,12 @@ namespace NActors {
};
class TBasicExecutorPool: public TExecutorPoolBase {
- struct TThreadCtx {
- TAutoPtr<TExecutorThread> Thread;
- TThreadParkPad WaitingPad;
- TAtomic WaitingFlag;
- std::atomic<i64> StartWakingTs;
- std::atomic<i64> EndWakingTs;
-
- enum EWaitState {
- WS_NONE,
- WS_ACTIVE,
- WS_BLOCKED,
- WS_RUNNING
- };
-
- TThreadCtx()
- : WaitingFlag(WS_NONE)
- {
- }
- };
-
- struct TTimers {
- NHPTimer::STime Elapsed = 0;
- NHPTimer::STime Parked = 0;
- NHPTimer::STime HPStart = GetCycleCountFast();
- NHPTimer::STime HPNow;
- };
-
NThreading::TPadded<std::atomic_bool> AllThreadsSleep = true;
const ui64 DefaultSpinThresholdCycles;
std::atomic<ui64> SpinThresholdCycles;
std::unique_ptr<NThreading::TPadded< std::atomic<ui64>>[]> SpinThresholdCyclesPerThread;
- TArrayHolder<NThreading::TPadded<TThreadCtx>> Threads;
+ TArrayHolder<NThreading::TPadded<TExecutorThreadCtx>> Threads;
static_assert(sizeof(std::decay_t<decltype(Threads[0])>) == PLATFORM_CACHE_LINE);
TArrayHolder<NThreading::TPadded<std::queue<ui32>>> LocalQueues;
TArrayHolder<TWaitingStats<ui64>> WaitingStats;
@@ -283,9 +257,5 @@ namespace NActors {
void AskToGoToSleep(bool *needToWait, bool *needToBlock);
void WakeUpLoop(i16 currentThreadCount);
- bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
- ui32 GoToSpin(TThreadCtx& threadCtx, i64 start, i64 &end);
- bool GoToSleep(TThreadCtx& threadCtx, TTimers &timers);
- bool GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers);
};
}
diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp
index 974049d508..1046d6ea66 100644
--- a/ydb/library/actors/core/executor_pool_io.cpp
+++ b/ydb/library/actors/core/executor_pool_io.cpp
@@ -7,7 +7,7 @@
namespace NActors {
TIOExecutorPool::TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName, TAffinity* affinity)
: TExecutorPoolBase(poolId, threads, affinity)
- , Threads(new TThreadCtx[threads])
+ , Threads(new TExecutorThreadCtx[threads])
, PoolName(poolName)
{}
@@ -37,17 +37,17 @@ namespace NActors {
const TAtomic x = AtomicDecrement(Semaphore);
if (x < 0) {
- TThreadCtx& threadCtx = Threads[workerId];
+ TExecutorThreadCtx& threadCtx = Threads[workerId];
ThreadQueue.Push(workerId + 1, revolvingCounter);
hpnow = GetCycleCountFast();
elapsed += hpnow - hpstart;
- if (threadCtx.Pad.Park())
+ if (threadCtx.WaitingPad.Park())
return 0;
hpstart = GetCycleCountFast();
parked += hpstart - hpnow;
}
- while (!RelaxedLoad(&StopFlag)) {
+ while (!StopFlag.load(std::memory_order_acquire)) {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
hpnow = GetCycleCountFast();
elapsed += hpnow - hpstart;
@@ -93,7 +93,7 @@ namespace NActors {
for (;; ++revolvingWriteCounter) {
if (const ui32 x = ThreadQueue.Pop(revolvingWriteCounter)) {
const ui32 threadIdx = x - 1;
- Threads[threadIdx].Pad.Unpark();
+ Threads[threadIdx].WaitingPad.Unpark();
return;
}
SpinLockPause();
@@ -124,10 +124,10 @@ namespace NActors {
}
void TIOExecutorPool::PrepareStop() {
- AtomicStore(&StopFlag, true);
+ StopFlag.store(true, std::memory_order_release);
for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Thread->StopFlag = true;
- Threads[i].Pad.Interrupt();
+ Threads[i].WaitingPad.Interrupt();
}
}
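
Note: the IO pool now shares TExecutorThreadCtx, so its pad is reached as WaitingPad rather than the pool-local Pad. The contract the code above relies on is: Park() blocks until Unpark() or Interrupt() and returns true only when interrupted. A condition-variable illustration of that contract (not the real TThreadParkPad, whose implementation is platform-specific):

    #include <condition_variable>
    #include <mutex>

    class ParkPad {
        std::mutex M;
        std::condition_variable CV;
        bool Signaled = false;
        bool Interrupted = false;
    public:
        bool Park() {                       // returns true if interrupted
            std::unique_lock lock(M);
            CV.wait(lock, [&] { return Signaled || Interrupted; });
            Signaled = false;               // consume the wakeup
            return Interrupted;
        }
        void Unpark() {
            { std::lock_guard lock(M); Signaled = true; }
            CV.notify_one();
        }
        void Interrupt() {                  // permanent: all future Parks return true
            { std::lock_guard lock(M); Interrupted = true; }
            CV.notify_all();
        }
    };
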
diff --git a/ydb/library/actors/core/executor_pool_io.h b/ydb/library/actors/core/executor_pool_io.h
index fe4d45774a..8385f2b28f 100644
--- a/ydb/library/actors/core/executor_pool_io.h
+++ b/ydb/library/actors/core/executor_pool_io.h
@@ -2,6 +2,7 @@
#include "actorsystem.h"
#include "executor_thread.h"
+#include "executor_thread_ctx.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
#include <ydb/library/actors/actor_type/indexes.h>
@@ -12,12 +13,7 @@
namespace NActors {
class TIOExecutorPool: public TExecutorPoolBase {
- struct TThreadCtx {
- TAutoPtr<TExecutorThread> Thread;
- TThreadParkPad Pad;
- };
-
- TArrayHolder<TThreadCtx> Threads;
+ TArrayHolder<TExecutorThreadCtx> Threads;
TUnorderedCache<ui32, 512, 4> ThreadQueue;
THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h
new file mode 100644
index 0000000000..dab10b65bd
--- /dev/null
+++ b/ydb/library/actors/core/executor_thread_ctx.h
@@ -0,0 +1,136 @@
+#pragma once
+
+#include "defs.h"
+#include "thread_context.h"
+
+#include <ydb/library/actors/util/datetime.h>
+#include <ydb/library/actors/util/threadparkpad.h>
+
+
+namespace NActors {
+ class TExecutorThread;
+ class TBasicExecutorPool;
+
+ struct TExecutorThreadCtx {
+ enum EWaitState : ui64 {
+ WS_NONE,
+ WS_ACTIVE,
+ WS_BLOCKED,
+ WS_RUNNING
+ };
+
+ struct TWaitState {
+ EWaitState Flag = WS_NONE;
+ ui32 NextPool = Max<ui32>();
+
+ TWaitState() = default;
+
+ explicit TWaitState(ui64 state)
+ : Flag(static_cast<EWaitState>(state & 0x7))
+ , NextPool(state >> 3)
+ {}
+
+ explicit TWaitState(EWaitState flag, ui32 nextPool = Max<ui32>())
+ : Flag(flag)
+ , NextPool(nextPool)
+ {}
+
+            explicit operator ui64() const {
+                return Flag | (ui64(NextPool) << 3);
+            }
+ };
+
+ TAutoPtr<TExecutorThread> Thread;
+ TThreadParkPad WaitingPad;
+
+ private:
+ std::atomic<ui64> WaitingFlag = WS_NONE;
+
+ public:
+ TBasicExecutorPool *OwnerExecutorPool = nullptr;
+ std::atomic<TBasicExecutorPool*> OtherExecutorPool = nullptr;
+ // std::atomic<ui64> FullCycleCheckCount = 0;
+ ui64 StartWakingTs = 0;
+ ui32 NextPool = 0;
+        bool IsShared = false;
+
+        // Different threads must spin/block on different cache lines;
+        // we add padding bytes to enforce this rule.
+
+ TWaitState GetState() {
+ return TWaitState(WaitingFlag.load());
+ }
+
+ TWaitState ExchangeState(EWaitState flag, ui32 nextPool = Max<ui32>()) {
+ return TWaitState(WaitingFlag.exchange(static_cast<ui64>(TWaitState(flag, nextPool))));
+ }
+
+ bool ReplaceState(TWaitState &expected, EWaitState flag, ui32 nextPool = Max<ui32>()) {
+ ui64 expectedInt = static_cast<ui64>(expected);
+ bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(TWaitState(flag, nextPool)));
+ expected = TWaitState(expectedInt);
+ return result;
+ }
+
+ void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
+ ui64 start = GetCycleCountFast();
+ bool doSpin = true;
+ while (true) {
+ for (ui32 j = 0; doSpin && j < 12; ++j) {
+ if (GetCycleCountFast() >= (start + spinThresholdCycles)) {
+ doSpin = false;
+ break;
+ }
+ for (ui32 i = 0; i < 12; ++i) {
+ TWaitState state = GetState();
+ if (state.Flag == WS_ACTIVE) {
+ SpinLockPause();
+ } else {
+ NextPool = state.NextPool;
+ doSpin = false;
+ break;
+ }
+ }
+ }
+ if (!doSpin) {
+ break;
+ }
+ if (stopFlag->load(std::memory_order_relaxed)) {
+ break;
+ }
+ }
+ }
+
+ bool Sleep(std::atomic<bool> *stopFlag) {
+ Y_DEBUG_ABORT_UNLESS(TlsThreadContext);
+
+ TWaitState state;
+ do {
+ TlsThreadContext->Timers.HPNow = GetCycleCountFast();
+ TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
+ if (WaitingPad.Park()) // interrupted
+ return true;
+ TlsThreadContext->Timers.HPStart = GetCycleCountFast();
+ TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
+ state = GetState();
+ } while (state.Flag == WS_BLOCKED && !stopFlag->load(std::memory_order_relaxed));
+ NextPool = state.NextPool;
+ return false;
+ }
+
+ bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp
+
+ bool Block(std::atomic<bool> *stopFlag) {
+ TWaitState state{WS_ACTIVE};
+ if (ReplaceState(state, WS_BLOCKED)) {
+ Y_ABORT_UNLESS(state.Flag == WS_ACTIVE, "WaitingFlag# %d", int(state.Flag));
+ return Sleep(stopFlag);
+ } else {
+ return false;
+ }
+ }
+
+ TExecutorThreadCtx() = default;
+ };
+
+}
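
Note: TWaitState packs the wait flag (low three bits) and the NextPool hint (remaining bits) into a single atomic ui64 so exchange and compare-exchange update both fields together. A standalone round-trip check of that encoding (plain C++ names, not the production types); the widening cast before the shift matters, since shifting a 32-bit NextPool directly would drop its top three bits:

    #include <cassert>
    #include <cstdint>

    enum EWaitState : uint64_t { WS_NONE, WS_ACTIVE, WS_BLOCKED, WS_RUNNING };

    uint64_t Pack(EWaitState flag, uint32_t nextPool) {
        // widen before shifting: a 32-bit shift would lose the top 3 bits
        return flag | (uint64_t(nextPool) << 3);
    }

    void Unpack(uint64_t state, EWaitState &flag, uint32_t &nextPool) {
        flag = static_cast<EWaitState>(state & 0x7);  // low 3 bits
        nextPool = static_cast<uint32_t>(state >> 3); // the rest
    }

    int main() {
        EWaitState flag;
        uint32_t pool;
        Unpack(Pack(WS_BLOCKED, 0xFFFFFFFFu), flag, pool);
        assert(flag == WS_BLOCKED && pool == 0xFFFFFFFFu); // lossless round trip
        return 0;
    }
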
diff --git a/ydb/library/actors/core/thread_context.h b/ydb/library/actors/core/thread_context.h
index 13e493f855..5345d31d9b 100644
--- a/ydb/library/actors/core/thread_context.h
+++ b/ydb/library/actors/core/thread_context.h
@@ -2,6 +2,8 @@
#include "defs.h"
+#include <ydb/library/actors/util/datetime.h>
+
#include <util/system/tls.h>
@@ -12,6 +14,22 @@ namespace NActors {
template <typename T>
struct TWaitingStats;
+ struct TTimers {
+ NHPTimer::STime Elapsed = 0;
+ NHPTimer::STime Parked = 0;
+ NHPTimer::STime Blocked = 0;
+ NHPTimer::STime HPStart = GetCycleCountFast();
+ NHPTimer::STime HPNow;
+
+ void Reset() {
+ Elapsed = 0;
+ Parked = 0;
+ Blocked = 0;
+ HPStart = GetCycleCountFast();
+ HPNow = HPStart;
+ }
+ };
+
struct TThreadContext {
IExecutorPool *Pool = nullptr;
ui32 CapturedActivation = 0;
@@ -23,6 +41,7 @@ namespace NActors {
ui16 LocalQueueSize = 0;
TWaitingStats<ui64> *WaitingStats = nullptr;
bool IsCurrentRecipientAService = false;
+ TTimers Timers;
};
extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
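
Note: TTimers moves into the per-thread context so any pool can account work versus park time through TlsThreadContext. A small sketch of the intended bookkeeping around a single park, assuming the types declared above (illustrative only; the production logic is in TExecutorThreadCtx::Sleep):

    // Accounts one work/park transition using the TTimers fields above.
    void ParkOnce(TTimers &timers, TThreadParkPad &pad) {
        timers.HPNow = GetCycleCountFast();
        timers.Elapsed += timers.HPNow - timers.HPStart; // cycles spent working
        pad.Park();                                      // blocks until Unpark()
        timers.HPStart = GetCycleCountFast();
        timers.Parked += timers.HPStart - timers.HPNow;  // cycles spent parked
    }
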
diff --git a/ydb/library/actors/core/ut_fat/actor_benchmark.cpp b/ydb/library/actors/core/ut_fat/actor_benchmark.cpp
index 2a0d51efed..ed092603ed 100644
--- a/ydb/library/actors/core/ut_fat/actor_benchmark.cpp
+++ b/ydb/library/actors/core/ut_fat/actor_benchmark.cpp
@@ -10,7 +10,7 @@ using namespace NActors::NTests;
struct THeavyActorBenchmarkSettings : TActorBenchmarkSettings {
- static constexpr ui32 TotalEventsAmountPerThread = 1'000'000;
+ static constexpr ui32 TotalEventsAmountPerThread = 1'000;
static constexpr auto MailboxTypes = {
TMailboxType::HTSwap,