diff options
author | kruall <kruall@ydb.tech> | 2024-01-09 22:33:42 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-09 22:33:42 +0300 |
commit | 3175dd350105c702ac17667232d4467c32734457 (patch) | |
tree | 845ad85ae7d3842419f6944c46e16934d62563ac | |
parent | 2706eed56c95d65b3b39f828825927b498eb55a1 (diff) | |
download | ydb-3175dd350105c702ac17667232d4467c32734457.tar.gz |
refactor thread ctx (#897)
Co-authored-by: Aleksandr Kriukov <kruall@ydb.ru>
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.cpp | 52 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread.cpp | 2 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread_ctx.h | 152 |
3 files changed, 112 insertions, 94 deletions
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index f316201ef7..8f9892d96c 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -1,6 +1,7 @@ #include "executor_pool_basic.h" #include "executor_pool_basic_feature_flags.h" #include "actor.h" +#include "executor_thread_ctx.h" #include "probes.h" #include "mailbox.h" #include <ydb/library/actors/util/affinity.h> @@ -167,7 +168,7 @@ namespace NActors { } if (workerId >= 0) { - Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_NONE); + Threads[workerId].ExchangeState(EThreadState::None); } TAtomic x = AtomicGet(Semaphore); @@ -191,7 +192,7 @@ namespace NActors { } else { if (const ui32 activation = Activations.Pop(++revolvingCounter)) { if (workerId >= 0) { - Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_RUNNING); + Threads[workerId].ExchangeState(EThreadState::Work); } AtomicDecrement(Semaphore); TlsThreadContext->Timers.HPNow = GetCycleCountFast(); @@ -244,18 +245,18 @@ namespace NActors { inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) { for (i16 i = 0;;) { TExecutorThreadCtx& threadCtx = Threads[i]; - TExecutorThreadCtx::TWaitState state = threadCtx.GetState(); - switch (state.Flag) { - case TExecutorThreadCtx::WS_NONE: - case TExecutorThreadCtx::WS_RUNNING: + EThreadState state = threadCtx.GetState<EThreadState>(); + switch (state) { + case EThreadState::None: + case EThreadState::Work: if (++i >= MaxThreadCount - SharedExecutorsCount) { i = 0; } break; - case TExecutorThreadCtx::WS_ACTIVE: - case TExecutorThreadCtx::WS_BLOCKED: - if (threadCtx.ReplaceState(state, TExecutorThreadCtx::WS_NONE)) { - if (state.Flag == TExecutorThreadCtx::WS_BLOCKED) { + case EThreadState::Spin: + case EThreadState::Sleep: + if (threadCtx.ReplaceState<EThreadState>(state, EThreadState::None)) { + if (state == EThreadState::Sleep) { ui64 beforeUnpark = GetCycleCountFast(); threadCtx.StartWakingTs = beforeUnpark; if (TlsThreadContext && TlsThreadContext->WaitingStats) { @@ -601,38 +602,13 @@ namespace NActors { } bool TExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { - TWaitState state = ExchangeState(WS_ACTIVE); - Y_ABORT_UNLESS(state.Flag == WS_NONE, "WaitingFlag# %d", int(state.Flag)); - if (OwnerExecutorPool) { - // if (!OwnerExecutorPool->SetSleepOwnSharedThread()) { - // return false; - // } - // if (TBasicExecutorPool *pool = OtherExecutorPool; pool) { - // if (!pool->SetSleepBorrowedSharedThread()) { - // return false; - // } - //} - } + EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin); + Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state)); if (spinThresholdCycles > 0) { // spin configured period Spin(spinThresholdCycles, stopFlag); - // then - sleep - state = GetState(); - if (state.Flag == WS_ACTIVE) { - if (ReplaceState(state, WS_BLOCKED)) { - if (Sleep(stopFlag)) { // interrupted - return true; - } - } else { - NextPool = state.NextPool; - } - } - } else { - Block(stopFlag); } - - Y_DEBUG_ABORT_UNLESS(stopFlag->load() || GetState().Flag == WS_NONE); - return false; + return Sleep(stopFlag); } } diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index aa4441590d..477a98ba88 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -523,7 +523,7 @@ namespace NActors { std::vector<TExecutorPoolBaseMailboxed*> pools; do { if (NeedToReloadPools.load() == EState::NeedToReloadPools) { - otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load()); + // otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load()); NeedToReloadPools = EState::Running; } bool wasWorking = true; diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index 2674fd61ca..67554885b2 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -11,66 +11,43 @@ namespace NActors { class TExecutorThread; class TBasicExecutorPool; - struct TExecutorThreadCtx { - enum EWaitState : ui64 { - WS_NONE, - WS_ACTIVE, - WS_BLOCKED, - WS_RUNNING - }; - - struct TWaitState { - EWaitState Flag = WS_NONE; - ui32 NextPool = Max<ui32>(); - - TWaitState() = default; - - explicit TWaitState(ui64 state) - : Flag(static_cast<EWaitState>(state & 0x7)) - , NextPool(state >> 3) - {} - - explicit TWaitState(EWaitState flag, ui32 nextPool = Max<ui32>()) - : Flag(flag) - , NextPool(nextPool) - {} - - explicit operator ui64() { - return Flag | ui64(NextPool << 3); - } - }; + enum class EThreadState : ui64 { + None, + Spin, + Sleep, + Work + }; + struct TGenericExecutorThreadCtx { TAutoPtr<TExecutorThread> Thread; TThreadParkPad WaitingPad; private: - std::atomic<ui64> WaitingFlag = WS_NONE; + std::atomic<ui64> WaitingFlag = static_cast<ui64>(EThreadState::None); public: - TBasicExecutorPool *OwnerExecutorPool = nullptr; - std::atomic<TBasicExecutorPool*> OtherExecutorPool = nullptr; ui64 StartWakingTs = 0; - ui32 NextPool = 0; - bool IsShared; - - // different threads must spin/block on different cache-lines. - // we add some padding bytes to enforce this rule; + template <typename TWaitState> TWaitState GetState() { return TWaitState(WaitingFlag.load()); } - TWaitState ExchangeState(EWaitState flag, ui32 nextPool = Max<ui32>()) { - return TWaitState(WaitingFlag.exchange(static_cast<ui64>(TWaitState(flag, nextPool)))); + template <typename TWaitState> + TWaitState ExchangeState(TWaitState state) { + return TWaitState(WaitingFlag.exchange(static_cast<ui64>(state))); } - bool ReplaceState(TWaitState &expected, EWaitState flag, ui32 nextPool = Max<ui32>()) { + template <typename TWaitState> + bool ReplaceState(TWaitState &expected, TWaitState state) { ui64 expectedInt = static_cast<ui64>(expected); - bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(TWaitState(flag, nextPool))); + bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(state)); expected = TWaitState(expectedInt); return result; } + protected: + template <typename TDerived, typename TWaitState> void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { ui64 start = GetCycleCountFast(); bool doSpin = true; @@ -81,11 +58,11 @@ namespace NActors { break; } for (ui32 i = 0; i < 12; ++i) { - TWaitState state = GetState(); - if (state.Flag == WS_ACTIVE) { + TWaitState state = GetState<TWaitState>(); + if (static_cast<EThreadState>(state) == EThreadState::Spin) { SpinLockPause(); } else { - NextPool = state.NextPool; + static_cast<TDerived*>(this)->AfterWakeUp(state); doSpin = false; break; } @@ -100,10 +77,16 @@ namespace NActors { } } + template <typename TDerived, typename TWaitState> bool Sleep(std::atomic<bool> *stopFlag) { Y_DEBUG_ABORT_UNLESS(TlsThreadContext); - TWaitState state; + TWaitState state = TWaitState{EThreadState::Spin}; + if (!ReplaceState<TWaitState>(state, TWaitState{EThreadState::Sleep})) { + static_cast<TDerived*>(this)->AfterWakeUp(state); + return false; + } + do { TlsThreadContext->Timers.HPNow = GetCycleCountFast(); TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart; @@ -111,25 +94,84 @@ namespace NActors { return true; TlsThreadContext->Timers.HPStart = GetCycleCountFast(); TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow; - state = GetState(); - } while (state.Flag == WS_BLOCKED && !stopFlag->load(std::memory_order_relaxed)); - NextPool = state.NextPool; + state = GetState<TWaitState>(); + } while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed)); + + static_cast<TDerived*>(this)->AfterWakeUp(state); return false; } + }; + + struct TExecutorThreadCtx : public TGenericExecutorThreadCtx { + using TBase = TGenericExecutorThreadCtx; + + TBasicExecutorPool *OwnerExecutorPool = nullptr; + + void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { + this->TBase::Spin<TExecutorThreadCtx, EThreadState>(spinThresholdCycles, stopFlag); + } + + bool Sleep(std::atomic<bool> *stopFlag) { + return this->TBase::Sleep<TExecutorThreadCtx, EThreadState>(stopFlag); + } bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp - bool Block(std::atomic<bool> *stopFlag) { - TWaitState state{WS_ACTIVE}; - if (ReplaceState(state, WS_BLOCKED)) { - Y_ABORT_UNLESS(state.Flag == WS_ACTIVE, "WaitingFlag# %d", int(state.Flag)); - return Sleep(stopFlag); - } else { - return false; - } + void AfterWakeUp(EThreadState /*state*/) { } TExecutorThreadCtx() = default; }; + + constexpr ui32 MaxPoolsForSharedThreads = 4; + + struct TSharedExecutorThreadCtx : public TGenericExecutorThreadCtx { + using TBase = TGenericExecutorThreadCtx; + + struct TWaitState { + EThreadState Flag = EThreadState::None; + ui32 NextPool = Max<ui32>(); + + TWaitState() = default; + + TWaitState(ui64 state) + : Flag(static_cast<EThreadState>(state & 0x7)) + , NextPool(state >> 3) + {} + + TWaitState(EThreadState flag, ui32 nextPool = Max<ui32>()) + : Flag(flag) + , NextPool(nextPool) + {} + + explicit operator ui64() { + return static_cast<ui64>(Flag) | ui64(NextPool << 3); + } + + explicit operator EThreadState() { + return Flag; + } + }; + + std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads]; + ui32 NextPool = 0; + + void AfterWakeUp(TWaitState state) { + NextPool = state.NextPool; + } + + void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { + this->TBase::Spin<TSharedExecutorThreadCtx, TWaitState>(spinThresholdCycles, stopFlag); + } + + bool Sleep(std::atomic<bool> *stopFlag) { + return this->TBase::Sleep<TSharedExecutorThreadCtx, TWaitState>(stopFlag); + } + + bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp + + TSharedExecutorThreadCtx() = default; + }; + } |