author      kruall <kruall@ydb.tech>    2023-07-26 14:09:12 +0300
committer   kruall <kruall@ydb.tech>    2023-07-26 14:09:12 +0300
commit      ba9a310064ff9f159b0b96a5f8b1a606c3e0e595 (patch)
tree        31c7a74f253f918edd3e99aae0e094656cef90ca
parent      b10d9c13aca0559db5483430ebb41823e79f9d6d (diff)
download    ydb-ba9a310064ff9f159b0b96a5f8b1a606c3e0e595.tar.gz
Add several pools for one thread, KIKIMR-18440
-rw-r--r--  library/cpp/actors/core/actor_ut.cpp  88
-rw-r--r--  library/cpp/actors/core/actorsystem.h  4
-rw-r--r--  library/cpp/actors/core/config.h  2
-rw-r--r--  library/cpp/actors/core/cpu_manager.cpp  11
-rw-r--r--  library/cpp/actors/core/cpu_manager.h  2
-rw-r--r--  library/cpp/actors/core/defs.h  2
-rw-r--r--  library/cpp/actors/core/executor_pool.h  9
-rw-r--r--  library/cpp/actors/core/executor_pool_base.h  2
-rw-r--r--  library/cpp/actors/core/executor_pool_basic.cpp  220
-rw-r--r--  library/cpp/actors/core/executor_pool_basic.h  20
-rw-r--r--  library/cpp/actors/core/executor_pool_io.cpp  12
-rw-r--r--  library/cpp/actors/core/executor_pool_united.cpp  2
-rw-r--r--  library/cpp/actors/core/executor_pool_united_workers.h  2
-rw-r--r--  library/cpp/actors/core/executor_thread.cpp  101
-rw-r--r--  library/cpp/actors/core/executor_thread.h  18
-rw-r--r--  library/cpp/actors/core/harmonizer.cpp  5
-rw-r--r--  library/cpp/actors/core/worker_context.h  4
17 files changed, 350 insertions(+), 154 deletions(-)
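
In short, the patch lets a basic executor pool hand over some of its threads as "shared executors": one OS thread serves several basic pools in round-robin order, giving each pool a bounded slice of work (SoftProcessingDurationTs) before moving on, and never parking inside a single pool so the other pools cannot starve. The code below is only a minimal sketch of that scheduling idea, not the patch itself: TPoolStub, PopActivation, SharedWorkerLoop and softSlice are invented stand-ins, while the real logic lives in TExecutorThread::ThreadProc / ProcessExecutorPool and TBasicExecutorPool::GetReadyActivation further down in the diff.

    #include <atomic>
    #include <chrono>
    #include <cstdint>
    #include <vector>

    // Stand-in for a pool's activation queue; in the real code this role is
    // played by IExecutorPool::GetReadyActivation over the Activations cache.
    struct TPoolStub {
        std::atomic<uint32_t> Pending{0};

        // Returns a non-zero "activation" if one was claimed, 0 if the pool is idle.
        uint32_t PopActivation() {
            uint32_t cur = Pending.load();
            while (cur > 0 && !Pending.compare_exchange_weak(cur, cur - 1)) {
            }
            return cur;
        }
    };

    // Simplified shared-worker loop: one OS thread serves several pools in
    // round-robin order, draining each pool until it is empty or its soft
    // time slice (the analogue of SoftProcessingDurationTs) expires.
    void SharedWorkerLoop(const std::vector<TPoolStub*>& pools,
                          std::chrono::microseconds softSlice,
                          const std::atomic<bool>& stopFlag) {
        using TClock = std::chrono::steady_clock;
        while (!stopFlag.load(std::memory_order_relaxed)) {
            for (TPoolStub* pool : pools) {
                const auto deadline = TClock::now() + softSlice;
                while (uint32_t activation = pool->PopActivation()) {
                    (void)activation; // a real worker executes the mailbox here
                    if (TClock::now() > deadline) {
                        break; // soft deadline hit: give the next pool a turn
                    }
                }
                if (stopFlag.load(std::memory_order_relaxed)) {
                    return;
                }
            }
        }
    }

Compared with a dedicated worker, the key constraint is that a shared thread must not block waiting for work inside one pool; this is what WorkerId = -1 and IsNeededToWaitNextActivation = false are for in the patch, so that GetReadyActivation returns 0 when a pool is idle and ThreadProc can switch to the next pool.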
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index 6901cc298d..0d0721fae3 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -135,13 +135,15 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
ui32 MailboxNeighboursCount;
};
- void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent) {
+ void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent, i16 sharedExecutorsCount) {
TBasicExecutorPoolConfig basic;
basic.PoolId = setup->GetExecutorsCount();
basic.PoolName = TStringBuilder() << "b" << basic.PoolId;
basic.Threads = threads;
basic.SpinThreshold = DefaultSpinThreshold;
basic.TimePerMailbox = TDuration::Hours(1);
+ basic.SharedExecutorsCount = sharedExecutorsCount;
+ basic.SoftProcessingDurationTs = Us2Ts(100);
if (activateEveryEvent) {
basic.EventsPerMailbox = 1;
} else {
@@ -192,7 +194,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
if (poolType == EPoolType::Basic) {
THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false);
for (ui32 i = 0; i < poolsCount; ++i) {
- AddBasicPool(setup, threads, activateEveryEvent);
+ AddBasicPool(setup, threads, activateEveryEvent, 0);
}
return setup;
} else if (poolType == EPoolType::United) {
@@ -339,6 +341,88 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
TMailboxType::TinyReadAsFilled
};
+ Y_UNIT_TEST(WithSharedExecutors) {
+ THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false);
+ AddBasicPool(setup, 2, 1, 0);
+ AddBasicPool(setup, 2, 1, 1);
+
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ TThreadParkPad pad;
+ TAtomic actorsAlive = 0;
+ THPTimer Timer;
+
+ Timer.Reset();
+ for (ui32 i = 0; i < 50; ++i) {
+ ui32 followerPoolId = 0;
+ ui32 leaderPoolId = 0;
+ TActorId followerId = actorSystem.Register(
+ new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
+ THolder<IActor> leader{
+ new TTestEndDecorator(THolder(
+ new TSendReceiveActor(nullptr, followerId, true, ERole::Leader, ESendingType::Common)), &pad, &actorsAlive)};
+ actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+ }
+ for (ui32 i = 0; i < 10; ++i) {
+ ui32 followerPoolId = 1;
+ ui32 leaderPoolId = 1;
+ TActorId followerId = actorSystem.Register(
+ new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
+ THolder<IActor> leader{
+ new TTestEndDecorator(THolder(
+ new TSendReceiveActor(nullptr, followerId, true, ERole::Leader, ESendingType::Common)), &pad, &actorsAlive)};
+ actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+ }
+
+ pad.Park();
+ auto elapsedTime = Timer.Passed() / TotalEventsAmount;
+ actorSystem.Stop();
+
+ Cerr << "Completed " << 1e9 * elapsedTime << Endl;
+ }
+
+ Y_UNIT_TEST(WithoutSharedExecutors) {
+ THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false);
+ AddBasicPool(setup, 2, 1, 0);
+ AddBasicPool(setup, 2, 1, 0);
+
+ TActorSystem actorSystem(setup);
+ actorSystem.Start();
+
+ TThreadParkPad pad;
+ TAtomic actorsAlive = 0;
+ THPTimer Timer;
+
+ Timer.Reset();
+ for (ui32 i = 0; i < 50; ++i) {
+ ui32 followerPoolId = 0;
+ ui32 leaderPoolId = 0;
+ TActorId followerId = actorSystem.Register(
+ new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
+ THolder<IActor> leader{
+ new TTestEndDecorator(THolder(
+ new TSendReceiveActor(nullptr, followerId, true, ERole::Leader, ESendingType::Common)), &pad, &actorsAlive)};
+ actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+ }
+ for (ui32 i = 0; i < 10; ++i) {
+ ui32 followerPoolId = 1;
+ ui32 leaderPoolId = 1;
+ TActorId followerId = actorSystem.Register(
+ new TSendReceiveActor(nullptr, {}, true, ERole::Follower, ESendingType::Common), TMailboxType::HTSwap, followerPoolId);
+ THolder<IActor> leader{
+ new TTestEndDecorator(THolder(
+ new TSendReceiveActor(nullptr, followerId, true, ERole::Leader, ESendingType::Common)), &pad, &actorsAlive)};
+ actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+ }
+
+ pad.Park();
+ auto elapsedTime = Timer.Passed() / TotalEventsAmount;
+ actorSystem.Stop();
+
+ Cerr << "Completed " << 1e9 * elapsedTime << Endl;
+ }
+
Y_UNIT_TEST(SendReceive1Pool1ThreadAlloc) {
for (const auto& mType : MailboxTypes) {
auto stats = CountStats([mType] {
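
For a concrete configuration, the benchmark's AddBasicPool above boils down to the following condensed snippet (values taken from the WithSharedExecutors test, where the second pool turns one of its two threads into a shared executor; the final registration line is an assumption about the part of AddBasicPool not shown in the hunk above):

    TBasicExecutorPoolConfig basic;
    basic.PoolId = setup->GetExecutorsCount();
    basic.PoolName = TStringBuilder() << "b" << basic.PoolId;
    basic.Threads = 2;
    basic.SpinThreshold = DefaultSpinThreshold;
    basic.TimePerMailbox = TDuration::Hours(1);
    basic.EventsPerMailbox = 1;                   // activateEveryEvent == true in the test
    basic.SharedExecutorsCount = 1;               // reserve one of the two threads as a shared executor
    basic.SoftProcessingDurationTs = Us2Ts(100);  // soft slice a shared thread spends per pool
    // Assumption: the test then stores this config in the setup's CpuManager config
    // (that part of AddBasicPool is outside the visible hunk).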
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 821b94ada9..64a8e827d2 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -304,5 +304,9 @@ namespace NActors {
DeferredPreStop.push_back(std::move(fn));
}
+ TVector<IExecutorPool*> GetBasicExecutorPools() const {
+ return CpuManager->GetBasicExecutorPools();
+ }
+
};
}
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 220cb3d9d1..ab35998241 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -44,6 +44,8 @@ namespace NActors {
i16 MaxThreadCount = 0;
i16 DefaultThreadCount = 0;
i16 Priority = 0;
+ i16 SharedExecutorsCount = 0;
+ i16 SoftProcessingDurationTs = 0;
};
struct TIOExecutorPoolConfig {
diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp
index 4aabb014dd..3ec6cea5a3 100644
--- a/library/cpp/actors/core/cpu_manager.cpp
+++ b/library/cpp/actors/core/cpu_manager.cpp
@@ -134,4 +134,15 @@ namespace NActors {
}
Y_FAIL("missing PoolId: %d", int(poolId));
}
+
+ TVector<IExecutorPool*> TCpuManager::GetBasicExecutorPools() const {
+ TVector<IExecutorPool*> pools;
+ for (ui32 idx = 0; idx < ExecutorPoolCount; ++idx) {
+ if (auto basicPool = dynamic_cast<TBasicExecutorPool*>(Executors[idx].Get()); basicPool != nullptr) {
+ pools.push_back(basicPool);
+ }
+ }
+ return pools;
+ }
+
}
diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h
index c3a7588639..26ba97aa39 100644
--- a/library/cpp/actors/core/cpu_manager.h
+++ b/library/cpp/actors/core/cpu_manager.h
@@ -26,6 +26,8 @@ namespace NActors {
void Shutdown();
void Cleanup();
+ TVector<IExecutorPool*> GetBasicExecutorPools() const;
+
ui32 GetExecutorsCount() const {
return ExecutorPoolCount;
}
diff --git a/library/cpp/actors/core/defs.h b/library/cpp/actors/core/defs.h
index 779473efc9..276ebef219 100644
--- a/library/cpp/actors/core/defs.h
+++ b/library/cpp/actors/core/defs.h
@@ -30,7 +30,7 @@ namespace NActors {
static constexpr TPoolWeight DefPoolWeight = 32;
static constexpr TPoolWeight MaxPoolWeight = 1024;
- using TWorkerId = ui16;
+ using TWorkerId = i16;
static constexpr TWorkerId WorkerBits = 11;
static constexpr TWorkerId MaxWorkers = 1 << WorkerBits;
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index 5adcf4d0d5..ae251c14bb 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -113,11 +113,11 @@ namespace NActors {
virtual void SetRealTimeMode() const {}
- virtual ui32 GetThreadCount() const {
+ virtual i16 GetThreadCount() const {
return 1;
}
- virtual void SetThreadCount(ui32 threads) {
+ virtual void SetThreadCount(i16 threads) {
Y_UNUSED(threads);
}
@@ -138,11 +138,6 @@ namespace NActors {
}
- virtual bool IsThreadBeingStopped(i16 threadIdx) const {
- Y_UNUSED(threadIdx);
- return false;
- }
-
virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
Y_UNUSED(threadIdx);
return TCpuConsumption{0.0, 0.0};
diff --git a/library/cpp/actors/core/executor_pool_base.h b/library/cpp/actors/core/executor_pool_base.h
index 8a5214c92c..60714f8441 100644
--- a/library/cpp/actors/core/executor_pool_base.h
+++ b/library/cpp/actors/core/executor_pool_base.h
@@ -41,7 +41,7 @@ namespace NActors {
class TExecutorPoolBase: public TExecutorPoolBaseMailboxed {
protected:
- const ui32 PoolThreads;
+ const i16 PoolThreads;
TIntrusivePtr<TAffinity> ThreadsAffinity;
TAtomic Semaphore = 0;
TUnorderedCache<ui32, 512, 4> Activations;
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 6f0d785ff5..6bf21252f5 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -86,26 +86,15 @@ namespace NActors {
cfg.DefaultThreadCount,
cfg.Priority
)
- {}
+ {
+ SetSharedExecutorsCount(cfg.SharedExecutorsCount);
+ SoftProcessingDurationTs = cfg.SoftProcessingDurationTs;
+ }
TBasicExecutorPool::~TBasicExecutorPool() {
Threads.Destroy();
}
- bool TBasicExecutorPool::GoToBeBlocked(TThreadCtx& threadCtx, TTimers &timers) {
- do {
- if (AtomicCas(&threadCtx.BlockedFlag, TThreadCtx::BS_BLOCKED, TThreadCtx::BS_BLOCKING)) {
- timers.HPNow = GetCycleCountFast();
- timers.Elapsed += timers.HPNow - timers.HPStart;
- if (threadCtx.BlockedPad.Park()) // interrupted
- return true;
- timers.HPStart = GetCycleCountFast();
- timers.Blocked += timers.HPStart - timers.HPNow;
- }
- } while (AtomicGet(threadCtx.BlockedFlag) != TThreadCtx::BS_NONE && !RelaxedLoad(&StopFlag));
- return false;
- }
-
bool TBasicExecutorPool::GoToSleep(TThreadCtx& threadCtx, TTimers &timers) {
do {
timers.HPNow = GetCycleCountFast();
@@ -162,7 +151,8 @@ namespace NActors {
}
#endif
- Y_VERIFY(AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
+ TAtomic state = AtomicLoad(&threadCtx.WaitingFlag);
+ Y_VERIFY(state == TThreadCtx::WS_NONE, "WaitingFlag# %d", int(state));
if (SpinThreshold > 0 && !needToBlock) {
// spin configured period
@@ -183,7 +173,7 @@ namespace NActors {
}
}
- Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_RUNNING);
+ Y_VERIFY_DEBUG(AtomicLoad(&StopFlag) || AtomicLoad(&threadCtx.WaitingFlag) == TThreadCtx::WS_NONE);
#if defined ACTORSLIB_COLLECT_EXEC_STATS
if (AtomicDecrement(ThreadUtilization) == 0) {
@@ -207,8 +197,41 @@ namespace NActors {
return false;
}
+ void TBasicExecutorPool::AskToGoToSleep(bool *needToWait, bool *needToBlock) {
+ TAtomic x = AtomicGet(Semaphore);
+ do {
+ i64 oldX = x;
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);;
+ if (semaphore.CurrentSleepThreadCount < 0) {
+ semaphore.CurrentSleepThreadCount++;
+ x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
+ if (x == oldX) {
+ *needToWait = true;
+ *needToBlock = true;
+ return;
+ }
+ continue;
+ }
+
+ if (semaphore.OldSemaphore == 0) {
+ semaphore.CurrentSleepThreadCount++;
+ x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
+ if (x == oldX) {
+ *needToWait = true;
+ *needToBlock = false;
+ return;
+ }
+ continue;
+ }
+
+ *needToWait = false;
+ *needToBlock = false;
+ return;
+ } while (true);
+ }
+
ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
- ui32 workerId = wctx.WorkerId;
+ TWorkerId workerId = wctx.WorkerId;
Y_VERIFY_DEBUG(workerId < PoolThreads);
TTimers timers;
@@ -218,62 +241,53 @@ namespace NActors {
Harmonizer->Harmonize(timers.HPStart);
}
- TThreadCtx& threadCtx = Threads[workerId];
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);
-
- bool needToWait = false;
- bool needToBlock = false;
+ if (workerId >= 0) {
+ AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_NONE);
+ }
TAtomic x = AtomicGet(Semaphore);
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
do {
- i64 oldX = x;
- TSemaphore semaphore = TSemaphore::GetSemaphore(x);
- needToBlock = semaphore.CurrentSleepThreadCount < 0;
- needToWait = needToBlock || semaphore.OldSemaphore <= -semaphore.CurrentSleepThreadCount;
-
- if (needToWait && wctx.HasCapturedMessageBox) {
- timers.HPNow = GetCycleCountFast();
- wctx.AddElapsedCycles(ActorSystemIndex, timers.HPNow - timers.HPStart);
- return 0;
- }
-
- semaphore.OldSemaphore--;
- if (needToWait) {
- semaphore.CurrentSleepThreadCount++;
- }
-
- x = AtomicGetAndCas(&Semaphore, semaphore.ConverToI64(), x);
- if (x == oldX) {
- break;
- }
- } while (!RelaxedLoad(&StopFlag));
-
- if (needToWait) {
- if (GoToWaiting(threadCtx, timers, needToBlock)) { // interrupted
- return 0;
- }
- } else {
- AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING);
- }
+ if (!semaphore.OldSemaphore || semaphore.CurrentSleepThreadCount < 0) {
+ if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
+ timers.HPNow = GetCycleCountFast();
+ wctx.AddElapsedCycles(ActorSystemIndex, timers.HPNow - timers.HPStart);
+ return 0;
+ }
- // ok, has work suggested, must dequeue
- while (!RelaxedLoad(&StopFlag)) {
- if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
- timers.HPNow = GetCycleCountFast();
- timers.Elapsed += timers.HPNow - timers.HPStart;
- wctx.AddElapsedCycles(ActorSystemIndex, timers.Elapsed);
- if (timers.Parked > 0) {
- wctx.AddParkedCycles(timers.Parked);
+ bool needToWait = false;
+ bool needToBlock = false;
+ AskToGoToSleep(&needToWait, &needToBlock);
+ if (needToWait) {
+ if (GoToWaiting(Threads[workerId], timers, needToBlock)) { // interrupted
+ return 0;
+ }
}
- if (timers.Blocked > 0) {
- wctx.AddBlockedCycles(timers.Blocked);
+ } else {
+ if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
+ if (workerId >= 0) {
+ AtomicSet(Threads[workerId].WaitingFlag, TThreadCtx::WS_RUNNING);
+ }
+ AtomicDecrement(Semaphore);
+ timers.HPNow = GetCycleCountFast();
+ timers.Elapsed += timers.HPNow - timers.HPStart;
+ wctx.AddElapsedCycles(ActorSystemIndex, timers.Elapsed);
+ if (timers.Parked > 0) {
+ wctx.AddParkedCycles(timers.Parked);
+ }
+ if (timers.Blocked > 0) {
+ wctx.AddBlockedCycles(timers.Blocked);
+ }
+ return activation;
}
- return activation;
+ semaphore.CurrentSleepThreadCount++;
}
+
SpinLockPause();
- }
+ x = AtomicGet(Semaphore);
+ semaphore = TSemaphore::GetSemaphore(x);
+ } while (!RelaxedLoad(&StopFlag));
- // stopping, die!
return 0;
}
@@ -284,13 +298,13 @@ namespace NActors {
switch (state) {
case TThreadCtx::WS_NONE:
case TThreadCtx::WS_RUNNING:
- if (++i >= MaxThreadCount) {
+ if (++i >= MaxThreadCount - SharedExecutorsCount) {
i = 0;
}
break;
case TThreadCtx::WS_ACTIVE:
case TThreadCtx::WS_BLOCKED:
- if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_RUNNING, state)) {
+ if (AtomicCas(&threadCtx.WaitingFlag, TThreadCtx::WS_NONE, state)) {
if (state == TThreadCtx::WS_BLOCKED) {
threadCtx.Pad.Unpark();
}
@@ -313,7 +327,7 @@ namespace NActors {
TAtomic x = AtomicGet(Semaphore);
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
do {
- needToWakeUp = semaphore.CurrentSleepThreadCount > 0;
+ needToWakeUp = semaphore.CurrentSleepThreadCount > SharedExecutorsCount;
i64 oldX = semaphore.ConverToI64();
semaphore.OldSemaphore++;
if (needToWakeUp) {
@@ -362,7 +376,7 @@ namespace NActors {
RecalculateStuckActors(statsCopy[0]);
#endif
// Per-thread stats
- for (size_t i = 0; i < PoolThreads; ++i) {
+ for (i16 i = 0; i < PoolThreads; ++i) {
Threads[i].Thread->GetCurrentStats(statsCopy[i + 1]);
}
}
@@ -375,17 +389,29 @@ namespace NActors {
ScheduleReaders.Reset(new NSchedulerQueue::TReader[PoolThreads]);
ScheduleWriters.Reset(new NSchedulerQueue::TWriter[PoolThreads]);
- for (ui32 i = 0; i != PoolThreads; ++i) {
- Threads[i].Thread.Reset(
- new TExecutorThread(
- i,
- 0, // CpuId is not used in BASIC pool
- actorSystem,
- this,
- MailboxTable.Get(),
- PoolName,
- TimePerMailbox,
- EventsPerMailbox));
+ for (i16 i = 0; i != PoolThreads; ++i) {
+ if (i < MaxThreadCount - SharedExecutorsCount) {
+ Threads[i].Thread.Reset(
+ new TExecutorThread(
+ i,
+ 0, // CpuId is not used in BASIC pool
+ actorSystem,
+ this,
+ MailboxTable.Get(),
+ PoolName,
+ TimePerMailbox,
+ EventsPerMailbox));
+ } else {
+ Threads[i].Thread.Reset(
+ new TExecutorThread(
+ i,
+ actorSystem,
+ actorSystem->GetBasicExecutorPools(),
+ PoolName,
+ SoftProcessingDurationTs,
+ TimePerMailbox,
+ EventsPerMailbox));
+ }
ScheduleWriters[i].Init(ScheduleReaders[i]);
}
@@ -399,21 +425,22 @@ namespace NActors {
ThreadUtilization = 0;
AtomicAdd(MaxUtilizationCounter, -(i64)GetCycleCountFast());
- for (ui32 i = 0; i != PoolThreads; ++i) {
+ for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Thread->Start();
}
}
void TBasicExecutorPool::PrepareStop() {
AtomicStore(&StopFlag, true);
- for (ui32 i = 0; i != PoolThreads; ++i) {
+ for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Pad.Interrupt();
Threads[i].BlockedPad.Interrupt();
+ AtomicStore(&Threads[i].Thread->StopFlag, TAtomic(true));
}
}
void TBasicExecutorPool::Shutdown() {
- for (ui32 i = 0; i != PoolThreads; ++i)
+ for (i16 i = 0; i != PoolThreads; ++i)
Threads[i].Thread->Join();
}
@@ -456,24 +483,22 @@ namespace NActors {
#endif
}
- ui32 TBasicExecutorPool::GetThreadCount() const {
+ i16 TBasicExecutorPool::GetThreadCount() const {
return AtomicGet(ThreadCount);
}
- void TBasicExecutorPool::SetThreadCount(ui32 threads) {
- threads = Max(1u, Min(PoolThreads, threads));
+ void TBasicExecutorPool::SetThreadCount(i16 threads) {
+ threads = Max(i16(1), Min(PoolThreads, threads));
with_lock (ChangeThreadsLock) {
- size_t prevCount = GetThreadCount();
+ i16 prevCount = GetThreadCount();
AtomicSet(ThreadCount, threads);
TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore));
i64 oldX = semaphore.ConverToI64();
semaphore.CurrentThreadCount = threads;
if (threads > prevCount) {
semaphore.CurrentSleepThreadCount += (i64)threads - prevCount;
- semaphore.OldSemaphore -= (i64)threads - prevCount;
} else {
semaphore.CurrentSleepThreadCount -= (i64)prevCount - threads;
- semaphore.OldSemaphore += prevCount - threads;
}
AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX);
LWPROBE(ThreadCount, PoolId, PoolName, threads, MinThreadCount, MaxThreadCount, DefaultThreadCount);
@@ -492,19 +517,8 @@ namespace NActors {
return MaxThreadCount;
}
- bool TBasicExecutorPool::IsThreadBeingStopped(i16 threadIdx) const {
- if ((ui32)threadIdx >= PoolThreads) {
- return false;
- }
- auto blockedFlag = AtomicGet(Threads[threadIdx].BlockedFlag);
- if (blockedFlag == TThreadCtx::BS_BLOCKING) {
- return true;
- }
- return false;
- }
-
TCpuConsumption TBasicExecutorPool::GetThreadCpuConsumption(i16 threadIdx) {
- if ((ui32)threadIdx >= PoolThreads) {
+ if (threadIdx >= PoolThreads) {
return {0.0, 0.0};
}
TThreadCtx& threadCtx = Threads[threadIdx];
@@ -522,4 +536,8 @@ namespace NActors {
i16 TBasicExecutorPool::GetPriority() const {
return Priority;
}
+
+ void TBasicExecutorPool::SetSharedExecutorsCount(i16 count) {
+ SharedExecutorsCount = count;
+ }
}
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 2a727fafd5..8adb5773e2 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -23,7 +23,7 @@ namespace NActors {
// different threads must spin/block on different cache-lines.
// we add some padding bytes to enforce this rule
- static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + 2 * sizeof(TThreadParkPad) + 2 * sizeof(TAtomic);
+ static const size_t SizeWithoutPadding = sizeof(TAutoPtr<TExecutorThread>) + 2 * sizeof(TThreadParkPad) + sizeof(TAtomic);
ui8 Padding[64 - SizeWithoutPadding];
static_assert(64 >= SizeWithoutPadding);
@@ -34,15 +34,8 @@ namespace NActors {
WS_RUNNING
};
- enum EBlockedState {
- BS_NONE,
- BS_BLOCKING,
- BS_BLOCKED
- };
-
TThreadCtx()
: WaitingFlag(WS_NONE)
- , BlockedFlag(BS_NONE)
{
}
};
@@ -81,6 +74,8 @@ namespace NActors {
i16 MaxThreadCount;
i16 DefaultThreadCount;
IHarmonizer *Harmonizer;
+ i16 SharedExecutorsCount = 0;
+ ui64 SoftProcessingDurationTs = 0;
const i16 Priority = 0;
const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
@@ -128,6 +123,8 @@ namespace NActors {
explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer);
~TBasicExecutorPool();
+ void SetSharedExecutorsCount(i16 count);
+
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
@@ -148,17 +145,18 @@ namespace NActors {
void SetRealTimeMode() const override;
- ui32 GetThreadCount() const override;
- void SetThreadCount(ui32 threads) override;
+ i16 GetThreadCount() const override;
+ void SetThreadCount(i16 threads) override;
i16 GetDefaultThreadCount() const override;
i16 GetMinThreadCount() const override;
i16 GetMaxThreadCount() const override;
- bool IsThreadBeingStopped(i16 threadIdx) const override;
TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override;
i16 GetBlockingThreadCount() const override;
i16 GetPriority() const override;
private:
+ void AskToGoToSleep(bool *needToWait, bool *needToBlock);
+
void WakeUpLoop(i16 currentThreadCount);
bool GoToWaiting(TThreadCtx& threadCtx, TTimers &timers, bool needToBlock);
void GoToSpin(TThreadCtx& threadCtx);
diff --git a/library/cpp/actors/core/executor_pool_io.cpp b/library/cpp/actors/core/executor_pool_io.cpp
index 80b927c486..539b6348b2 100644
--- a/library/cpp/actors/core/executor_pool_io.cpp
+++ b/library/cpp/actors/core/executor_pool_io.cpp
@@ -27,7 +27,7 @@ namespace NActors {
}
ui32 TIOExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
- ui32 workerId = wctx.WorkerId;
+ i16 workerId = wctx.WorkerId;
Y_VERIFY_DEBUG(workerId < PoolThreads);
NHPTimer::STime elapsed = 0;
@@ -108,7 +108,7 @@ namespace NActors {
ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());
- for (ui32 i = 0; i != PoolThreads; ++i) {
+ for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), PoolName));
}
@@ -119,18 +119,18 @@ namespace NActors {
void TIOExecutorPool::Start() {
TAffinityGuard affinityGuard(Affinity());
- for (ui32 i = 0; i != PoolThreads; ++i)
+ for (i16 i = 0; i != PoolThreads; ++i)
Threads[i].Thread->Start();
}
void TIOExecutorPool::PrepareStop() {
AtomicStore(&StopFlag, true);
- for (ui32 i = 0; i != PoolThreads; ++i)
+ for (i16 i = 0; i != PoolThreads; ++i)
Threads[i].Pad.Interrupt();
}
void TIOExecutorPool::Shutdown() {
- for (ui32 i = 0; i != PoolThreads; ++i)
+ for (i16 i = 0; i != PoolThreads; ++i)
Threads[i].Thread->Join();
}
@@ -140,7 +140,7 @@ namespace NActors {
statsCopy[0] = TExecutorThreadStats();
statsCopy[0].Aggregate(Stats);
// Per-thread stats
- for (size_t i = 0; i < PoolThreads; ++i) {
+ for (i16 i = 0; i < PoolThreads; ++i) {
Threads[i].Thread->GetCurrentStats(statsCopy[i + 1]);
}
}
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index eb4c8348d7..2e689dec59 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -1168,7 +1168,7 @@ namespace NActors {
// Setup cpu-local workers
if (cpu.LocalManager) {
- for (size_t i = 0; i < cpu.LocalManager->WorkerCount(); i++) {
+ for (i16 i = 0; i < cpu.LocalManager->WorkerCount(); i++) {
TWorkerId workerId = workers++;
cpu.LocalManager->AddWorker(workerId);
diff --git a/library/cpp/actors/core/executor_pool_united_workers.h b/library/cpp/actors/core/executor_pool_united_workers.h
index b088b582f6..c683ae7d9f 100644
--- a/library/cpp/actors/core/executor_pool_united_workers.h
+++ b/library/cpp/actors/core/executor_pool_united_workers.h
@@ -18,7 +18,7 @@ namespace NActors {
struct TPool;
struct TCpu;
- size_t WorkerCount;
+ i16 WorkerCount;
TArrayHolder<TWorker> Workers; // indexed by WorkerId
size_t PoolCount;
TArrayHolder<TPool> Pools; // indexed by PoolId, so may include not used (not united) pools
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 0b6045693b..e8fff6857c 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -53,6 +53,24 @@ namespace NActors {
&Ctx.WorkerStats);
}
+ TExecutorThread::TExecutorThread(TWorkerId workerId,
+ TActorSystem* actorSystem,
+ TVector<IExecutorPool*> executorPools,
+ const TString& threadName,
+ ui64 softProcessingDurationTs,
+ TDuration timePerMailbox,
+ ui32 eventsPerMailbox)
+ : ActorSystem(actorSystem)
+ , AvailableExecutorPools(executorPools)
+ , Ctx(workerId, 0)
+ , ThreadName(threadName)
+ , IsUnitedWorker(false)
+ , TimePerMailbox(timePerMailbox)
+ , EventsPerMailbox(eventsPerMailbox)
+ , SoftProcessingDurationTs(softProcessingDurationTs)
+ {}
+
+
TExecutorThread::~TExecutorThread()
{ }
@@ -325,19 +343,8 @@ namespace NActors {
return ThreadId;
}
- void* TExecutorThread::ThreadProc() {
-#ifdef _linux_
- pid_t tid = syscall(SYS_gettid);
- AtomicSet(ThreadId, (ui64)tid);
-#endif
-
-#ifdef BALLOC
- ThreadDisableBalloc();
-#endif
-
- if (ThreadName) {
- ::SetCurrentThreadName(ThreadName);
- }
+ void TExecutorThread::ProcessExecutorPool(IExecutorPool *pool, bool isSharedThread) {
+ ExecutorPool = pool;
TThreadContext threadCtx;
TlsThreadContext = &threadCtx;
TlsThreadContext->Pool = static_cast<IExecutorPool*>(ExecutorPool);
@@ -352,6 +359,8 @@ namespace NActors {
i64 execCycles = 0;
i64 nonExecCycles = 0;
+ bool needToStop = false;
+
auto executeActivation = [&]<bool IsTailExecution>(ui32 activation) {
LWTRACK(ActivationBegin, Ctx.Orbit, Ctx.CpuId, Ctx.PoolId, Ctx.WorkerId, NHPTimer::GetSeconds(Ctx.Lease.GetPreciseExpireTs()) * 1e3);
readyActivationCount++;
@@ -394,6 +403,10 @@ namespace NActors {
Ctx.UpdateThreadTime();
}
+ if (isSharedThread && (ui64)hpnow > Ctx.SoftDeadlineTs) {
+ needToStop = true;
+ }
+
if (!TlsThreadContext->IsEnoughCpu) {
Ctx.IncreaseNotEnoughCpuExecutions();
TlsThreadContext->IsEnoughCpu = true;
@@ -404,14 +417,14 @@ namespace NActors {
Ctx.Orbit.Reset();
};
- for (;;) {
+ while (!needToStop) {
if (TlsThreadContext->CapturedType == ESendingType::Tail) {
TlsThreadContext->CapturedType = ESendingType::Lazy;
ui32 activation = std::exchange(TlsThreadContext->CapturedActivation, 0);
executeActivation.operator()<true>(activation);
continue;
}
- Ctx.HasCapturedMessageBox = TlsThreadContext->CapturedActivation;
+ Ctx.IsNeededToWaitNextActivation = !TlsThreadContext->CapturedActivation && !isSharedThread;
ui32 activation = ExecutorPool->GetReadyActivation(Ctx, ++RevolvingReadCounter);
if (!activation) {
activation = std::exchange(TlsThreadContext->CapturedActivation, 0);
@@ -424,6 +437,64 @@ namespace NActors {
}
executeActivation.operator()<false>(activation);
}
+ }
+
+ void* TExecutorThread::ThreadProc() {
+#ifdef _linux_
+ pid_t tid = syscall(SYS_gettid);
+ AtomicSet(ThreadId, (ui64)tid);
+#endif
+
+#ifdef BALLOC
+ ThreadDisableBalloc();
+#endif
+
+ if (ThreadName) {
+ ::SetCurrentThreadName(ThreadName);
+ }
+
+
+ std::vector<TExecutorPoolBaseMailboxed*> pools;
+ pools.reserve(AvailableExecutorPools.size());
+ for (auto pool : AvailableExecutorPools) {
+ TExecutorPoolBaseMailboxed* mailboxedPool = dynamic_cast<TExecutorPoolBaseMailboxed*>(pool);
+ if (mailboxedPool) {
+ pools.push_back(mailboxedPool);
+ }
+ }
+
+ if (pools.size() == 1) {
+ ExecutorPool = pools[0];
+ Ctx.Switch(
+ pools[0],
+ pools[0]->MailboxTable.Get(),
+ NHPTimer::GetClockRate() * TimePerMailbox.SecondsFloat(),
+ EventsPerMailbox,
+ GetCycleCountFast() + SoftProcessingDurationTs,
+ &Ctx.WorkerStats);
+ }
+
+ if (pools.size() <= 1) {
+ ProcessExecutorPool(ExecutorPool, false);
+ } else {
+ while (true) {
+ for (auto pool : pools) {
+ Ctx.Switch(
+ pool,
+ pool->MailboxTable.Get(),
+ NHPTimer::GetClockRate() * TimePerMailbox.SecondsFloat(),
+ EventsPerMailbox,
+ GetCycleCountFast() + SoftProcessingDurationTs,
+ &Ctx.WorkerStats);
+ Ctx.WorkerId = -1;
+ ProcessExecutorPool(pool, true);
+ if (RelaxedLoad(&StopFlag)) {
+ return nullptr;
+ }
+ }
+ }
+ }
+
return nullptr;
}
diff --git a/library/cpp/actors/core/executor_thread.h b/library/cpp/actors/core/executor_thread.h
index 1e78a34c2d..a5aa3ed8a7 100644
--- a/library/cpp/actors/core/executor_thread.h
+++ b/library/cpp/actors/core/executor_thread.h
@@ -40,6 +40,14 @@ namespace NActors {
: TExecutorThread(workerId, 0, actorSystem, executorPool, mailboxTable, threadName, timePerMailbox, eventsPerMailbox)
{}
+ TExecutorThread(TWorkerId workerId,
+ TActorSystem* actorSystem,
+ TVector<IExecutorPool*> executorPools,
+ const TString& threadName,
+ ui64 softProcessingDurationTs,
+ TDuration timePerMailbox,
+ ui32 eventsPerMailbox);
+
virtual ~TExecutorThread();
template <ESendingType SendingType = ESendingType::Common>
@@ -66,15 +74,19 @@ namespace NActors {
private:
void* ThreadProc();
+ void ProcessExecutorPool(IExecutorPool *pool, bool isSharedThread);
+
template <typename TMailbox, bool IsTailExecution = false>
bool Execute(TMailbox* mailbox, ui32 hint);
public:
TActorSystem* const ActorSystem;
+ TAtomic StopFlag = false;
private:
// Pool-specific
- IExecutorPool* const ExecutorPool;
+ IExecutorPool* ExecutorPool;
+ TVector<IExecutorPool*> AvailableExecutorPools;
// Event-specific (currently executing)
TVector<THolder<IActor>> DyingActors;
@@ -88,6 +100,10 @@ namespace NActors {
const TString ThreadName;
volatile TThreadId ThreadId = UnknownThreadId;
bool IsUnitedWorker = false;
+
+ TDuration TimePerMailbox;
+ ui32 EventsPerMailbox;
+ ui64 SoftProcessingDurationTs;
};
template <typename TMailbox>
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index 7e350ac2cb..1a67cf1f24 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -190,7 +190,6 @@ struct TPoolInfo {
TAtomic MaxBookedCpu = 0;
TAtomic MinBookedCpu = 0;
- bool IsBeingStopped(i16 threadIdx);
double GetBooked(i16 threadIdx);
double GetlastSecondPoolBooked(i16 threadIdx);
double GetConsumed(i16 threadIdx);
@@ -201,10 +200,6 @@ struct TPoolInfo {
bool IsAvgPingGood();
};
-bool TPoolInfo::IsBeingStopped(i16 threadIdx) {
- return Pool->IsThreadBeingStopped(threadIdx);
-}
-
double TPoolInfo::GetBooked(i16 threadIdx) {
if ((size_t)threadIdx < ThreadInfo.size()) {
return ThreadInfo[threadIdx].Booked.GetAvgPart();
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index 1ad78e2207..7ee9394cd9 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -18,7 +18,7 @@
namespace NActors {
struct TWorkerContext {
- const TWorkerId WorkerId;
+ TWorkerId WorkerId;
const TCpuId CpuId;
TLease Lease;
IExecutorPool* Executor = nullptr;
@@ -30,7 +30,7 @@ namespace NActors {
TExecutorThreadStats WorkerStats;
TPoolId PoolId = MaxPools;
mutable NLWTrace::TOrbit Orbit;
- bool HasCapturedMessageBox = false;
+ bool IsNeededToWaitNextActivation = true;
i64 HPStart = 0;
ui32 ExecutedEvents = 0;