author | kruall <kruall@ydb.tech> | 2023-07-26 14:09:12 +0300
committer | kruall <kruall@ydb.tech> | 2023-07-26 14:09:12 +0300
commit | ba9a310064ff9f159b0b96a5f8b1a606c3e0e595 (patch)
tree | 31c7a74f253f918edd3e99aae0e094656cef90ca
parent | b10d9c13aca0559db5483430ebb41823e79f9d6d (diff)
download | ydb-ba9a310064ff9f159b0b96a5f8b1a606c3e0e595.tar.gz
Add several pools for one thread, KIKIMR-18440
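This change teaches a basic executor pool to donate its last `SharedExecutorsCount` threads as shared executors: instead of staying pinned to one pool, such a thread walks over every basic pool in the actor system and works on each until the pool runs dry or a soft deadline (`SoftProcessingDurationTs`) expires. The sketch below shows how a pool opts in; it mirrors the `AddBasicPool()` helper from `actor_ut.cpp` in this diff, with `Us2Ts()` being the library's microseconds-to-timestamp-ticks conversion.

```cpp
// Configuration sketch, following AddBasicPool() from actor_ut.cpp below.
TBasicExecutorPoolConfig basic;
basic.PoolId = 0;
basic.PoolName = "b0";
basic.Threads = 2;                           // threads owned by this pool
basic.SpinThreshold = DefaultSpinThreshold;
basic.TimePerMailbox = TDuration::Hours(1);
basic.SharedExecutorsCount = 1;              // the last thread serves all basic pools
basic.SoftProcessingDurationTs = Us2Ts(100); // soft slice before it moves to the next pool
```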
-rw-r--r-- | library/cpp/actors/core/actor_ut.cpp | 88
-rw-r--r-- | library/cpp/actors/core/actorsystem.h | 4
-rw-r--r-- | library/cpp/actors/core/config.h | 2
-rw-r--r-- | library/cpp/actors/core/cpu_manager.cpp | 11
-rw-r--r-- | library/cpp/actors/core/cpu_manager.h | 2
-rw-r--r-- | library/cpp/actors/core/defs.h | 2
-rw-r--r-- | library/cpp/actors/core/executor_pool.h | 9
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.h | 2
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 220
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.h | 20
-rw-r--r-- | library/cpp/actors/core/executor_pool_io.cpp | 12
-rw-r--r-- | library/cpp/actors/core/executor_pool_united.cpp | 2
-rw-r--r-- | library/cpp/actors/core/executor_pool_united_workers.h | 2
-rw-r--r-- | library/cpp/actors/core/executor_thread.cpp | 101
-rw-r--r-- | library/cpp/actors/core/executor_thread.h | 18
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 5
-rw-r--r-- | library/cpp/actors/core/worker_context.h | 4
17 files changed, 350 insertions, 154 deletions
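The heart of the patch is in `executor_thread.cpp`: a `TExecutorThread` built with a vector of pools repeatedly rebinds its worker context to each pool (`Ctx.Switch(...)`, `Ctx.WorkerId = -1`) and calls the new `ProcessExecutorPool(pool, /*isSharedThread=*/true)`, which returns when the pool has no work or its soft deadline passes. The self-contained toy below models only that round-robin-with-soft-deadline idea; every name in it is invented for illustration and none of it is YDB code.

```cpp
// Toy model of a shared executor thread: one thread serves several task
// queues ("pools") in a cycle, spending at most a soft time slice on each,
// so no single pool can monopolize the shared thread.
#include <atomic>
#include <chrono>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

struct ToyPool {
    std::mutex Lock;
    std::deque<std::function<void()>> Tasks;

    bool TryRunOne() {
        std::function<void()> task;
        {
            std::lock_guard<std::mutex> guard(Lock);
            if (Tasks.empty())
                return false;
            task = std::move(Tasks.front());
            Tasks.pop_front();
        }
        task();
        return true;
    }
};

// Serve one pool until it runs dry or the soft deadline passes -- the
// analogue of ProcessExecutorPool(pool, true) returning on SoftDeadlineTs.
void ServePool(ToyPool& pool, std::chrono::microseconds softSlice) {
    const auto deadline = std::chrono::steady_clock::now() + softSlice;
    while (std::chrono::steady_clock::now() < deadline) {
        if (!pool.TryRunOne())
            return; // idle: move on to the next pool instead of parking here
    }
}

int main() {
    ToyPool poolA, poolB;
    for (int i = 0; i < 1000; ++i) {
        poolA.Tasks.push_back([] { /* pool A event */ });
        poolB.Tasks.push_back([] { /* pool B event */ });
    }

    std::atomic<bool> stop{false};
    // The shared thread cycles over the pools, as ThreadProc() does over
    // AvailableExecutorPools when constructed with a vector of pools.
    std::thread shared([&] {
        while (!stop.load(std::memory_order_relaxed)) {
            ServePool(poolA, std::chrono::microseconds(100));
            ServePool(poolB, std::chrono::microseconds(100));
        }
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    stop.store(true);
    shared.join();
    std::printf("remaining: A=%zu B=%zu\n", poolA.Tasks.size(), poolB.Tasks.size());
    return 0;
}
```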
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index 6901cc298d..0d0721fae3 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -135,13 +135,15 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
         ui32 MailboxNeighboursCount;
     };
 
-    void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent) {
+    void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent, i16 sharedExecutorsCount) {
         TBasicExecutorPoolConfig basic;
         basic.PoolId = setup->GetExecutorsCount();
         basic.PoolName = TStringBuilder() << "b" << basic.PoolId;
         basic.Threads = threads;
         basic.SpinThreshold = DefaultSpinThreshold;
         basic.TimePerMailbox = TDuration::Hours(1);
+        basic.SharedExecutorsCount = sharedExecutorsCount;
+        basic.SoftProcessingDurationTs = Us2Ts(100);
         if (activateEveryEvent) {
             basic.EventsPerMailbox = 1;
         } else {
@@ -192,7 +194,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
         if (poolType == EPoolType::Basic) {
             THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false);
             for (ui32 i = 0; i < poolsCount; ++i) {
-                AddBasicPool(setup, threads, activateEveryEvent);
+                AddBasicPool(setup, threads, activateEveryEvent, 0);
             }
             return setup;
         } else if (poolType == EPoolType::United) {
@@ -339,6 +341,88 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
         TMailboxType::TinyReadAsFilled
     };
 
+    Y_UNIT_TEST(WithSharedExecutors) {
+        THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false);
+        AddBasicPool(setup, 2, 1, 0);
+        AddBasicPool(setup, 2, 1, 1);
+
+        TActorSystem actorSystem(setup);
+        actorSystem.Start();
+
+        TThreadParkPad pad;
+        TAtomic actorsAlive = 0;
+        THPTimer Timer;
+
+        Timer.Reset();
+        for (ui32 i = 0; i < 50; ++i) {
+            ui32 followerPoolId = 0;
+            ui32 leaderPoolId = 0;
+            TActorId followerId = actorSystem.Register(
+                new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
+            THolder<IActor> leader{
+                new TTestEndDecorator(THolder(
+                    new TSendReceiveActor(nullptr, followerId, true, ERole::Leader, ESendingType::Common)), &pad, &actorsAlive)};
+            actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+        }
+        for (ui32 i = 0; i < 10; ++i) {
+            ui32 followerPoolId = 1;
+            ui32 leaderPoolId = 1;
+            TActorId followerId = actorSystem.Register(
+                new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
+            THolder<IActor> leader{
+                new TTestEndDecorator(THolder(
+                    new TSendReceiveActor(nullptr, followerId, true, ERole::Leader, ESendingType::Common)), &pad, &actorsAlive)};
+            actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+        }
+
+        pad.Park();
+        auto elapsedTime = Timer.Passed() / TotalEventsAmount;
+        actorSystem.Stop();
+
+        Cerr << "Completed " << 1e9 * elapsedTime << Endl;
+    }
+
+    Y_UNIT_TEST(WithoutSharedExecutors) {
+        THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false);
+        AddBasicPool(setup, 2, 1, 0);
+        AddBasicPool(setup, 2, 1, 0);
+
+        TActorSystem actorSystem(setup);
+        actorSystem.Start();
+
+        TThreadParkPad pad;
+        TAtomic actorsAlive = 0;
+        THPTimer Timer;
+
+        Timer.Reset();
+        for (ui32 i = 0; i < 50; ++i) {
+            ui32 followerPoolId = 0;
+            ui32 leaderPoolId = 0;
+            TActorId followerId = actorSystem.Register(
+                new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
+            THolder<IActor> leader{
+                new TTestEndDecorator(THolder(
+                    new TSendReceiveActor(nullptr, followerId, true, ERole::Leader, ESendingType::Common)), &pad, &actorsAlive)};
+            actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+        }
+        for (ui32 i = 0; i < 10; ++i) {
+            ui32 followerPoolId = 1;
+            ui32 leaderPoolId = 1;
+            TActorId followerId = actorSystem.Register(
+                new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
+            THolder<IActor> leader{
+                new TTestEndDecorator(THolder(
+                    new TSendReceiveActor(nullptr, followerId, true, ERole::Leader, ESendingType::Common)), &pad, &actorsAlive)};
+            actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+        }
+
+        pad.Park();
+        auto elapsedTime = Timer.Passed() / TotalEventsAmount;
+        actorSystem.Stop();
+
+        Cerr << "Completed " << 1e9 * elapsedTime << Endl;
+    }
+
     Y_UNIT_TEST(SendReceive1Pool1ThreadAlloc) {
         for (const auto& mType : MailboxTypes) {
             auto stats = CountStats([mType] {
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 821b94ada9..64a8e827d2 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -304,5 +304,9 @@ namespace NActors {
             DeferredPreStop.push_back(std::move(fn));
         }
 
+        TVector<IExecutorPool*> GetBasicExecutorPools() const {
+            return CpuManager->GetBasicExecutorPools();
+        }
+
    };
 }
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 220cb3d9d1..ab35998241 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -44,6 +44,8 @@ namespace NActors {
         i16 MaxThreadCount = 0;
         i16 DefaultThreadCount = 0;
         i16 Priority = 0;
+        i16 SharedExecutorsCount = 0;
+        i16 SoftProcessingDurationTs = 0;
     };
 
     struct TIOExecutorPoolConfig {
diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp
index 4aabb014dd..3ec6cea5a3 100644
--- a/library/cpp/actors/core/cpu_manager.cpp
+++ b/library/cpp/actors/core/cpu_manager.cpp
@@ -134,4 +134,15 @@ namespace NActors {
         }
         Y_FAIL("missing PoolId: %d", int(poolId));
     }
+
+    TVector<IExecutorPool*> TCpuManager::GetBasicExecutorPools() const {
+        TVector<IExecutorPool*> pools;
+        for (ui32 idx = 0; idx < ExecutorPoolCount; ++idx) {
+            if (auto basicPool = dynamic_cast<TBasicExecutorPool*>(Executors[idx].Get()); basicPool != nullptr) {
+                pools.push_back(basicPool);
+            }
+        }
+        return pools;
+    }
+
 }
diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h
index c3a7588639..26ba97aa39 100644
--- a/library/cpp/actors/core/cpu_manager.h
+++ b/library/cpp/actors/core/cpu_manager.h
@@ -26,6 +26,8 @@ namespace NActors {
         void Shutdown();
         void Cleanup();
 
+        TVector<IExecutorPool*> GetBasicExecutorPools() const;
+
         ui32 GetExecutorsCount() const {
             return ExecutorPoolCount;
         }
diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h
index 779473efc9..276ebef219 100644
--- a/library/cpp/actors/core/defs.h
+++ b/library/cpp/actors/core/defs.h
@@ -30,7 +30,7 @@ namespace NActors {
     static constexpr TPoolWeight DefPoolWeight = 32;
     static constexpr TPoolWeight MaxPoolWeight = 1024;
 
-    using TWorkerId = ui16;
+    using TWorkerId = i16;
     static constexpr TWorkerId WorkerBits = 11;
     static constexpr TWorkerId MaxWorkers = 1 << WorkerBits;
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index 5adcf4d0d5..ae251c14bb 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -113,11 +113,11 @@ namespace NActors {
         virtual void SetRealTimeMode() const {}
 
-        virtual ui32 GetThreadCount() const {
+        virtual i16 GetThreadCount() const {
             return 1;
         }
 
-        virtual void SetThreadCount(ui32 threads) {
+        virtual void SetThreadCount(i16 threads) {
             Y_UNUSED(threads);
         }
 
@@ -138,11 +138,6 @@ namespace NActors {
         }
 
-        virtual bool IsThreadBeingStopped(i16 threadIdx) const {
-            Y_UNUSED(threadIdx);
-            return false;
-        }
-
         virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
             Y_UNUSED(threadIdx);
             return TCpuConsumption{0.0, 0.0};
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index 8a5214c92c..60714f8441 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -41,7 +41,7 @@ namespace NActors {
 
     class TExecutorPoolBase: public TExecutorPoolBaseMailboxed {
     protected:
-        const ui32 PoolThreads;
+        const i16 PoolThreads;
         TIntrusivePtr<TAffinity> ThreadsAffinity;
         TAtomic Semaphore = 0;
         TUnorderedCache<ui32, 512, 4> Activations;
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 6f0d785ff5..6bf21252f5 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -86,26 +86,15 @@ namespace NActors {
             cfg.DefaultThreadCount,
             cfg.Priority
         )
-    {}
+    {
+        SetSharedExecutorsCount(cfg.SharedExecutorsCount);
+        SoftProcessingDurationTs = cfg.SoftProcessingDurationTs;
+    }
 
     TBasicExecutorPool::~TBasicExecutorPool() {
         Threads.Destroy();
     }
 
-    bool TBasicExecutorPool::GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers) {
-        do {
-            if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) {
-                timers.HPNow = GetCycleCountFast();
-                timers.Elapsed += timers.HPNow - timers.HPStart;
-                if (threadCtx.BlockedPad.Park()) // interrupted
-                    return true;
-                timers.HPStart = GetCycleCountFast();
-                timers.Blocked += timers.HPStart - timers.HPNow;
-            }
-        } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !RelaxedLoad(&StopFlag));
-        return false;
-    }
-
     bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) {
         do {
             timers.HPNow = GetCycleCountFast();
@@ -162,7 +151,8 @@ namespace NActors {
         }
 #endif
 
-        Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
+        TAtomic state = AtomicLoad(&threadCtx.WaitingFlag);
+        Y_VERIFY(state == TThreadCtx::WS_NONE, "WaitingFlag# %d", int(state));
 
         if (SpinThreshold > 0 && !needToBlock) {
             // spin configured period
@@ -183,7 +173,7 @@ namespace NActors {
             }
         }
 
-        Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING);
+        Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
 
 #if defined ACTORSLIB_COLLECT_EXEC_STATS
         if (AtomicDecrement(ThreadUtilization) == 0) {
@@ -207,8 +197,41 @@ namespace NActors {
         return false;
     }
 
+    void TBasicExecutorPool::AskToGoToSleep(bool *needToWait, bool *needToBlock) {
+        TAtomic x = AtomicGet(Semaphore);
+        do {
+            i64 oldX = x;
+            TSemaphore semaphore = TSemaphore::GetSemaphore(x);;
+            if (semaphore.CurrentSleepThreadCount < 0) {
+                semaphore.CurrentSleepThreadCount++;
+                x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
+                if (x == oldX) {
+                    *needToWait = true;
+                    *needToBlock = true;
+                    return;
+                }
+                continue;
+            }
+
+            if (semaphore.OldSemaphore == 0) {
+                semaphore.CurrentSleepThreadCount++;
+                x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
+                if (x == oldX) {
+                    *needToWait = true;
+                    *needToBlock = false;
+                    return;
+                }
+                continue;
+            }
+
+            *needToWait = false;
+            *needToBlock = false;
+            return;
+        } while (true);
+    }
+
     ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
-        ui32 workerId = wctx.WorkerId;
+        TWorkerId workerId = wctx.WorkerId;
         Y_VERIFY_DEBUG(workerId < PoolThreads);
 
         TTimers timers;
@@ -218,62 +241,53 @@ namespace NActors {
             Harmonizer->Harmonize(timers.HPStart);
         }
 
-        TThreadCtx& threadCtx = Threads[workerId];
-        AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);
-
-        bool needToWait = false;
-        bool needToBlock = false;
+        if (workerId >= 0) {
+            AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_NONE);
+        }
 
         TAtomic x = AtomicGet(Semaphore);
+        TSemaphore semaphore = TSemaphore::GetSemaphore(x);
         do {
-            i64 oldX = x;
-            TSemaphore semaphore = TSemaphore::GetSemaphore(x);
-            needToBlock = semaphore.CurrentSleepThreadCount < 0;
-            needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount;
-
-            if (needToWait && wctx.HasCapturedMessageBox) {
-                timers.HPNow = GetCycleCountFast();
-                wctx.AddElapsedCycles(ActorSystemIndex, timers.HPNow - timers.HPStart);
-                return 0;
-            }
-
-            semaphore.OldSemaphore--;
-            if (needToWait) {
-                semaphore.CurrentSleepThreadCount++;
-            }
-
-            x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
-            if (x == oldX) {
-                break;
-            }
-        } while (!RelaxedLoad(&StopFlag));
-
-        if (needToWait) {
-            if (GoToWaiting(threadCtx, timers, needToBlock)) { // interrupted
-                return 0;
-            }
-        } else {
-            AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING);
-        }
+            if (!semaphore.OldSemaphore || semaphore.CurrentSleepThreadCount < 0) {
+                if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
+                    timers.HPNow = GetCycleCountFast();
+                    wctx.AddElapsedCycles(ActorSystemIndex, timers.HPNow - timers.HPStart);
+                    return 0;
+                }
 
-        // ok, has work suggested, must dequeue
-        while (!RelaxedLoad(&StopFlag)) {
-            if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
-                timers.HPNow = GetCycleCountFast();
-                timers.Elapsed += timers.HPNow - timers.HPStart;
-                wctx.AddElapsedCycles(ActorSystemIndex, timers.Elapsed);
-                if (timers.Parked > 0) {
-                    wctx.AddParkedCycles(timers.Parked);
+                bool needToWait = false;
+                bool needToBlock = false;
+                AskToGoToSleep(&needToWait, &needToBlock);
+                if (needToWait) {
+                    if (GoToWaiting(Threads[workerId], timers, needToBlock)) { // interrupted
+                        return 0;
+                    }
                 }
-                if (timers.Blocked > 0) {
-                    wctx.AddBlockedCycles(timers.Blocked);
+            } else {
+                if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
+                    if (workerId >= 0) {
+                        AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_RUNNING);
+                    }
+                    AtomicDecrement(Semaphore);
+                    timers.HPNow = GetCycleCountFast();
+                    timers.Elapsed += timers.HPNow - timers.HPStart;
+                    wctx.AddElapsedCycles(ActorSystemIndex, timers.Elapsed);
+                    if (timers.Parked > 0) {
+                        wctx.AddParkedCycles(timers.Parked);
+                    }
+                    if (timers.Blocked > 0) {
+                        wctx.AddBlockedCycles(timers.Blocked);
+                    }
+                    return activation;
                 }
-                return activation;
+                semaphore.CurrentSleepThreadCount++;
             }
+
             SpinLockPause();
-        }
+            x = AtomicGet(Semaphore);
+            semaphore = TSemaphore::GetSemaphore(x);
+        } while (!RelaxedLoad(&StopFlag));
 
-        // stopping, die!
         return 0;
     }
 
@@ -284,13 +298,13 @@ namespace NActors {
             switch (state) {
                 case TThreadCtx::WS_NONE:
                 case TThreadCtx::WS_RUNNING:
-                    if (++i >= MaxThreadCount) {
+                    if (++i >= MaxThreadCount - SharedExecutorsCount) {
                         i = 0;
                     }
                     break;
                 case TThreadCtx::WS_ACTIVE:
                 case TThreadCtx::WS_BLOCKED:
-                    if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, state)) {
+                    if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_NONE, state)) {
                         if (state == TThreadCtx::WS_BLOCKED) {
                             threadCtx.Pad.Unpark();
                         }
@@ -313,7 +327,7 @@ namespace NActors {
         TAtomic x = AtomicGet(Semaphore);
         TSemaphore semaphore = TSemaphore::GetSemaphore(x);
         do {
-            needToWakeUp = semaphore.CurrentSleepThreadCount > 0;
+            needToWakeUp = semaphore.CurrentSleepThreadCount > SharedExecutorsCount;
             i64 oldX = semaphore.ConverToI64();
             semaphore.OldSemaphore++;
             if (needToWakeUp) {
@@ -362,7 +376,7 @@ namespace NActors {
         RecalculateStuckActors(statsCopy[0]);
 #endif
         // Per-thread stats
-        for (size_t i = 0; i < PoolThreads; ++i) {
+        for (i16 i = 0; i < PoolThreads; ++i) {
             Threads[i].Thread->GetCurrentStats(statsCopy[i + 1]);
         }
     }
@@ -375,17 +389,29 @@ namespace NActors {
         ScheduleReaders.Reset(new NSchedulerQueue::TReader[PoolThreads]);
         ScheduleWriters.Reset(new NSchedulerQueue::TWriter[PoolThreads]);
 
-        for (ui32 i = 0; i != PoolThreads; ++i) {
-            Threads[i].Thread.Reset(
-                new TExecutorThread(
-                    i,
-                    0, // CpuId is not used in BASIC pool
-                    actorSystem,
-                    this,
-                    MailboxTable.Get(),
-                    PoolName,
-                    TimePerMailbox,
-                    EventsPerMailbox));
+        for (i16 i = 0; i != PoolThreads; ++i) {
+            if (i < MaxThreadCount - SharedExecutorsCount) {
+                Threads[i].Thread.Reset(
+                    new TExecutorThread(
+                        i,
+                        0, // CpuId is not used in BASIC pool
+                        actorSystem,
+                        this,
+                        MailboxTable.Get(),
+                        PoolName,
+                        TimePerMailbox,
+                        EventsPerMailbox));
+            } else {
+                Threads[i].Thread.Reset(
+                    new TExecutorThread(
+                        i,
+                        actorSystem,
+                        actorSystem->GetBasicExecutorPools(),
+                        PoolName,
+                        SoftProcessingDurationTs,
+                        TimePerMailbox,
+                        EventsPerMailbox));
+            }
             ScheduleWriters[i].Init(ScheduleReaders[i]);
         }
 
@@ -399,21 +425,22 @@ namespace NActors {
         ThreadUtilization = 0;
         AtomicAdd(MaxUtilizationCounter, -(i64)GetCycleCountFast());
 
-        for (ui32 i = 0; i != PoolThreads; ++i) {
+        for (i16 i = 0; i != PoolThreads; ++i) {
            Threads[i].Thread->Start();
        }
    }
 
     void TBasicExecutorPool::PrepareStop() {
         AtomicStore(&StopFlag, true);
-        for (ui32 i = 0; i != PoolThreads; ++i) {
+        for (i16 i = 0; i != PoolThreads; ++i) {
             Threads[i].Pad.Interrupt();
             Threads[i].BlockedPad.Interrupt();
+            AtomicStore(&Threads[i].Thread->StopFlag, TAtomic(true));
         }
     }
 
     void TBasicExecutorPool::Shutdown() {
-        for (ui32 i = 0; i != PoolThreads; ++i)
+        for (i16 i = 0; i != PoolThreads; ++i)
             Threads[i].Thread->Join();
     }
 
@@ -456,24 +483,22 @@ namespace NActors {
 #endif
     }
 
-    ui32 TBasicExecutorPool::GetThreadCount() const {
+    i16 TBasicExecutorPool::GetThreadCount() const {
         return AtomicGet(ThreadCount);
     }
 
-    void TBasicExecutorPool::SetThreadCount(ui32 threads) {
-        threads = Max(1u, Min(PoolThreads, threads));
+    void TBasicExecutorPool::SetThreadCount(i16 threads) {
+        threads = Max(i16(1), Min(PoolThreads, threads));
         with_lock (ChangeThreadsLock) {
-            size_t prevCount = GetThreadCount();
+            i16 prevCount = GetThreadCount();
             AtomicSet(ThreadCount, threads);
             TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore));
             i64 oldX = semaphore.ConverToI64();
             semaphore.CurrentThreadCount = threads;
             if (threads > prevCount) {
                 semaphore.CurrentSleepThreadCount += (i64)threads - prevCount;
-                semaphore.OldSemaphore -= (i64)threads - prevCount;
             } else {
                 semaphore.CurrentSleepThreadCount -= (i64)prevCount - threads;
-                semaphore.OldSemaphore += prevCount - threads;
             }
             AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX);
             LWPROBE(ThreadCount, PoolId, PoolName, threads, MinThreadCount, MaxThreadCount, DefaultThreadCount);
@@ -492,19 +517,8 @@ namespace NActors {
         return MaxThreadCount;
     }
 
-    bool TBasicExecutorPool::IsThreadBeingStopped(i16 threadIdx) const {
-        if ((ui32)threadIdx >= PoolThreads) {
-            return false;
-        }
-        auto blockedFlag = AtomicGet(Threads[threadIdx].BlockedFlag);
-        if (blockedFlag == TThreadCtx::BS_BLOCKING) {
-            return true;
-        }
-        return false;
-    }
-
     TCpuConsumption TBasicExecutorPool::GetThreadCpuConsumption(i16 threadIdx) {
-        if ((ui32)threadIdx >= PoolThreads) {
+        if (threadIdx >= PoolThreads) {
             return {0.0, 0.0};
         }
         TThreadCtx& threadCtx = Threads[threadIdx];
@@ -522,4 +536,8 @@ namespace NActors {
     i16 TBasicExecutorPool::GetPriority() const {
         return Priority;
     }
+
+    void TBasicExecutorPool::SetSharedExecutorsCount(i16 count) {
+        SharedExecutorsCount = count;
+    }
 }
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 2a727fafd5..8adb5773e2 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -23,7 +23,7 @@ namespace NActors {
 
             // different threads must spin/block on different cache-lines.
             // we add some padding bytes to enforce this rule
-            static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + 2 * sizeof(TThreadParkPad) + 2 * sizeof(TAtomic);
+            static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + 2 * sizeof(TThreadParkPad) + sizeof(TAtomic);
             ui8 Padding[64 - SizeWithoutPadding];
             static_assert(64 >= SizeWithoutPadding);
 
@@ -34,15 +34,8 @@ namespace NActors {
                 WS_RUNNING
             };
 
-            enum EBlockedState {
-                BS_NONE,
-                BS_BLOCKING,
-                BS_BLOCKED
-            };
-
             TThreadCtx()
                 : WaitingFlag(WS_NONE)
-                , BlockedFlag(BS_NONE)
             {
             }
         };
@@ -81,6 +74,8 @@ namespace NActors {
         i16 MaxThreadCount;
         i16 DefaultThreadCount;
         IHarmonizer *Harmonizer;
+        i16 SharedExecutorsCount = 0;
+        ui64 SoftProcessingDurationTs = 0;
         const i16 Priority = 0;
         const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
 
@@ -128,6 +123,8 @@ namespace NActors {
         explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer);
         ~TBasicExecutorPool();
 
+        void SetSharedExecutorsCount(i16 count);
+
         ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
 
         void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
@@ -148,17 +145,18 @@ namespace NActors {
 
         void SetRealTimeMode() const override;
 
-        ui32 GetThreadCount() const override;
-        void SetThreadCount(ui32 threads) override;
+        i16 GetThreadCount() const override;
+        void SetThreadCount(i16 threads) override;
         i16 GetDefaultThreadCount() const override;
         i16 GetMinThreadCount() const override;
         i16 GetMaxThreadCount() const override;
-        bool IsThreadBeingStopped(i16 threadIdx) const override;
         TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override;
         i16 GetBlockingThreadCount() const override;
         i16 GetPriority() const override;
 
     private:
+        void AskToGoToSleep(bool *needToWait, bool *needToBlock);
+
         void WakeUpLoop(i16 currentThreadCount);
         bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
         void GoToSpin(TThreadCtx& threadCtx);
diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp
index 80b927c486..539b6348b2 100644
--- a/library/cpp/actors/core/executor_pool_io.cpp
+++ b/library/cpp/actors/core/executor_pool_io.cpp
@@ -27,7 +27,7 @@ namespace NActors {
     }
 
     ui32 TIOExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
-        ui32 workerId = wctx.WorkerId;
+        i16 workerId = wctx.WorkerId;
         Y_VERIFY_DEBUG(workerId < PoolThreads);
 
         NHPTimer::STime elapsed = 0;
@@ -108,7 +108,7 @@ namespace NActors {
 
         ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());
 
-        for (ui32 i = 0; i != PoolThreads; ++i) {
+        for (i16 i = 0; i != PoolThreads; ++i) {
             Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), PoolName));
         }
 
@@ -119,18 +119,18 @@ namespace NActors {
     void TIOExecutorPool::Start() {
         TAffinityGuard affinityGuard(Affinity());
 
-        for (ui32 i = 0; i != PoolThreads; ++i)
+        for (i16 i = 0; i != PoolThreads; ++i)
             Threads[i].Thread->Start();
     }
 
     void TIOExecutorPool::PrepareStop() {
         AtomicStore(&StopFlag, true);
-        for (ui32 i = 0; i != PoolThreads; ++i)
+        for (i16 i = 0; i != PoolThreads; ++i)
             Threads[i].Pad.Interrupt();
     }
 
     void TIOExecutorPool::Shutdown() {
-        for (ui32 i = 0; i != PoolThreads; ++i)
+        for (i16 i = 0; i != PoolThreads; ++i)
             Threads[i].Thread->Join();
     }
 
@@ -140,7 +140,7 @@ namespace NActors {
         statsCopy[0] = TExecutorThreadStats();
         statsCopy[0].Aggregate(Stats);
         // Per-thread stats
-        for (size_t i = 0; i < PoolThreads; ++i) {
+        for (i16 i = 0; i < PoolThreads; ++i) {
             Threads[i].Thread->GetCurrentStats(statsCopy[i + 1]);
         }
     }
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index eb4c8348d7..2e689dec59 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -1168,7 +1168,7 @@ namespace NActors {
 
         // Setup cpu-local workers
         if (cpu.LocalManager) {
-            for (size_t i = 0; i < cpu.LocalManager->WorkerCount(); i++) {
+            for (i16 i = 0; i < cpu.LocalManager->WorkerCount(); i++) {
                 TWorkerId workerId = workers++;
                 cpu.LocalManager->AddWorker(workerId);
diff --git a/library/cpp/actors/core/executor_pool_united_workers.h b/library/cpp/actors/core/executor_pool_united_workers.h
index b088b582f6..c683ae7d9f 100644
--- a/library/cpp/actors/core/executor_pool_united_workers.h
+++ b/library/cpp/actors/core/executor_pool_united_workers.h
@@ -18,7 +18,7 @@ namespace NActors {
         struct TPool;
         struct TCpu;
 
-        size_t WorkerCount;
+        i16 WorkerCount;
         TArrayHolder<TWorker> Workers; // indexed by WorkerId
         size_t PoolCount;
         TArrayHolder<TPool> Pools; // indexed by PoolId, so may include not used (not united) pools
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 0b6045693b..e8fff6857c 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -53,6 +53,24 @@ namespace NActors {
             &Ctx.WorkerStats);
     }
 
+    TExecutorThread::TExecutorThread(TWorkerId workerId,
+            TActorSystem* actorSystem,
+            TVector<IExecutorPool*> executorPools,
+            const TString& threadName,
+            ui64 softProcessingDurationTs,
+            TDuration timePerMailbox,
+            ui32 eventsPerMailbox)
+        : ActorSystem(actorSystem)
+        , AvailableExecutorPools(executorPools)
+        , Ctx(workerId, 0)
+        , ThreadName(threadName)
+        , IsUnitedWorker(false)
+        , TimePerMailbox(timePerMailbox)
+        , EventsPerMailbox(eventsPerMailbox)
+        , SoftProcessingDurationTs(softProcessingDurationTs)
+    {}
+
+
     TExecutorThread::~TExecutorThread() {
     }
 
@@ -325,19 +343,8 @@ namespace NActors {
         return ThreadId;
     }
 
-    void* TExecutorThread::ThreadProc() {
-#ifdef _linux_
-        pid_t tid = syscall(SYS_gettid);
-        AtomicSet(ThreadId, (ui64)tid);
-#endif
-
-#ifdef BALLOC
-        ThreadDisableBalloc();
-#endif
-
-        if (ThreadName) {
-            ::SetCurrentThreadName(ThreadName);
-        }
+    void TExecutorThread::ProcessExecutorPool(IExecutorPool *pool, bool isSharedThread) {
+        ExecutorPool = pool;
 
         TThreadContext threadCtx;
         TlsThreadContext = &threadCtx;
         TlsThreadContext->Pool = static_cast<IExecutorPool*>(ExecutorPool);
@@ -352,6 +359,8 @@ namespace NActors {
         i64 execCycles = 0;
         i64 nonExecCycles = 0;
 
+        bool needToStop = false;
+
         auto executeActivation = [&]<bool IsTailExecution>(ui32 activation) {
             LWTRACK(ActivationBegin, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId, NHPTimer::GetSeconds(Ctx.Lease.GetPreciseExpireTs()) * 1e3);
             readyActivationCount++;
@@ -394,6 +403,10 @@ namespace NActors {
                     Ctx.UpdateThreadTime();
                 }
 
+                if (isSharedThread && (ui64)hpnow > Ctx.SoftDeadlineTs) {
+                    needToStop = true;
+                }
+
                 if (!TlsThreadContext->IsEnoughCpu) {
                     Ctx.IncreaseNotEnoughCpuExecutions();
                     TlsThreadContext->IsEnoughCpu = true;
@@ -404,14 +417,14 @@ namespace NActors {
             Ctx.Orbit.Reset();
         };
 
-        for (;;) {
+        while (!needToStop) {
             if (TlsThreadContext->CapturedType == ESendingType::Tail) {
                 TlsThreadContext->CapturedType = ESendingType::Lazy;
                 ui32 activation = std::exchange(TlsThreadContext->CapturedActivation, 0);
                 executeActivation.operator()<true>(activation);
                 continue;
             }
-            Ctx.HasCapturedMessageBox = TlsThreadContext->CapturedActivation;
+            Ctx.IsNeededToWaitNextActivation = !TlsThreadContext->CapturedActivation && !isSharedThread;
             ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter);
             if (!activation) {
                 activation = std::exchange(TlsThreadContext->CapturedActivation, 0);
@@ -424,6 +437,64 @@ namespace NActors {
             }
             executeActivation.operator()<false>(activation);
         }
+    }
+
+    void* TExecutorThread::ThreadProc() {
+#ifdef _linux_
+        pid_t tid = syscall(SYS_gettid);
+        AtomicSet(ThreadId, (ui64)tid);
+#endif
+
+#ifdef BALLOC
+        ThreadDisableBalloc();
+#endif
+
+        if (ThreadName) {
+            ::SetCurrentThreadName(ThreadName);
+        }
+
+
+        std::vector<TExecutorPoolBaseMailboxed*> pools;
+        pools.reserve(AvailableExecutorPools.size());
+        for (auto pool : AvailableExecutorPools) {
+            TExecutorPoolBaseMailboxed* mailboxedPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(pool);
+            if (mailboxedPool) {
+                pools.push_back(mailboxedPool);
+            }
+        }
+
+        if (pools.size() == 1) {
+            ExecutorPool = pools[0];
+            Ctx.Switch(
+                pools[0],
+                pools[0]->MailboxTable.Get(),
+                NHPTimer::GetClockRate() * TimePerMailbox.SecondsFloat(),
+                EventsPerMailbox,
+                GetCycleCountFast() + SoftProcessingDurationTs,
+                &Ctx.WorkerStats);
+        }
+
+        if (pools.size() <= 1) {
+            ProcessExecutorPool(ExecutorPool, false);
+        } else {
+            while (true) {
+                for (auto pool : pools) {
+                    Ctx.Switch(
+                        pool,
+                        pool->MailboxTable.Get(),
+                        NHPTimer::GetClockRate() * TimePerMailbox.SecondsFloat(),
+                        EventsPerMailbox,
+                        GetCycleCountFast() + SoftProcessingDurationTs,
+                        &Ctx.WorkerStats);
+                    Ctx.WorkerId = -1;
+                    ProcessExecutorPool(pool, true);
+                    if (RelaxedLoad(&StopFlag)) {
+                        return nullptr;
+                    }
+                }
+            }
+        }
+
         return nullptr;
     }
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 1e78a34c2d..a5aa3ed8a7 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -40,6 +40,14 @@ namespace NActors {
             : TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox)
         {}
 
+        TExecutorThread(TWorkerId workerId,
+                        TActorSystem* actorSystem,
+                        TVector<IExecutorPool*> executorPools,
+                        const TString& threadName,
+                        ui64 softProcessingDurationTs,
+                        TDuration timePerMailbox,
+                        ui32 eventsPerMailbox);
+
         virtual ~TExecutorThread();
 
         template <ESendingType SendingType = ESendingType::Common>
@@ -66,15 +74,19 @@ namespace NActors {
 
     private:
         void* ThreadProc();
 
+        void ProcessExecutorPool(IExecutorPool *pool, bool isSharedThread);
+
         template <typename TMailbox, bool IsTailExecution = false>
         bool Execute(TMailbox* mailbox, ui32 hint);
 
     public:
         TActorSystem* const ActorSystem;
+        TAtomic StopFlag = false;
 
     private:
         // Pool-specific
-        IExecutorPool* const ExecutorPool;
+        IExecutorPool* ExecutorPool;
+        TVector<IExecutorPool*> AvailableExecutorPools;
 
         // Event-specific (currently executing)
         TVector<THolder<IActor>> DyingActors;
@@ -88,6 +100,10 @@ namespace NActors {
         const TString ThreadName;
         volatile TThreadId ThreadId = UnknownThreadId;
         bool IsUnitedWorker = false;
+
+        TDuration TimePerMailbox;
+        ui32 EventsPerMailbox;
+        ui64 SoftProcessingDurationTs;
     };
 
     template <typename TMailbox>
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index 7e350ac2cb..1a67cf1f24 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -190,7 +190,6 @@ struct TPoolInfo {
     TAtomic MaxBookedCpu = 0;
     TAtomic MinBookedCpu = 0;
 
-    bool IsBeingStopped(i16 threadIdx);
     double GetBooked(i16 threadIdx);
     double GetlastSecondPoolBooked(i16 threadIdx);
     double GetConsumed(i16 threadIdx);
@@ -201,10 +200,6 @@ struct TPoolInfo {
     bool IsAvgPingGood();
 };
 
-bool TPoolInfo::IsBeingStopped(i16 threadIdx) {
-    return Pool->IsThreadBeingStopped(threadIdx);
-}
-
 double TPoolInfo::GetBooked(i16 threadIdx) {
     if ((size_t)threadIdx < ThreadInfo.size()) {
         return ThreadInfo[threadIdx].Booked.GetAvgPart();
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index 1ad78e2207..7ee9394cd9 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -18,7 +18,7 @@ namespace NActors {
 
     struct TWorkerContext {
-        const TWorkerId WorkerId;
+        TWorkerId WorkerId;
         const TCpuId CpuId;
         TLease Lease;
         IExecutorPool* Executor = nullptr;
@@ -30,7 +30,7 @@ namespace NActors {
         TExecutorThreadStats WorkerStats;
         TPoolId PoolId = MaxPools;
         mutable NLWTrace::TOrbit Orbit;
-        bool HasCapturedMessageBox = false;
+        bool IsNeededToWaitNextActivation = true;
         i64 HPStart = 0;
         ui32 ExecutedEvents = 0;
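A note on the semaphore juggling above: the pool keeps its state (pending activations plus the count of threads that volunteered to sleep) packed in a single atomic, and the new `AskToGoToSleep()` lets a thread park only after it publishes the incremented sleeper count with a successful compare-and-swap, retrying from the freshly observed value otherwise. The toy below shows that protocol in isolation; the two-field packing is invented for the example and is not the layout of the real `TSemaphore` in `executor_pool_basic.h`.

```cpp
// Minimal illustration of the AskToGoToSleep() CAS pattern: pool state lives
// in one 64-bit atomic, and a thread may sleep only if it atomically
// publishes the incremented sleeper count against an unchanged snapshot.
#include <atomic>
#include <cstdint>
#include <cstdio>

struct Packed {
    int32_t Pending;  // activations waiting to be picked up
    int32_t Sleepers; // threads that promised to go to sleep

    static Packed Unpack(uint64_t x) {
        return Packed{int32_t(x & 0xffffffffu), int32_t(x >> 32)};
    }
    uint64_t Pack() const {
        return (uint64_t(uint32_t(Sleepers)) << 32) | uint32_t(Pending);
    }
};

std::atomic<uint64_t> Semaphore{0};

// Returns true if the caller should park (the needToWait outcome).
bool AskToGoToSleep() {
    uint64_t x = Semaphore.load();
    for (;;) {
        Packed s = Packed::Unpack(x);
        if (s.Pending > 0)
            return false; // work exists: keep running
        s.Sleepers++;
        // On failure compare_exchange_weak reloads x with the current value,
        // so the loop retries exactly like the AtomicGetAndCas loop above.
        if (Semaphore.compare_exchange_weak(x, s.Pack()))
            return true;
    }
}

int main() {
    std::printf("empty pool, should sleep: %d\n", int(AskToGoToSleep()));
    Semaphore.store(Packed{5, 0}.Pack());
    std::printf("pending work, should sleep: %d\n", int(AskToGoToSleep()));
    return 0;
}
```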