aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2024-01-09 22:33:42 +0300
committerGitHub <noreply@github.com>2024-01-09 22:33:42 +0300
commit3175dd350105c702ac17667232d4467c32734457 (patch)
tree845ad85ae7d3842419f6944c46e16934d62563ac
parent2706eed56c95d65b3b39f828825927b498eb55a1 (diff)
downloadydb-3175dd350105c702ac17667232d4467c32734457.tar.gz
refactor thread ctx (#897)
Co-authored-by: Aleksandr Kriukov <kruall@ydb.ru>
-rw-r--r--ydb/library/actors/core/executor_pool_basic.cpp52
-rw-r--r--ydb/library/actors/core/executor_thread.cpp2
-rw-r--r--ydb/library/actors/core/executor_thread_ctx.h152
3 files changed, 112 insertions, 94 deletions
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index f316201ef7..8f9892d96c 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -1,6 +1,7 @@
#include "executor_pool_basic.h"
#include "executor_pool_basic_feature_flags.h"
#include "actor.h"
+#include "executor_thread_ctx.h"
#include "probes.h"
#include "mailbox.h"
#include <ydb/library/actors/util/affinity.h>
@@ -167,7 +168,7 @@ namespace NActors {
}
if (workerId >= 0) {
- Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_NONE);
+ Threads[workerId].ExchangeState(EThreadState::None);
}
TAtomic x = AtomicGet(Semaphore);
@@ -191,7 +192,7 @@ namespace NActors {
} else {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
- Threads[workerId].ExchangeState(TExecutorThreadCtx::WS_RUNNING);
+ Threads[workerId].ExchangeState(EThreadState::Work);
}
AtomicDecrement(Semaphore);
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
@@ -244,18 +245,18 @@ namespace NActors {
inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
for (i16 i = 0;;) {
TExecutorThreadCtx& threadCtx = Threads[i];
- TExecutorThreadCtx::TWaitState state = threadCtx.GetState();
- switch (state.Flag) {
- case TExecutorThreadCtx::WS_NONE:
- case TExecutorThreadCtx::WS_RUNNING:
+ EThreadState state = threadCtx.GetState<EThreadState>();
+ switch (state) {
+ case EThreadState::None:
+ case EThreadState::Work:
if (++i >= MaxThreadCount - SharedExecutorsCount) {
i = 0;
}
break;
- case TExecutorThreadCtx::WS_ACTIVE:
- case TExecutorThreadCtx::WS_BLOCKED:
- if (threadCtx.ReplaceState(state, TExecutorThreadCtx::WS_NONE)) {
- if (state.Flag == TExecutorThreadCtx::WS_BLOCKED) {
+ case EThreadState::Spin:
+ case EThreadState::Sleep:
+ if (threadCtx.ReplaceState<EThreadState>(state, EThreadState::None)) {
+ if (state == EThreadState::Sleep) {
ui64 beforeUnpark = GetCycleCountFast();
threadCtx.StartWakingTs = beforeUnpark;
if (TlsThreadContext && TlsThreadContext->WaitingStats) {
@@ -601,38 +602,13 @@ namespace NActors {
}
bool TExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
- TWaitState state = ExchangeState(WS_ACTIVE);
- Y_ABORT_UNLESS(state.Flag == WS_NONE, "WaitingFlag# %d", int(state.Flag));
- if (OwnerExecutorPool) {
- // if (!OwnerExecutorPool->SetSleepOwnSharedThread()) {
- // return false;
- // }
- // if (TBasicExecutorPool *pool = OtherExecutorPool; pool) {
- // if (!pool->SetSleepBorrowedSharedThread()) {
- // return false;
- // }
- //}
- }
+ EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin);
+ Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state));
if (spinThresholdCycles > 0) {
// spin configured period
Spin(spinThresholdCycles, stopFlag);
- // then - sleep
- state = GetState();
- if (state.Flag == WS_ACTIVE) {
- if (ReplaceState(state, WS_BLOCKED)) {
- if (Sleep(stopFlag)) { // interrupted
- return true;
- }
- } else {
- NextPool = state.NextPool;
- }
- }
- } else {
- Block(stopFlag);
}
-
- Y_DEBUG_ABORT_UNLESS(stopFlag->load() || GetState().Flag == WS_NONE);
- return false;
+ return Sleep(stopFlag);
}
}
diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp
index aa4441590d..477a98ba88 100644
--- a/ydb/library/actors/core/executor_thread.cpp
+++ b/ydb/library/actors/core/executor_thread.cpp
@@ -523,7 +523,7 @@ namespace NActors {
std::vector<TExecutorPoolBaseMailboxed*> pools;
do {
if (NeedToReloadPools.load() == EState::NeedToReloadPools) {
- otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load());
+ // otherPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ThreadCtx->OtherExecutorPool.load());
NeedToReloadPools = EState::Running;
}
bool wasWorking = true;
diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h
index 2674fd61ca..67554885b2 100644
--- a/ydb/library/actors/core/executor_thread_ctx.h
+++ b/ydb/library/actors/core/executor_thread_ctx.h
@@ -11,66 +11,43 @@ namespace NActors {
class TExecutorThread;
class TBasicExecutorPool;
- struct TExecutorThreadCtx {
- enum EWaitState : ui64 {
- WS_NONE,
- WS_ACTIVE,
- WS_BLOCKED,
- WS_RUNNING
- };
-
- struct TWaitState {
- EWaitState Flag = WS_NONE;
- ui32 NextPool = Max<ui32>();
-
- TWaitState() = default;
-
- explicit TWaitState(ui64 state)
- : Flag(static_cast<EWaitState>(state & 0x7))
- , NextPool(state >> 3)
- {}
-
- explicit TWaitState(EWaitState flag, ui32 nextPool = Max<ui32>())
- : Flag(flag)
- , NextPool(nextPool)
- {}
-
- explicit operator ui64() {
- return Flag | ui64(NextPool << 3);
- }
- };
+ enum class EThreadState : ui64 {
+ None,
+ Spin,
+ Sleep,
+ Work
+ };
+ struct TGenericExecutorThreadCtx {
TAutoPtr<TExecutorThread> Thread;
TThreadParkPad WaitingPad;
private:
- std::atomic<ui64> WaitingFlag = WS_NONE;
+ std::atomic<ui64> WaitingFlag = static_cast<ui64>(EThreadState::None);
public:
- TBasicExecutorPool *OwnerExecutorPool = nullptr;
- std::atomic<TBasicExecutorPool*> OtherExecutorPool = nullptr;
ui64 StartWakingTs = 0;
- ui32 NextPool = 0;
- bool IsShared;
-
- // different threads must spin/block on different cache-lines.
- // we add some padding bytes to enforce this rule;
+ template <typename TWaitState>
TWaitState GetState() {
return TWaitState(WaitingFlag.load());
}
- TWaitState ExchangeState(EWaitState flag, ui32 nextPool = Max<ui32>()) {
- return TWaitState(WaitingFlag.exchange(static_cast<ui64>(TWaitState(flag, nextPool))));
+ template <typename TWaitState>
+ TWaitState ExchangeState(TWaitState state) {
+ return TWaitState(WaitingFlag.exchange(static_cast<ui64>(state)));
}
- bool ReplaceState(TWaitState &expected, EWaitState flag, ui32 nextPool = Max<ui32>()) {
+ template <typename TWaitState>
+ bool ReplaceState(TWaitState &expected, TWaitState state) {
ui64 expectedInt = static_cast<ui64>(expected);
- bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(TWaitState(flag, nextPool)));
+ bool result = WaitingFlag.compare_exchange_strong(expectedInt, static_cast<ui64>(state));
expected = TWaitState(expectedInt);
return result;
}
+ protected:
+ template <typename TDerived, typename TWaitState>
void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
ui64 start = GetCycleCountFast();
bool doSpin = true;
@@ -81,11 +58,11 @@ namespace NActors {
break;
}
for (ui32 i = 0; i < 12; ++i) {
- TWaitState state = GetState();
- if (state.Flag == WS_ACTIVE) {
+ TWaitState state = GetState<TWaitState>();
+ if (static_cast<EThreadState>(state) == EThreadState::Spin) {
SpinLockPause();
} else {
- NextPool = state.NextPool;
+ static_cast<TDerived*>(this)->AfterWakeUp(state);
doSpin = false;
break;
}
@@ -100,10 +77,16 @@ namespace NActors {
}
}
+ template <typename TDerived, typename TWaitState>
bool Sleep(std::atomic<bool> *stopFlag) {
Y_DEBUG_ABORT_UNLESS(TlsThreadContext);
- TWaitState state;
+ TWaitState state = TWaitState{EThreadState::Spin};
+ if (!ReplaceState<TWaitState>(state, TWaitState{EThreadState::Sleep})) {
+ static_cast<TDerived*>(this)->AfterWakeUp(state);
+ return false;
+ }
+
do {
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
@@ -111,25 +94,84 @@ namespace NActors {
return true;
TlsThreadContext->Timers.HPStart = GetCycleCountFast();
TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
- state = GetState();
- } while (state.Flag == WS_BLOCKED && !stopFlag->load(std::memory_order_relaxed));
- NextPool = state.NextPool;
+ state = GetState<TWaitState>();
+ } while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
+
+ static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
}
+ };
+
+ struct TExecutorThreadCtx : public TGenericExecutorThreadCtx {
+ using TBase = TGenericExecutorThreadCtx;
+
+ TBasicExecutorPool *OwnerExecutorPool = nullptr;
+
+ void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
+ this->TBase::Spin<TExecutorThreadCtx, EThreadState>(spinThresholdCycles, stopFlag);
+ }
+
+ bool Sleep(std::atomic<bool> *stopFlag) {
+ return this->TBase::Sleep<TExecutorThreadCtx, EThreadState>(stopFlag);
+ }
bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp
- bool Block(std::atomic<bool> *stopFlag) {
- TWaitState state{WS_ACTIVE};
- if (ReplaceState(state, WS_BLOCKED)) {
- Y_ABORT_UNLESS(state.Flag == WS_ACTIVE, "WaitingFlag# %d", int(state.Flag));
- return Sleep(stopFlag);
- } else {
- return false;
- }
+ void AfterWakeUp(EThreadState /*state*/) {
}
TExecutorThreadCtx() = default;
};
+
+ constexpr ui32 MaxPoolsForSharedThreads = 4;
+
+ struct TSharedExecutorThreadCtx : public TGenericExecutorThreadCtx {
+ using TBase = TGenericExecutorThreadCtx;
+
+ struct TWaitState {
+ EThreadState Flag = EThreadState::None;
+ ui32 NextPool = Max<ui32>();
+
+ TWaitState() = default;
+
+ TWaitState(ui64 state)
+ : Flag(static_cast<EThreadState>(state & 0x7))
+ , NextPool(state >> 3)
+ {}
+
+ TWaitState(EThreadState flag, ui32 nextPool = Max<ui32>())
+ : Flag(flag)
+ , NextPool(nextPool)
+ {}
+
+ explicit operator ui64() {
+ return static_cast<ui64>(Flag) | ui64(NextPool << 3);
+ }
+
+ explicit operator EThreadState() {
+ return Flag;
+ }
+ };
+
+ std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
+ ui32 NextPool = 0;
+
+ void AfterWakeUp(TWaitState state) {
+ NextPool = state.NextPool;
+ }
+
+ void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
+ this->TBase::Spin<TSharedExecutorThreadCtx, TWaitState>(spinThresholdCycles, stopFlag);
+ }
+
+ bool Sleep(std::atomic<bool> *stopFlag) {
+ return this->TBase::Sleep<TSharedExecutorThreadCtx, TWaitState>(stopFlag);
+ }
+
+ bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp
+
+ TSharedExecutorThreadCtx() = default;
+ };
+
}