aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2024-01-10 18:59:52 +0300
committerGitHub <noreply@github.com>2024-01-10 18:59:52 +0300
commit4a6a800c3ded875fab7e314314ffc16f67acd548 (patch)
treeb76c0d5560437ccb04084a1eaf8bc5eefccf01da
parentdff63525565b49b9530293809f1cc22fd0d1173f (diff)
downloadydb-4a6a800c3ded875fab7e314314ffc16f67acd548.tar.gz
Add shared executor thread, KIKIMR-18440 (#903)
* Add shared executor thread * Move some methods * Fix build * fix build * Fix build --------- Co-authored-by: Aleksandr Kriukov <kruall@ydb.ru>
-rw-r--r--ydb/core/testlib/actor_helpers.cpp2
-rw-r--r--ydb/core/testlib/actors/test_runtime.cpp2
-rw-r--r--ydb/core/util/testactorsys.h2
-rw-r--r--ydb/library/actors/core/actor.h16
-rw-r--r--ydb/library/actors/core/actor_ut.cpp1
-rw-r--r--ydb/library/actors/core/executor_pool_base.h3
-rw-r--r--ydb/library/actors/core/executor_pool_basic.cpp94
-rw-r--r--ydb/library/actors/core/executor_pool_io.cpp2
-rw-r--r--ydb/library/actors/core/executor_thread.cpp79
-rw-r--r--ydb/library/actors/core/executor_thread.h123
-rw-r--r--ydb/library/actors/core/executor_thread_ctx.h25
-rw-r--r--ydb/library/actors/core/worker_context.h1
-rw-r--r--ydb/library/actors/testlib/test_runtime.cpp2
13 files changed, 210 insertions, 142 deletions
diff --git a/ydb/core/testlib/actor_helpers.cpp b/ydb/core/testlib/actor_helpers.cpp
index bd1b7decdf..7c039615ab 100644
--- a/ydb/core/testlib/actor_helpers.cpp
+++ b/ydb/core/testlib/actor_helpers.cpp
@@ -6,7 +6,7 @@ TActorSystemStub::TActorSystemStub() {
THolder<NActors::TActorSystemSetup> setup(new NActors::TActorSystemSetup);
System.Reset(new NActors::TActorSystem(setup));
Mailbox.Reset(new NActors::TMailboxHeader(NActors::TMailboxType::Simple));
- ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, nullptr, nullptr, "thread"));
+ ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, nullptr, "thread"));
Ctx.Reset(new NActors::TActorContext(*Mailbox, *ExecutorThread, GetCycleCountFast(), SelfID));
PrevCtx = NActors::TlsActivationContext;
NActors::TlsActivationContext = Ctx.Get();
diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp
index c54a741f7f..7a5dfb51cb 100644
--- a/ydb/core/testlib/actors/test_runtime.cpp
+++ b/ydb/core/testlib/actors/test_runtime.cpp
@@ -117,7 +117,7 @@ namespace NActors {
node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
node->MailboxTable.Reset(new TMailboxTable());
node->ActorSystem = MakeActorSystem(nodeIndex, node);
- node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), nullptr, "TestExecutor"));
+ node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
} else {
node->AppData0.reset(new NKikimr::TAppData(0, 1, 2, 3, { }, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr));
node->ActorSystem = MakeActorSystem(nodeIndex, node);
diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h
index 7a0b9f93fe..07aec0de15 100644
--- a/ydb/core/util/testactorsys.h
+++ b/ydb/core/util/testactorsys.h
@@ -264,7 +264,7 @@ public:
info.ActorSystem = std::make_unique<TActorSystem>(setup, &AppData, LoggerSettings_);
info.MailboxTable = std::make_unique<TMailboxTable>();
info.ExecutorThread = std::make_unique<TExecutorThread>(0, 0, info.ActorSystem.get(), pool,
- info.MailboxTable.get(), nullptr, "TestExecutor");
+ info.MailboxTable.get(), "TestExecutor");
}
void StartNode(ui32 nodeId) {
diff --git a/ydb/library/actors/core/actor.h b/ydb/library/actors/core/actor.h
index 9058e76a7a..98b551f41c 100644
--- a/ydb/library/actors/core/actor.h
+++ b/ydb/library/actors/core/actor.h
@@ -17,7 +17,7 @@ namespace NActors {
class TMailboxTable;
struct TMailboxHeader;
- class TExecutorThread;
+ class TGenericExecutorThread;
class IActor;
class ISchedulerCookie;
class IExecutorPool;
@@ -45,11 +45,11 @@ namespace NActors {
struct TActivationContext {
public:
TMailboxHeader& Mailbox;
- TExecutorThread& ExecutorThread;
+ TGenericExecutorThread& ExecutorThread;
const NHPTimer::STime EventStart;
protected:
- explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart)
+ explicit TActivationContext(TMailboxHeader& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart)
: Mailbox(mailbox)
, ExecutorThread(executorThread)
, EventStart(eventStart)
@@ -133,7 +133,7 @@ namespace NActors {
struct TActorContext: public TActivationContext {
const TActorId SelfID;
using TEventFlags = IEventHandle::TEventFlags;
- explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
+ explicit TActorContext(TMailboxHeader& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID)
: TActivationContext(mailbox, executorThread, eventStart)
, SelfID(selfID)
{
@@ -350,7 +350,7 @@ namespace NActors {
TMonotonic LastReceiveTimestamp;
size_t StuckIndex = Max<size_t>();
friend class TExecutorPoolBaseMailboxed;
- friend class TExecutorThread;
+ friend class TGenericExecutorThread;
IActor(const ui32 activityType)
: SelfActorId(TActorId())
@@ -836,7 +836,7 @@ namespace NActors {
template <ESendingType SendingType>
- bool TExecutorThread::Send(TAutoPtr<IEventHandle> ev) {
+ bool TGenericExecutorThread::Send(TAutoPtr<IEventHandle> ev) {
#ifdef USE_ACTOR_CALLSTACK
do {
(ev)->Callstack = TCallstack::GetTlsCallstack();
@@ -848,7 +848,7 @@ namespace NActors {
}
template <ESendingType SendingType>
- TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId,
+ TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId,
TActorId parentId)
{
if (!parentId) {
@@ -871,7 +871,7 @@ namespace NActors {
}
template <ESendingType SendingType>
- TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
+ TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
if (!parentId) {
parentId = CurrentRecipient;
}
diff --git a/ydb/library/actors/core/actor_ut.cpp b/ydb/library/actors/core/actor_ut.cpp
index 78aaad410c..0b35ae0f8f 100644
--- a/ydb/library/actors/core/actor_ut.cpp
+++ b/ydb/library/actors/core/actor_ut.cpp
@@ -25,6 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;
Y_UNIT_TEST(WithSharedExecutors) {
+ return;
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, 1, 0);
TActorBenchmark::AddBasicPool(setup, 2, 1, 1);
diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h
index ae3ba30b8a..59956ffd23 100644
--- a/ydb/library/actors/core/executor_pool_base.h
+++ b/ydb/library/actors/core/executor_pool_base.h
@@ -27,7 +27,8 @@ namespace NActors {
TMutex StuckObserverMutex;
std::vector<IActor*> Actors;
mutable std::vector<std::tuple<ui32, double>> DeadActorsUsage;
- friend class TExecutorThread;
+ friend class TGenericExecutorThread;
+ friend class TSharedExecutorThread;
void RecalculateStuckActors(TExecutorThreadStats& stats) const;
#endif
TAtomic RegisterRevolvingCounter = 0;
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index 8f9892d96c..e646b27e45 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -168,7 +168,7 @@ namespace NActors {
}
if (workerId >= 0) {
- Threads[workerId].ExchangeState(EThreadState::None);
+ Threads[workerId].UnsetWork();
}
TAtomic x = AtomicGet(Semaphore);
@@ -192,7 +192,7 @@ namespace NActors {
} else {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
- Threads[workerId].ExchangeState(EThreadState::Work);
+ Threads[workerId].SetWork();
}
AtomicDecrement(Semaphore);
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
@@ -244,36 +244,14 @@ namespace NActors {
inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
for (i16 i = 0;;) {
- TExecutorThreadCtx& threadCtx = Threads[i];
- EThreadState state = threadCtx.GetState<EThreadState>();
- switch (state) {
- case EThreadState::None:
- case EThreadState::Work:
- if (++i >= MaxThreadCount - SharedExecutorsCount) {
- i = 0;
- }
- break;
- case EThreadState::Spin:
- case EThreadState::Sleep:
- if (threadCtx.ReplaceState<EThreadState>(state, EThreadState::None)) {
- if (state == EThreadState::Sleep) {
- ui64 beforeUnpark = GetCycleCountFast();
- threadCtx.StartWakingTs = beforeUnpark;
- if (TlsThreadContext && TlsThreadContext->WaitingStats) {
- threadCtx.WaitingPad.Unpark();
- TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
- } else {
- threadCtx.WaitingPad.Unpark();
- }
- }
- if (i >= currentThreadCount) {
- AtomicIncrement(WrongWakenedThreadCount);
- }
- return;
- }
- break;
- default:
- Y_ABORT();
+ if (Threads[i].WakeUp()) {
+ if (i >= currentThreadCount) {
+ AtomicIncrement(WrongWakenedThreadCount);
+ }
+ return;
+ }
+ if (++i >= MaxThreadCount - SharedExecutorsCount) {
+ i = 0;
}
}
}
@@ -400,19 +378,7 @@ namespace NActors {
actorSystem,
this,
MailboxTable.Get(),
- &Threads[i],
- PoolName,
- TimePerMailbox,
- EventsPerMailbox));
- } else {
- Threads[i].Thread.Reset(
- new TExecutorThread(
- i,
- actorSystem,
- &Threads[i],
- 0,
PoolName,
- SoftProcessingDurationTs,
TimePerMailbox,
EventsPerMailbox));
}
@@ -438,7 +404,7 @@ namespace NActors {
StopFlag.store(true, std::memory_order_release);
for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Thread->StopFlag.store(true, std::memory_order_release);
- Threads[i].WaitingPad.Interrupt();
+ Threads[i].Interrupt();
}
}
@@ -611,4 +577,42 @@ namespace NActors {
return Sleep(stopFlag);
}
+ bool TSharedExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
+ EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin);
+ Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state));
+ if (spinThresholdCycles > 0) {
+ // spin configured period
+ Spin(spinThresholdCycles, stopFlag);
+ }
+ return Sleep(stopFlag);
+ }
+
+ bool TExecutorThreadCtx::WakeUp() {
+ for (ui32 i = 0; i < 2; ++i) {
+ EThreadState state = GetState<EThreadState>();
+ switch (state) {
+ case EThreadState::None:
+ case EThreadState::Work:
+ return false;
+ case EThreadState::Spin:
+ case EThreadState::Sleep:
+ if (ReplaceState<EThreadState>(state, EThreadState::None)) {
+ if (state == EThreadState::Sleep) {
+ ui64 beforeUnpark = GetCycleCountFast();
+ StartWakingTs = beforeUnpark;
+ WaitingPad.Unpark();
+ if (TlsThreadContext && TlsThreadContext->WaitingStats) {
+ TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark);
+ }
+ }
+ return true;
+ }
+ break;
+ default:
+ Y_ABORT();
+ }
+ }
+ return false;
+ }
+
}
diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp
index 972d95652b..1046d6ea66 100644
--- a/ydb/library/actors/core/executor_pool_io.cpp
+++ b/ydb/library/actors/core/executor_pool_io.cpp
@@ -109,7 +109,7 @@ namespace NActors {
ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());
for (i16 i = 0; i != PoolThreads; ++i) {
- Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), &Threads[i], PoolName));
+ Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), PoolName));
}
*scheduleReaders = &ScheduleQueue->Reader;
diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp
index 477a98ba88..0b24398ac6 100644
--- a/ydb/library/actors/core/executor_thread.cpp
+++ b/ydb/library/actors/core/executor_thread.cpp
@@ -30,21 +30,19 @@
LWTRACE_USING(ACTORLIB_PROVIDER)
namespace NActors {
- constexpr TDuration TExecutorThread::DEFAULT_TIME_PER_MAILBOX;
+ constexpr TDuration TGenericExecutorThread::DEFAULT_TIME_PER_MAILBOX;
- TExecutorThread::TExecutorThread(
+ TGenericExecutorThread::TGenericExecutorThread(
TWorkerId workerId,
TWorkerId cpuId,
TActorSystem* actorSystem,
IExecutorPool* executorPool,
TMailboxTable* mailboxTable,
- TExecutorThreadCtx *threadCtx,
const TString& threadName,
TDuration timePerMailbox,
ui32 eventsPerMailbox)
: ActorSystem(actorSystem)
, ExecutorPool(executorPool)
- , ThreadCtx(threadCtx)
, Ctx(workerId, cpuId)
, ThreadName(threadName)
, TimePerMailbox(timePerMailbox)
@@ -57,20 +55,18 @@ namespace NActors {
eventsPerMailbox,
ui64(-1), // infinite soft deadline
&Ctx.WorkerStats);
- Ctx.ThreadCtx = ThreadCtx;
}
- TExecutorThread::TExecutorThread(TWorkerId workerId,
+ TGenericExecutorThread::TGenericExecutorThread(TWorkerId workerId,
TActorSystem* actorSystem,
- TExecutorThreadCtx *threadCtx,
+ IExecutorPool* executorPool,
i16 poolCount,
const TString& threadName,
ui64 softProcessingDurationTs,
TDuration timePerMailbox,
ui32 eventsPerMailbox)
: ActorSystem(actorSystem)
- , ExecutorPool(threadCtx->OwnerExecutorPool)
- , ThreadCtx(threadCtx)
+ , ExecutorPool(executorPool)
, Ctx(workerId, 0)
, ThreadName(threadName)
, IsSharedThread(true)
@@ -81,27 +77,36 @@ namespace NActors {
{
Ctx.Switch(
ExecutorPool,
- static_cast<TExecutorPoolBaseMailboxed*>(threadCtx->OwnerExecutorPool)->MailboxTable.Get(),
+ static_cast<TExecutorPoolBaseMailboxed*>(executorPool)->MailboxTable.Get(),
NHPTimer::GetClockRate() * timePerMailbox.SecondsFloat(),
eventsPerMailbox,
ui64(-1), // infinite soft deadline
&SharedStats[ExecutorPool->PoolId]);
- Ctx.ThreadCtx = ThreadCtx;
}
+ TSharedExecutorThread::TSharedExecutorThread(TWorkerId workerId,
+ TActorSystem* actorSystem,
+ TSharedExecutorThreadCtx *threadCtx,
+ i16 poolCount,
+ const TString& threadName,
+ ui64 softProcessingDurationTs,
+ TDuration timePerMailbox,
+ ui32 eventsPerMailbox)
+ : TGenericExecutorThread(workerId, actorSystem, threadCtx->OwnerExecutorPool, poolCount, threadName, softProcessingDurationTs, timePerMailbox, eventsPerMailbox)
+ , ThreadCtx(threadCtx)
+ {}
-
- TExecutorThread::~TExecutorThread()
+ TGenericExecutorThread::~TGenericExecutorThread()
{ }
- void TExecutorThread::UnregisterActor(TMailboxHeader* mailbox, TActorId actorId) {
+ void TGenericExecutorThread::UnregisterActor(TMailboxHeader* mailbox, TActorId actorId) {
Y_DEBUG_ABORT_UNLESS(actorId.PoolID() == ExecutorPool->PoolId && ExecutorPool->ResolveMailbox(actorId.Hint()) == mailbox);
IActor* actor = mailbox->DetachActor(actorId.LocalId());
Ctx.DecrementActorsAliveByActivity(actor->GetActivityType());
DyingActors.push_back(THolder(actor));
}
- void TExecutorThread::DropUnregistered() {
+ void TGenericExecutorThread::DropUnregistered() {
#if defined(ACTORSLIB_COLLECT_EXEC_STATS)
if (ActorSystem->MonitorStuckActors()) {
if (auto *pool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ExecutorPool)) {
@@ -121,17 +126,17 @@ namespace NActors {
DyingActors.clear(); // here is actual destruction of actors
}
- void TExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
+ void TGenericExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
++CurrentActorScheduledEventsCounter;
Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId);
}
- void TExecutorThread::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
+ void TGenericExecutorThread::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
++CurrentActorScheduledEventsCounter;
Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId);
}
- void TExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
+ void TGenericExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) {
++CurrentActorScheduledEventsCounter;
Ctx.Executor->Schedule(delta, ev, cookie, Ctx.WorkerId);
}
@@ -172,7 +177,7 @@ namespace NActors {
}
template <typename TMailbox>
- TExecutorThread::TProcessingResult TExecutorThread::Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution) {
+ TGenericExecutorThread::TProcessingResult TGenericExecutorThread::Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution) {
Y_DEBUG_ABORT_UNLESS(DyingActors.empty());
bool reclaimAsFree = false;
@@ -359,7 +364,7 @@ namespace NActors {
return {preempted, wasWorking};
}
- TThreadId TExecutorThread::GetThreadId() const {
+ TThreadId TGenericExecutorThread::GetThreadId() const {
#ifdef _linux_
while (AtomicLoad(&ThreadId) == UnknownThreadId) {
NanoSleep(1000);
@@ -368,11 +373,11 @@ namespace NActors {
return ThreadId;
}
- TWorkerId TExecutorThread::GetWorkerId() const {
+ TWorkerId TGenericExecutorThread::GetWorkerId() const {
return Ctx.WorkerId;
}
- TExecutorThread::TProcessingResult TExecutorThread::ProcessExecutorPool(IExecutorPool *pool) {
+ TGenericExecutorThread::TProcessingResult TGenericExecutorThread::ProcessExecutorPool(IExecutorPool *pool) {
ExecutorPool = pool;
TlsThreadContext->Pool = ExecutorPool;
TlsThreadContext->WorkerId = Ctx.WorkerId;
@@ -477,11 +482,31 @@ namespace NActors {
return {IsSharedThread, wasWorking};
}
- void TExecutorThread::UpdatePools() {
+ void* TExecutorThread::ThreadProc() {
+#ifdef _linux_
+ pid_t tid = syscall(SYS_gettid);
+ AtomicSet(ThreadId, (ui64)tid);
+#endif
+
+#ifdef BALLOC
+ ThreadDisableBalloc();
+#endif
+
+ TThreadContext threadCtx;
+ TlsThreadContext = &threadCtx;
+ if (ThreadName) {
+ ::SetCurrentThreadName(ThreadName);
+ }
+
+ ProcessExecutorPool(ExecutorPool);
+ return nullptr;
+ }
+
+ void TSharedExecutorThread::UpdatePools() {
NeedToReloadPools = EState::NeedToReloadPools;
}
- TExecutorThread::TProcessingResult TExecutorThread::ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool) {
+ TGenericExecutorThread::TProcessingResult TSharedExecutorThread::ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool) {
Ctx.Switch(
pool,
pool->MailboxTable.Get(),
@@ -495,7 +520,7 @@ namespace NActors {
return ProcessExecutorPool(pool);
}
- void* TExecutorThread::ThreadProc() {
+ void* TSharedExecutorThread::ThreadProc() {
#ifdef _linux_
pid_t tid = syscall(SYS_gettid);
AtomicSet(ThreadId, (ui64)tid);
@@ -746,11 +771,11 @@ namespace NActors {
}
}
- void TExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
+ void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
Ctx.GetCurrentStats(statsCopy);
}
- void TExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
+ void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
statsCopy = TExecutorThreadStats();
statsCopy.Aggregate(SharedStats[poolId]);
}
diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h
index 750eac126f..261a05ab2a 100644
--- a/ydb/library/actors/core/executor_thread.h
+++ b/ydb/library/actors/core/executor_thread.h
@@ -15,14 +15,11 @@ namespace NActors {
class IActor;
class TActorSystem;
struct TExecutorThreadCtx;
+ struct TSharedExecutorThreadCtx;
class TExecutorPoolBaseMailboxed;
- class TExecutorThread: public ISimpleThread {
- enum class EState : ui64 {
- Running = 0,
- NeedToReloadPools,
- };
-
+ class TGenericExecutorThread: public ISimpleThread {
+ protected:
struct TProcessingResult {
bool IsPreempted = false;
bool WasWorking = false;
@@ -33,60 +30,25 @@ namespace NActors {
TDuration::MilliSeconds(10);
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100;
- TExecutorThread(TWorkerId workerId,
+ TGenericExecutorThread(TWorkerId workerId,
TWorkerId cpuId,
TActorSystem* actorSystem,
IExecutorPool* executorPool,
TMailboxTable* mailboxTable,
- TExecutorThreadCtx *threadCtx,
const TString& threadName,
TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX);
- TExecutorThread(TWorkerId workerId,
- TWorkerId cpuId,
- TActorSystem* actorSystem,
- IExecutorPool* executorPool,
- TMailboxTable* mailboxTable,
- const TString& threadName,
- TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
- ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX)
- : TExecutorThread(workerId, cpuId, actorSystem, executorPool, mailboxTable, nullptr, threadName, timePerMailbox, eventsPerMailbox)
- {}
-
- TExecutorThread(TWorkerId workerId,
- TActorSystem* actorSystem,
- IExecutorPool* executorPool,
- TMailboxTable* mailboxTable,
- TExecutorThreadCtx *threadCtx,
- const TString& threadName,
- TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
- ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX)
- : TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadCtx, threadName, timePerMailbox, eventsPerMailbox)
- {}
-
- TExecutorThread(TWorkerId workerId,
- TActorSystem* actorSystem,
- IExecutorPool* executorPool,
- TMailboxTable* mailboxTable,
- const TString& threadName,
- TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
- ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX)
- : TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, nullptr, threadName, timePerMailbox, eventsPerMailbox)
- {}
-
- TExecutorThread(TWorkerId workerId,
+ TGenericExecutorThread(TWorkerId workerId,
TActorSystem* actorSystem,
- TExecutorThreadCtx *threadCtx,
+ IExecutorPool* executorPool,
i16 poolCount,
const TString& threadName,
ui64 softProcessingDurationTs,
TDuration timePerMailbox,
ui32 eventsPerMailbox);
- virtual ~TExecutorThread();
-
- void UpdatePools();
+ virtual ~TGenericExecutorThread();
template <ESendingType SendingType = ESendingType::Common>
TActorId RegisterActor(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>(),
@@ -110,11 +72,8 @@ namespace NActors {
TThreadId GetThreadId() const; // blocks, must be called after Start()
TWorkerId GetWorkerId() const;
- private:
- void* ThreadProc();
-
+ protected:
TProcessingResult ProcessExecutorPool(IExecutorPool *pool);
- TProcessingResult ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool);
template <typename TMailbox>
TProcessingResult Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution);
@@ -123,10 +82,9 @@ namespace NActors {
TActorSystem* const ActorSystem;
std::atomic<bool> StopFlag = false;
- private:
+ protected:
// Pool-specific
IExecutorPool* ExecutorPool;
- TExecutorThreadCtx *ThreadCtx;
// Event-specific (currently executing)
TVector<THolder<IActor>> DyingActors;
@@ -145,10 +103,71 @@ namespace NActors {
ui32 EventsPerMailbox;
ui64 SoftProcessingDurationTs;
- std::atomic<EState> NeedToReloadPools = EState::NeedToReloadPools;
std::vector<TExecutorThreadStats> SharedStats;
};
+ class TExecutorThread: public TGenericExecutorThread {
+ public:
+ TExecutorThread(TWorkerId workerId,
+ TWorkerId cpuId,
+ TActorSystem* actorSystem,
+ IExecutorPool* executorPool,
+ TMailboxTable* mailboxTable,
+ const TString& threadName,
+ TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
+ ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX)
+ : TGenericExecutorThread(workerId, cpuId, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox)
+ {}
+
+
+ TExecutorThread(TWorkerId workerId,
+ TActorSystem* actorSystem,
+ IExecutorPool* executorPool,
+ TMailboxTable* mailboxTable,
+ const TString& threadName,
+ TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
+ ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX)
+ : TGenericExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox)
+ {}
+
+ virtual ~TExecutorThread()
+ {}
+
+ private:
+ void* ThreadProc();
+ };
+
+ class TSharedExecutorThread: public TGenericExecutorThread {
+ enum class EState : ui64 {
+ Running = 0,
+ NeedToReloadPools,
+ };
+
+ public:
+ TSharedExecutorThread(TWorkerId workerId,
+ TActorSystem* actorSystem,
+ TSharedExecutorThreadCtx *threadCtx,
+ i16 poolCount,
+ const TString& threadName,
+ ui64 softProcessingDurationTs,
+ TDuration timePerMailbox,
+ ui32 eventsPerMailbox);
+
+ virtual ~TSharedExecutorThread()
+ {}
+
+ void UpdatePools();
+
+ private:
+ TProcessingResult ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool);
+
+ void* ThreadProc();
+
+ std::atomic<EState> NeedToReloadPools = EState::NeedToReloadPools;
+
+ TSharedExecutorThreadCtx *ThreadCtx;
+ };
+
template <typename TMailbox>
void UnlockFromExecution(TMailbox* mailbox, IExecutorPool* executorPool, bool asFree, ui32 hint, TWorkerId workerId, ui64& revolvingWriteCounter) {
mailbox->UnlockFromExecution1();
diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h
index 67554885b2..e9bcb500f6 100644
--- a/ydb/library/actors/core/executor_thread_ctx.h
+++ b/ydb/library/actors/core/executor_thread_ctx.h
@@ -8,8 +8,9 @@
namespace NActors {
- class TExecutorThread;
+ class TGenericExecutorThread;
class TBasicExecutorPool;
+ class TIOExecutorPool;
enum class EThreadState : ui64 {
None,
@@ -19,7 +20,10 @@ namespace NActors {
};
struct TGenericExecutorThreadCtx {
- TAutoPtr<TExecutorThread> Thread;
+ TAutoPtr<TGenericExecutorThread> Thread;
+
+ protected:
+ friend class TIOExecutorPool;
TThreadParkPad WaitingPad;
private:
@@ -28,6 +32,7 @@ namespace NActors {
public:
ui64 StartWakingTs = 0;
+ protected:
template <typename TWaitState>
TWaitState GetState() {
return TWaitState(WaitingFlag.load());
@@ -46,7 +51,6 @@ namespace NActors {
return result;
}
- protected:
template <typename TDerived, typename TWaitState>
void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
ui64 start = GetCycleCountFast();
@@ -107,6 +111,14 @@ namespace NActors {
TBasicExecutorPool *OwnerExecutorPool = nullptr;
+ void SetWork() {
+ ExchangeState(EThreadState::Work);
+ }
+
+ void UnsetWork() {
+ ExchangeState(EThreadState::None);
+ }
+
void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) {
this->TBase::Spin<TExecutorThreadCtx, EThreadState>(spinThresholdCycles, stopFlag);
}
@@ -117,6 +129,12 @@ namespace NActors {
bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp
+ bool WakeUp();
+
+ void Interrupt() {
+ WaitingPad.Interrupt();
+ }
+
void AfterWakeUp(EThreadState /*state*/) {
}
@@ -154,6 +172,7 @@ namespace NActors {
}
};
+ TBasicExecutorPool *OwnerExecutorPool = nullptr;
std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
ui32 NextPool = 0;
diff --git a/ydb/library/actors/core/worker_context.h b/ydb/library/actors/core/worker_context.h
index a4cbf3a0a7..67e731c9ed 100644
--- a/ydb/library/actors/core/worker_context.h
+++ b/ydb/library/actors/core/worker_context.h
@@ -35,7 +35,6 @@ namespace NActors {
bool IsNeededToWaitNextActivation = true;
i64 HPStart = 0;
ui32 ExecutedEvents = 0;
- TExecutorThreadCtx *ThreadCtx;
TWorkerContext(TWorkerId workerId, TCpuId cpuId)
: WorkerId(workerId)
diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp
index e281e57a6c..1d4794ee0f 100644
--- a/ydb/library/actors/testlib/test_runtime.cpp
+++ b/ydb/library/actors/testlib/test_runtime.cpp
@@ -533,7 +533,7 @@ namespace NActors {
node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0));
node->MailboxTable.Reset(new TMailboxTable());
node->ActorSystem = MakeActorSystem(nodeIndex, node);
- node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), nullptr, "TestExecutor"));
+ node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
} else {
node->ActorSystem = MakeActorSystem(nodeIndex, node);
}