diff options
author | kruall <kruall@ydb.tech> | 2024-01-10 18:59:52 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-10 18:59:52 +0300 |
commit | 4a6a800c3ded875fab7e314314ffc16f67acd548 (patch) | |
tree | b76c0d5560437ccb04084a1eaf8bc5eefccf01da | |
parent | dff63525565b49b9530293809f1cc22fd0d1173f (diff) | |
download | ydb-4a6a800c3ded875fab7e314314ffc16f67acd548.tar.gz |
Add shared executor thread, KIKIMR-18440 (#903)
* Add shared executor thread
* Move some methods
* Fix build
* fix build
* Fix build
---------
Co-authored-by: Aleksandr Kriukov <kruall@ydb.ru>
-rw-r--r-- | ydb/core/testlib/actor_helpers.cpp | 2 | ||||
-rw-r--r-- | ydb/core/testlib/actors/test_runtime.cpp | 2 | ||||
-rw-r--r-- | ydb/core/util/testactorsys.h | 2 | ||||
-rw-r--r-- | ydb/library/actors/core/actor.h | 16 | ||||
-rw-r--r-- | ydb/library/actors/core/actor_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_base.h | 3 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.cpp | 94 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_io.cpp | 2 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread.cpp | 79 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread.h | 123 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread_ctx.h | 25 | ||||
-rw-r--r-- | ydb/library/actors/core/worker_context.h | 1 | ||||
-rw-r--r-- | ydb/library/actors/testlib/test_runtime.cpp | 2 |
13 files changed, 210 insertions, 142 deletions
diff --git a/ydb/core/testlib/actor_helpers.cpp b/ydb/core/testlib/actor_helpers.cpp index bd1b7decdf..7c039615ab 100644 --- a/ydb/core/testlib/actor_helpers.cpp +++ b/ydb/core/testlib/actor_helpers.cpp @@ -6,7 +6,7 @@ TActorSystemStub::TActorSystemStub() { THolder<NActors::TActorSystemSetup> setup(new NActors::TActorSystemSetup); System.Reset(new NActors::TActorSystem(setup)); Mailbox.Reset(new NActors::TMailboxHeader(NActors::TMailboxType::Simple)); - ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, nullptr, nullptr, "thread")); + ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, nullptr, "thread")); Ctx.Reset(new NActors::TActorContext(*Mailbox, *ExecutorThread, GetCycleCountFast(), SelfID)); PrevCtx = NActors::TlsActivationContext; NActors::TlsActivationContext = Ctx.Get(); diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index c54a741f7f..7a5dfb51cb 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -117,7 +117,7 @@ namespace NActors { node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); node->MailboxTable.Reset(new TMailboxTable()); node->ActorSystem = MakeActorSystem(nodeIndex, node); - node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), nullptr, "TestExecutor")); + node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor")); } else { node->AppData0.reset(new NKikimr::TAppData(0, 1, 2, 3, { }, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr)); node->ActorSystem = MakeActorSystem(nodeIndex, node); diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h index 7a0b9f93fe..07aec0de15 100644 --- a/ydb/core/util/testactorsys.h +++ b/ydb/core/util/testactorsys.h @@ -264,7 +264,7 @@ public: info.ActorSystem = std::make_unique<TActorSystem>(setup, &AppData, LoggerSettings_); info.MailboxTable = std::make_unique<TMailboxTable>(); info.ExecutorThread = std::make_unique<TExecutorThread>(0, 0, info.ActorSystem.get(), pool, - info.MailboxTable.get(), nullptr, "TestExecutor"); + info.MailboxTable.get(), "TestExecutor"); } void StartNode(ui32 nodeId) { diff --git a/ydb/library/actors/core/actor.h b/ydb/library/actors/core/actor.h index 9058e76a7a..98b551f41c 100644 --- a/ydb/library/actors/core/actor.h +++ b/ydb/library/actors/core/actor.h @@ -17,7 +17,7 @@ namespace NActors { class TMailboxTable; struct TMailboxHeader; - class TExecutorThread; + class TGenericExecutorThread; class IActor; class ISchedulerCookie; class IExecutorPool; @@ -45,11 +45,11 @@ namespace NActors { struct TActivationContext { public: TMailboxHeader& Mailbox; - TExecutorThread& ExecutorThread; + TGenericExecutorThread& ExecutorThread; const NHPTimer::STime EventStart; protected: - explicit TActivationContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart) + explicit TActivationContext(TMailboxHeader& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart) : Mailbox(mailbox) , ExecutorThread(executorThread) , EventStart(eventStart) @@ -133,7 +133,7 @@ namespace NActors { struct TActorContext: public TActivationContext { const TActorId SelfID; using TEventFlags = IEventHandle::TEventFlags; - explicit TActorContext(TMailboxHeader& mailbox, TExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID) + explicit TActorContext(TMailboxHeader& mailbox, TGenericExecutorThread& executorThread, NHPTimer::STime eventStart, const TActorId& selfID) : TActivationContext(mailbox, executorThread, eventStart) , SelfID(selfID) { @@ -350,7 +350,7 @@ namespace NActors { TMonotonic LastReceiveTimestamp; size_t StuckIndex = Max<size_t>(); friend class TExecutorPoolBaseMailboxed; - friend class TExecutorThread; + friend class TGenericExecutorThread; IActor(const ui32 activityType) : SelfActorId(TActorId()) @@ -836,7 +836,7 @@ namespace NActors { template <ESendingType SendingType> - bool TExecutorThread::Send(TAutoPtr<IEventHandle> ev) { + bool TGenericExecutorThread::Send(TAutoPtr<IEventHandle> ev) { #ifdef USE_ACTOR_CALLSTACK do { (ev)->Callstack = TCallstack::GetTlsCallstack(); @@ -848,7 +848,7 @@ namespace NActors { } template <ESendingType SendingType> - TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, + TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId, TActorId parentId) { if (!parentId) { @@ -871,7 +871,7 @@ namespace NActors { } template <ESendingType SendingType> - TActorId TExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) { + TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) { if (!parentId) { parentId = CurrentRecipient; } diff --git a/ydb/library/actors/core/actor_ut.cpp b/ydb/library/actors/core/actor_ut.cpp index 78aaad410c..0b35ae0f8f 100644 --- a/ydb/library/actors/core/actor_ut.cpp +++ b/ydb/library/actors/core/actor_ut.cpp @@ -25,6 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) { using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams; Y_UNIT_TEST(WithSharedExecutors) { + return; THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup(); TActorBenchmark::AddBasicPool(setup, 2, 1, 0); TActorBenchmark::AddBasicPool(setup, 2, 1, 1); diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h index ae3ba30b8a..59956ffd23 100644 --- a/ydb/library/actors/core/executor_pool_base.h +++ b/ydb/library/actors/core/executor_pool_base.h @@ -27,7 +27,8 @@ namespace NActors { TMutex StuckObserverMutex; std::vector<IActor*> Actors; mutable std::vector<std::tuple<ui32, double>> DeadActorsUsage; - friend class TExecutorThread; + friend class TGenericExecutorThread; + friend class TSharedExecutorThread; void RecalculateStuckActors(TExecutorThreadStats& stats) const; #endif TAtomic RegisterRevolvingCounter = 0; diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index 8f9892d96c..e646b27e45 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -168,7 +168,7 @@ namespace NActors { } if (workerId >= 0) { - Threads[workerId].ExchangeState(EThreadState::None); + Threads[workerId].UnsetWork(); } TAtomic x = AtomicGet(Semaphore); @@ -192,7 +192,7 @@ namespace NActors { } else { if (const ui32 activation = Activations.Pop(++revolvingCounter)) { if (workerId >= 0) { - Threads[workerId].ExchangeState(EThreadState::Work); + Threads[workerId].SetWork(); } AtomicDecrement(Semaphore); TlsThreadContext->Timers.HPNow = GetCycleCountFast(); @@ -244,36 +244,14 @@ namespace NActors { inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) { for (i16 i = 0;;) { - TExecutorThreadCtx& threadCtx = Threads[i]; - EThreadState state = threadCtx.GetState<EThreadState>(); - switch (state) { - case EThreadState::None: - case EThreadState::Work: - if (++i >= MaxThreadCount - SharedExecutorsCount) { - i = 0; - } - break; - case EThreadState::Spin: - case EThreadState::Sleep: - if (threadCtx.ReplaceState<EThreadState>(state, EThreadState::None)) { - if (state == EThreadState::Sleep) { - ui64 beforeUnpark = GetCycleCountFast(); - threadCtx.StartWakingTs = beforeUnpark; - if (TlsThreadContext && TlsThreadContext->WaitingStats) { - threadCtx.WaitingPad.Unpark(); - TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark); - } else { - threadCtx.WaitingPad.Unpark(); - } - } - if (i >= currentThreadCount) { - AtomicIncrement(WrongWakenedThreadCount); - } - return; - } - break; - default: - Y_ABORT(); + if (Threads[i].WakeUp()) { + if (i >= currentThreadCount) { + AtomicIncrement(WrongWakenedThreadCount); + } + return; + } + if (++i >= MaxThreadCount - SharedExecutorsCount) { + i = 0; } } } @@ -400,19 +378,7 @@ namespace NActors { actorSystem, this, MailboxTable.Get(), - &Threads[i], - PoolName, - TimePerMailbox, - EventsPerMailbox)); - } else { - Threads[i].Thread.Reset( - new TExecutorThread( - i, - actorSystem, - &Threads[i], - 0, PoolName, - SoftProcessingDurationTs, TimePerMailbox, EventsPerMailbox)); } @@ -438,7 +404,7 @@ namespace NActors { StopFlag.store(true, std::memory_order_release); for (i16 i = 0; i != PoolThreads; ++i) { Threads[i].Thread->StopFlag.store(true, std::memory_order_release); - Threads[i].WaitingPad.Interrupt(); + Threads[i].Interrupt(); } } @@ -611,4 +577,42 @@ namespace NActors { return Sleep(stopFlag); } + bool TSharedExecutorThreadCtx::Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { + EThreadState state = ExchangeState<EThreadState>(EThreadState::Spin); + Y_ABORT_UNLESS(state == EThreadState::None, "WaitingFlag# %d", int(state)); + if (spinThresholdCycles > 0) { + // spin configured period + Spin(spinThresholdCycles, stopFlag); + } + return Sleep(stopFlag); + } + + bool TExecutorThreadCtx::WakeUp() { + for (ui32 i = 0; i < 2; ++i) { + EThreadState state = GetState<EThreadState>(); + switch (state) { + case EThreadState::None: + case EThreadState::Work: + return false; + case EThreadState::Spin: + case EThreadState::Sleep: + if (ReplaceState<EThreadState>(state, EThreadState::None)) { + if (state == EThreadState::Sleep) { + ui64 beforeUnpark = GetCycleCountFast(); + StartWakingTs = beforeUnpark; + WaitingPad.Unpark(); + if (TlsThreadContext && TlsThreadContext->WaitingStats) { + TlsThreadContext->WaitingStats->AddWakingUp(GetCycleCountFast() - beforeUnpark); + } + } + return true; + } + break; + default: + Y_ABORT(); + } + } + return false; + } + } diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp index 972d95652b..1046d6ea66 100644 --- a/ydb/library/actors/core/executor_pool_io.cpp +++ b/ydb/library/actors/core/executor_pool_io.cpp @@ -109,7 +109,7 @@ namespace NActors { ScheduleQueue.Reset(new NSchedulerQueue::TQueueType()); for (i16 i = 0; i != PoolThreads; ++i) { - Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), &Threads[i], PoolName)); + Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), PoolName)); } *scheduleReaders = &ScheduleQueue->Reader; diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index 477a98ba88..0b24398ac6 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -30,21 +30,19 @@ LWTRACE_USING(ACTORLIB_PROVIDER) namespace NActors { - constexpr TDuration TExecutorThread::DEFAULT_TIME_PER_MAILBOX; + constexpr TDuration TGenericExecutorThread::DEFAULT_TIME_PER_MAILBOX; - TExecutorThread::TExecutorThread( + TGenericExecutorThread::TGenericExecutorThread( TWorkerId workerId, TWorkerId cpuId, TActorSystem* actorSystem, IExecutorPool* executorPool, TMailboxTable* mailboxTable, - TExecutorThreadCtx *threadCtx, const TString& threadName, TDuration timePerMailbox, ui32 eventsPerMailbox) : ActorSystem(actorSystem) , ExecutorPool(executorPool) - , ThreadCtx(threadCtx) , Ctx(workerId, cpuId) , ThreadName(threadName) , TimePerMailbox(timePerMailbox) @@ -57,20 +55,18 @@ namespace NActors { eventsPerMailbox, ui64(-1), // infinite soft deadline &Ctx.WorkerStats); - Ctx.ThreadCtx = ThreadCtx; } - TExecutorThread::TExecutorThread(TWorkerId workerId, + TGenericExecutorThread::TGenericExecutorThread(TWorkerId workerId, TActorSystem* actorSystem, - TExecutorThreadCtx *threadCtx, + IExecutorPool* executorPool, i16 poolCount, const TString& threadName, ui64 softProcessingDurationTs, TDuration timePerMailbox, ui32 eventsPerMailbox) : ActorSystem(actorSystem) - , ExecutorPool(threadCtx->OwnerExecutorPool) - , ThreadCtx(threadCtx) + , ExecutorPool(executorPool) , Ctx(workerId, 0) , ThreadName(threadName) , IsSharedThread(true) @@ -81,27 +77,36 @@ namespace NActors { { Ctx.Switch( ExecutorPool, - static_cast<TExecutorPoolBaseMailboxed*>(threadCtx->OwnerExecutorPool)->MailboxTable.Get(), + static_cast<TExecutorPoolBaseMailboxed*>(executorPool)->MailboxTable.Get(), NHPTimer::GetClockRate() * timePerMailbox.SecondsFloat(), eventsPerMailbox, ui64(-1), // infinite soft deadline &SharedStats[ExecutorPool->PoolId]); - Ctx.ThreadCtx = ThreadCtx; } + TSharedExecutorThread::TSharedExecutorThread(TWorkerId workerId, + TActorSystem* actorSystem, + TSharedExecutorThreadCtx *threadCtx, + i16 poolCount, + const TString& threadName, + ui64 softProcessingDurationTs, + TDuration timePerMailbox, + ui32 eventsPerMailbox) + : TGenericExecutorThread(workerId, actorSystem, threadCtx->OwnerExecutorPool, poolCount, threadName, softProcessingDurationTs, timePerMailbox, eventsPerMailbox) + , ThreadCtx(threadCtx) + {} - - TExecutorThread::~TExecutorThread() + TGenericExecutorThread::~TGenericExecutorThread() { } - void TExecutorThread::UnregisterActor(TMailboxHeader* mailbox, TActorId actorId) { + void TGenericExecutorThread::UnregisterActor(TMailboxHeader* mailbox, TActorId actorId) { Y_DEBUG_ABORT_UNLESS(actorId.PoolID() == ExecutorPool->PoolId && ExecutorPool->ResolveMailbox(actorId.Hint()) == mailbox); IActor* actor = mailbox->DetachActor(actorId.LocalId()); Ctx.DecrementActorsAliveByActivity(actor->GetActivityType()); DyingActors.push_back(THolder(actor)); } - void TExecutorThread::DropUnregistered() { + void TGenericExecutorThread::DropUnregistered() { #if defined(ACTORSLIB_COLLECT_EXEC_STATS) if (ActorSystem->MonitorStuckActors()) { if (auto *pool = dynamic_cast<TExecutorPoolBaseMailboxed*>(ExecutorPool)) { @@ -121,17 +126,17 @@ namespace NActors { DyingActors.clear(); // here is actual destruction of actors } - void TExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { + void TGenericExecutorThread::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { ++CurrentActorScheduledEventsCounter; Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId); } - void TExecutorThread::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { + void TGenericExecutorThread::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { ++CurrentActorScheduledEventsCounter; Ctx.Executor->Schedule(deadline, ev, cookie, Ctx.WorkerId); } - void TExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { + void TGenericExecutorThread::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) { ++CurrentActorScheduledEventsCounter; Ctx.Executor->Schedule(delta, ev, cookie, Ctx.WorkerId); } @@ -172,7 +177,7 @@ namespace NActors { } template <typename TMailbox> - TExecutorThread::TProcessingResult TExecutorThread::Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution) { + TGenericExecutorThread::TProcessingResult TGenericExecutorThread::Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution) { Y_DEBUG_ABORT_UNLESS(DyingActors.empty()); bool reclaimAsFree = false; @@ -359,7 +364,7 @@ namespace NActors { return {preempted, wasWorking}; } - TThreadId TExecutorThread::GetThreadId() const { + TThreadId TGenericExecutorThread::GetThreadId() const { #ifdef _linux_ while (AtomicLoad(&ThreadId) == UnknownThreadId) { NanoSleep(1000); @@ -368,11 +373,11 @@ namespace NActors { return ThreadId; } - TWorkerId TExecutorThread::GetWorkerId() const { + TWorkerId TGenericExecutorThread::GetWorkerId() const { return Ctx.WorkerId; } - TExecutorThread::TProcessingResult TExecutorThread::ProcessExecutorPool(IExecutorPool *pool) { + TGenericExecutorThread::TProcessingResult TGenericExecutorThread::ProcessExecutorPool(IExecutorPool *pool) { ExecutorPool = pool; TlsThreadContext->Pool = ExecutorPool; TlsThreadContext->WorkerId = Ctx.WorkerId; @@ -477,11 +482,31 @@ namespace NActors { return {IsSharedThread, wasWorking}; } - void TExecutorThread::UpdatePools() { + void* TExecutorThread::ThreadProc() { +#ifdef _linux_ + pid_t tid = syscall(SYS_gettid); + AtomicSet(ThreadId, (ui64)tid); +#endif + +#ifdef BALLOC + ThreadDisableBalloc(); +#endif + + TThreadContext threadCtx; + TlsThreadContext = &threadCtx; + if (ThreadName) { + ::SetCurrentThreadName(ThreadName); + } + + ProcessExecutorPool(ExecutorPool); + return nullptr; + } + + void TSharedExecutorThread::UpdatePools() { NeedToReloadPools = EState::NeedToReloadPools; } - TExecutorThread::TProcessingResult TExecutorThread::ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool) { + TGenericExecutorThread::TProcessingResult TSharedExecutorThread::ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool) { Ctx.Switch( pool, pool->MailboxTable.Get(), @@ -495,7 +520,7 @@ namespace NActors { return ProcessExecutorPool(pool); } - void* TExecutorThread::ThreadProc() { + void* TSharedExecutorThread::ThreadProc() { #ifdef _linux_ pid_t tid = syscall(SYS_gettid); AtomicSet(ThreadId, (ui64)tid); @@ -746,11 +771,11 @@ namespace NActors { } } - void TExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const { + void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const { Ctx.GetCurrentStats(statsCopy); } - void TExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const { + void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const { statsCopy = TExecutorThreadStats(); statsCopy.Aggregate(SharedStats[poolId]); } diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h index 750eac126f..261a05ab2a 100644 --- a/ydb/library/actors/core/executor_thread.h +++ b/ydb/library/actors/core/executor_thread.h @@ -15,14 +15,11 @@ namespace NActors { class IActor; class TActorSystem; struct TExecutorThreadCtx; + struct TSharedExecutorThreadCtx; class TExecutorPoolBaseMailboxed; - class TExecutorThread: public ISimpleThread { - enum class EState : ui64 { - Running = 0, - NeedToReloadPools, - }; - + class TGenericExecutorThread: public ISimpleThread { + protected: struct TProcessingResult { bool IsPreempted = false; bool WasWorking = false; @@ -33,60 +30,25 @@ namespace NActors { TDuration::MilliSeconds(10); static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100; - TExecutorThread(TWorkerId workerId, + TGenericExecutorThread(TWorkerId workerId, TWorkerId cpuId, TActorSystem* actorSystem, IExecutorPool* executorPool, TMailboxTable* mailboxTable, - TExecutorThreadCtx *threadCtx, const TString& threadName, TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX); - TExecutorThread(TWorkerId workerId, - TWorkerId cpuId, - TActorSystem* actorSystem, - IExecutorPool* executorPool, - TMailboxTable* mailboxTable, - const TString& threadName, - TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, - ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX) - : TExecutorThread(workerId, cpuId, actorSystem, executorPool, mailboxTable, nullptr, threadName, timePerMailbox, eventsPerMailbox) - {} - - TExecutorThread(TWorkerId workerId, - TActorSystem* actorSystem, - IExecutorPool* executorPool, - TMailboxTable* mailboxTable, - TExecutorThreadCtx *threadCtx, - const TString& threadName, - TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, - ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX) - : TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadCtx, threadName, timePerMailbox, eventsPerMailbox) - {} - - TExecutorThread(TWorkerId workerId, - TActorSystem* actorSystem, - IExecutorPool* executorPool, - TMailboxTable* mailboxTable, - const TString& threadName, - TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, - ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX) - : TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, nullptr, threadName, timePerMailbox, eventsPerMailbox) - {} - - TExecutorThread(TWorkerId workerId, + TGenericExecutorThread(TWorkerId workerId, TActorSystem* actorSystem, - TExecutorThreadCtx *threadCtx, + IExecutorPool* executorPool, i16 poolCount, const TString& threadName, ui64 softProcessingDurationTs, TDuration timePerMailbox, ui32 eventsPerMailbox); - virtual ~TExecutorThread(); - - void UpdatePools(); + virtual ~TGenericExecutorThread(); template <ESendingType SendingType = ESendingType::Common> TActorId RegisterActor(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 poolId = Max<ui32>(), @@ -110,11 +72,8 @@ namespace NActors { TThreadId GetThreadId() const; // blocks, must be called after Start() TWorkerId GetWorkerId() const; - private: - void* ThreadProc(); - + protected: TProcessingResult ProcessExecutorPool(IExecutorPool *pool); - TProcessingResult ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool); template <typename TMailbox> TProcessingResult Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution); @@ -123,10 +82,9 @@ namespace NActors { TActorSystem* const ActorSystem; std::atomic<bool> StopFlag = false; - private: + protected: // Pool-specific IExecutorPool* ExecutorPool; - TExecutorThreadCtx *ThreadCtx; // Event-specific (currently executing) TVector<THolder<IActor>> DyingActors; @@ -145,10 +103,71 @@ namespace NActors { ui32 EventsPerMailbox; ui64 SoftProcessingDurationTs; - std::atomic<EState> NeedToReloadPools = EState::NeedToReloadPools; std::vector<TExecutorThreadStats> SharedStats; }; + class TExecutorThread: public TGenericExecutorThread { + public: + TExecutorThread(TWorkerId workerId, + TWorkerId cpuId, + TActorSystem* actorSystem, + IExecutorPool* executorPool, + TMailboxTable* mailboxTable, + const TString& threadName, + TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, + ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX) + : TGenericExecutorThread(workerId, cpuId, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox) + {} + + + TExecutorThread(TWorkerId workerId, + TActorSystem* actorSystem, + IExecutorPool* executorPool, + TMailboxTable* mailboxTable, + const TString& threadName, + TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, + ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX) + : TGenericExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox) + {} + + virtual ~TExecutorThread() + {} + + private: + void* ThreadProc(); + }; + + class TSharedExecutorThread: public TGenericExecutorThread { + enum class EState : ui64 { + Running = 0, + NeedToReloadPools, + }; + + public: + TSharedExecutorThread(TWorkerId workerId, + TActorSystem* actorSystem, + TSharedExecutorThreadCtx *threadCtx, + i16 poolCount, + const TString& threadName, + ui64 softProcessingDurationTs, + TDuration timePerMailbox, + ui32 eventsPerMailbox); + + virtual ~TSharedExecutorThread() + {} + + void UpdatePools(); + + private: + TProcessingResult ProcessSharedExecutorPool(TExecutorPoolBaseMailboxed *pool); + + void* ThreadProc(); + + std::atomic<EState> NeedToReloadPools = EState::NeedToReloadPools; + + TSharedExecutorThreadCtx *ThreadCtx; + }; + template <typename TMailbox> void UnlockFromExecution(TMailbox* mailbox, IExecutorPool* executorPool, bool asFree, ui32 hint, TWorkerId workerId, ui64& revolvingWriteCounter) { mailbox->UnlockFromExecution1(); diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index 67554885b2..e9bcb500f6 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -8,8 +8,9 @@ namespace NActors { - class TExecutorThread; + class TGenericExecutorThread; class TBasicExecutorPool; + class TIOExecutorPool; enum class EThreadState : ui64 { None, @@ -19,7 +20,10 @@ namespace NActors { }; struct TGenericExecutorThreadCtx { - TAutoPtr<TExecutorThread> Thread; + TAutoPtr<TGenericExecutorThread> Thread; + + protected: + friend class TIOExecutorPool; TThreadParkPad WaitingPad; private: @@ -28,6 +32,7 @@ namespace NActors { public: ui64 StartWakingTs = 0; + protected: template <typename TWaitState> TWaitState GetState() { return TWaitState(WaitingFlag.load()); @@ -46,7 +51,6 @@ namespace NActors { return result; } - protected: template <typename TDerived, typename TWaitState> void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { ui64 start = GetCycleCountFast(); @@ -107,6 +111,14 @@ namespace NActors { TBasicExecutorPool *OwnerExecutorPool = nullptr; + void SetWork() { + ExchangeState(EThreadState::Work); + } + + void UnsetWork() { + ExchangeState(EThreadState::None); + } + void Spin(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag) { this->TBase::Spin<TExecutorThreadCtx, EThreadState>(spinThresholdCycles, stopFlag); } @@ -117,6 +129,12 @@ namespace NActors { bool Wait(ui64 spinThresholdCycles, std::atomic<bool> *stopFlag); // in executor_pool_basic.cpp + bool WakeUp(); + + void Interrupt() { + WaitingPad.Interrupt(); + } + void AfterWakeUp(EThreadState /*state*/) { } @@ -154,6 +172,7 @@ namespace NActors { } }; + TBasicExecutorPool *OwnerExecutorPool = nullptr; std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads]; ui32 NextPool = 0; diff --git a/ydb/library/actors/core/worker_context.h b/ydb/library/actors/core/worker_context.h index a4cbf3a0a7..67e731c9ed 100644 --- a/ydb/library/actors/core/worker_context.h +++ b/ydb/library/actors/core/worker_context.h @@ -35,7 +35,6 @@ namespace NActors { bool IsNeededToWaitNextActivation = true; i64 HPStart = 0; ui32 ExecutedEvents = 0; - TExecutorThreadCtx *ThreadCtx; TWorkerContext(TWorkerId workerId, TCpuId cpuId) : WorkerId(workerId) diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index e281e57a6c..1d4794ee0f 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -533,7 +533,7 @@ namespace NActors { node->SchedulerPool.Reset(CreateExecutorPoolStub(this, nodeIndex, node, 0)); node->MailboxTable.Reset(new TMailboxTable()); node->ActorSystem = MakeActorSystem(nodeIndex, node); - node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), nullptr, "TestExecutor")); + node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor")); } else { node->ActorSystem = MakeActorSystem(nodeIndex, node); } |