diff options
author | kruall <kruall@ydb.tech> | 2025-03-12 17:45:41 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-12 15:45:41 +0300 |
commit | 70284e90bceb26efe3a11d069f9d7abd27e63106 (patch) | |
tree | f7613d18b69c4bf88a4fc6623de7f83f69fbb14a | |
parent | eda007c0ac9f6a9ea30c558b002ba95bd8391a1f (diff) | |
download | ydb-70284e90bceb26efe3a11d069f9d7abd27e63106.tar.gz |
Improve perfomance of actorsystem (#15610)
-rw-r--r-- | ydb/library/actors/core/executor_pool_base.cpp | 15 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_base.h | 9 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.cpp | 94 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_pool_basic.h | 3 | ||||
-rw-r--r-- | ydb/library/actors/core/executor_thread.h | 6 | ||||
-rw-r--r-- | ydb/library/actors/core/thread_context.cpp | 17 | ||||
-rw-r--r-- | ydb/library/actors/core/thread_context.h | 5 |
7 files changed, 125 insertions, 24 deletions
diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp index 4e3a54871b..561a6d2572 100644 --- a/ydb/library/actors/core/executor_pool_base.cpp +++ b/ydb/library/actors/core/executor_pool_base.cpp @@ -71,7 +71,7 @@ namespace NActors { TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue) : TExecutorPoolBaseMailboxed(poolId) , PoolThreads(threads) - , UseRingQueue(useRingQueue) + , UseRingQueueValue(useRingQueue) , ThreadsAffinity(affinity) { if (useRingQueue) { @@ -147,7 +147,7 @@ namespace NActors { } void TExecutorPoolBase::ScheduleActivation(TMailbox* mailbox) { - if (UseRingQueue) { + if (UseRingQueue()) { ScheduleActivationEx(mailbox, 0); } else { ScheduleActivationEx(mailbox, AtomicIncrement(ActivationsRevolvingCounter)); @@ -169,9 +169,12 @@ namespace NActors { if (NFeatures::IsCommon() && IsAllowedToCapture(this) || IsTailSend(this)) { mailbox = TlsThreadContext->CaptureMailbox(mailbox); } - if (mailbox && UseRingQueue) { + if (!mailbox) { + return; + } + if (UseRingQueueValue) { ScheduleActivationEx(mailbox, 0); - } else if (mailbox) { + } else { ScheduleActivationEx(mailbox, AtomicIncrement(ActivationsRevolvingCounter)); } } @@ -302,4 +305,8 @@ namespace NActors { TMailboxTable* TExecutorPoolBaseMailboxed::GetMailboxTable() const { return MailboxTable; } + + bool TExecutorPoolBase::UseRingQueue() const { + return UseRingQueueValue; + } } diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h index 8d6127b9e4..aaf7363f61 100644 --- a/ydb/library/actors/core/executor_pool_base.h +++ b/ydb/library/actors/core/executor_pool_base.h @@ -54,10 +54,10 @@ namespace NActors { using TUnorderedCacheActivationQueue = TUnorderedCache<ui32, 512, 4>; const i16 PoolThreads; - const bool UseRingQueue; - TIntrusivePtr<TAffinity> ThreadsAffinity; - TAtomic Semaphore = 0; - std::variant<TUnorderedCacheActivationQueue, TRingActivationQueue> Activations; + const bool UseRingQueueValue; + alignas(64) TIntrusivePtr<TAffinity> ThreadsAffinity; + alignas(64) TAtomic Semaphore = 0; + alignas(64) std::variant<TUnorderedCacheActivationQueue, TRingActivationQueue> Activations; TAtomic ActivationsRevolvingCounter = 0; std::atomic_bool StopFlag = false; public: @@ -67,6 +67,7 @@ namespace NActors { void SpecificScheduleActivation(TMailbox* mailbox) override; TAffinity* Affinity() const override; ui32 GetThreads() const override; + bool UseRingQueue() const; }; void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&); diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index 32ab8e75a5..e1182da7d9 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -264,6 +264,63 @@ namespace NActors { return nullptr; } + TMailbox* TBasicExecutorPool::GetReadyActivationRingQueue(ui64 revolvingCounter) { + if (StopFlag.load(std::memory_order_acquire)) { + return nullptr; + } + + TWorkerId workerId = TlsThreadContext->WorkerId(); + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, ""); + NHPTimer::STime hpnow = GetCycleCountFast(); + TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION, false> activityGuard(hpnow); + + Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount); + + Threads[workerId].UnsetWork(); + if (Harmonizer) { + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "try to harmonize"); + LWPROBE(TryToHarmonize, PoolId, PoolName); + Harmonizer->Harmonize(hpnow); + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "harmonize done"); + } + + do { + { + TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard; + if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) { + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "activation found"); + Threads[workerId].SetWork(); + AtomicDecrement(Semaphore); + return MailboxTable->Get(activation); + } + } + + TAtomic semaphoreRaw = AtomicGet(Semaphore); + TSemaphore semaphore = TSemaphore::GetSemaphore(semaphoreRaw); + if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) { + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == 0 or workerId >= 0 && semaphore.CurrentSleepThreadCount < 0"); + if (!TlsThreadContext->ExecutionContext.IsNeededToWaitNextActivation) { + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "wctx.ExecutionContext.IsNeededToWaitNextActivation == false"); + return nullptr; + } + + bool needToWait = false; + bool needToBlock = false; + AskToGoToSleep(&needToWait, &needToBlock); + if (needToWait) { + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "go to sleep"); + if (Threads[workerId].Wait(SpinThresholdCycles, &StopFlag)) { + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "sleep interrupted"); + return nullptr; + } + } + } + SpinLockPause(); + } while (!StopFlag.load(std::memory_order_acquire)); + + return nullptr; + } + TMailbox* TBasicExecutorPool::GetReadyActivationLocalQueue(ui64 revolvingCounter) { TWorkerId workerId = TlsThreadContext->WorkerId(); Y_DEBUG_ABORT_UNLESS(workerId < static_cast<i32>(MaxFullThreadCount)); @@ -278,6 +335,9 @@ namespace NActors { TlsThreadContext->LocalQueueContext.LocalQueueSize = LocalQueueSize.load(std::memory_order_relaxed); } EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue done; moving to common"); + if (TlsThreadContext->UseRingQueue()) { + return GetReadyActivationRingQueue(revolvingCounter); + } return GetReadyActivationCommon(revolvingCounter); } @@ -285,6 +345,9 @@ namespace NActors { if (MaxLocalQueueSize) { EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue"); return GetReadyActivationLocalQueue(revolvingCounter); + } else if (TlsThreadContext->UseRingQueue()) { + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "ring queue"); + return GetReadyActivationRingQueue(revolvingCounter); } else { EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, ""); return GetReadyActivationCommon(revolvingCounter); @@ -305,23 +368,29 @@ namespace NActors { } } - void TBasicExecutorPool::ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingCounter, TAtomic x) { - TSemaphore semaphore = TSemaphore::GetSemaphore(x); - EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == ", semaphore.OldSemaphore, " semaphore.CurrentSleepThreadCount == ", semaphore.CurrentSleepThreadCount); - std::visit([mailbox, revolvingCounter](auto &x) { - x.Push(mailbox->Hint, revolvingCounter); + void TBasicExecutorPool::ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingCounter, std::optional<TAtomic> initSemaphore) { + std::visit([mailbox, revolvingCounter](auto &queue) { + queue.Push(mailbox->Hint, revolvingCounter); }, Activations); bool needToWakeUp = false; bool needToChangeOldSemaphore = true; - if (SharedPool) { + TAtomic x; + TSemaphore semaphore; + if (!initSemaphore || SharedPool) { x = AtomicIncrement(Semaphore); needToChangeOldSemaphore = false; + semaphore = TSemaphore::GetSemaphore(x); + } else { + x = *initSemaphore; + semaphore = TSemaphore::GetSemaphore(x); + } + EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == ", semaphore.OldSemaphore, " semaphore.CurrentSleepThreadCount == ", semaphore.CurrentSleepThreadCount); + if (SharedPool) { if (SharedPool->WakeUpLocalThreads(PoolId)) { EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "shared pool wake up local threads"); return; } - semaphore = TSemaphore::GetSemaphore(x); } i16 sleepThreads = 0; @@ -329,13 +398,18 @@ namespace NActors { do { needToWakeUp = semaphore.CurrentSleepThreadCount > 0; i64 oldX = semaphore.ConvertToI64(); + bool changed = false; if (needToChangeOldSemaphore) { semaphore.OldSemaphore++; + changed = true; } if (needToWakeUp) { sleepThreads = semaphore.CurrentSleepThreadCount--; + changed = true; + } + if (changed) { + x = AtomicGetAndCas(&Semaphore, semaphore.ConvertToI64(), oldX); } - x = AtomicGetAndCas(&Semaphore, semaphore.ConvertToI64(), oldX); if (x == oldX) { break; } @@ -383,14 +457,14 @@ namespace NActors { return; } } - ScheduleActivationExCommon(mailbox, revolvingWriteCounter, AtomicGet(Semaphore)); + ScheduleActivationExCommon(mailbox, revolvingWriteCounter, std::nullopt); } void TBasicExecutorPool::ScheduleActivationEx(TMailbox* mailbox, ui64 revolvingCounter) { if (MaxLocalQueueSize) { ScheduleActivationExLocalQueue(mailbox, revolvingCounter); } else { - ScheduleActivationExCommon(mailbox, revolvingCounter, AtomicGet(Semaphore)); + ScheduleActivationExCommon(mailbox, revolvingCounter, std::nullopt); } } diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h index 5daf392184..c44509ea23 100644 --- a/ydb/library/actors/core/executor_pool_basic.h +++ b/ydb/library/actors/core/executor_pool_basic.h @@ -234,6 +234,7 @@ namespace NActors { TMailbox* GetReadyActivation(ui64 revolvingReadCounter) override; TMailbox* GetReadyActivationCommon(ui64 revolvingReadCounter); TMailbox* GetReadyActivationShared(ui64 revolvingReadCounter); + TMailbox* GetReadyActivationRingQueue(ui64 revolvingReadCounter); TMailbox* GetReadyActivationLocalQueue(ui64 revolvingReadCounter); void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; @@ -241,7 +242,7 @@ namespace NActors { void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void ScheduleActivationEx(TMailbox* mailbox, ui64 revolvingWriteCounter) override; - void ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingWriteCounter, TAtomic semaphoreValue); + void ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingWriteCounter, std::optional<TAtomic> semaphoreValue); void ScheduleActivationExLocalQueue(TMailbox* mailbox, ui64 revolvingWriteCounter); void SetLocalQueueSize(ui16 size); diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h index f6fa0f9fb0..7091bcd8bb 100644 --- a/ydb/library/actors/core/executor_thread.h +++ b/ydb/library/actors/core/executor_thread.h @@ -110,9 +110,9 @@ namespace NActors { ui64 CurrentActorScheduledEventsCounter = 0; // Thread-specific - mutable TThreadContext ThreadCtx; - mutable TExecutionStats ExecutionStats; - ui64 RevolvingReadCounter = 0; + alignas(64) mutable TThreadContext ThreadCtx; + alignas(64) mutable TExecutionStats ExecutionStats; + alignas(64) ui64 RevolvingReadCounter = 0; ui64 RevolvingWriteCounter = 0; const TString ThreadName; volatile TThreadId ThreadId = UnknownThreadId; diff --git a/ydb/library/actors/core/thread_context.cpp b/ydb/library/actors/core/thread_context.cpp index a6d640335a..75acb418b6 100644 --- a/ydb/library/actors/core/thread_context.cpp +++ b/ydb/library/actors/core/thread_context.cpp @@ -1,14 +1,23 @@ #include "thread_context.h" #include "executor_pool.h" +#include "executor_pool_base.h" namespace NActors { + bool UseRingQueue(IExecutorPool* pool) { + if (auto* basePool = dynamic_cast<TExecutorPoolBase*>(pool)) { + return basePool->UseRingQueue(); + } + return false; + } + TWorkerContext::TWorkerContext(TWorkerId workerId, IExecutorPool* pool, IExecutorPool* sharedPool) : WorkerId(workerId) , Pool(pool) , OwnerPool(pool) , SharedPool(sharedPool) + , UseRingQueueValue(::NActors::UseRingQueue(pool)) { AssignPool(pool); } @@ -37,6 +46,10 @@ namespace NActors { return SharedPool != nullptr; } + bool TWorkerContext::UseRingQueue() const { + return UseRingQueueValue; + } + void TWorkerContext::AssignPool(IExecutorPool* pool, ui64 softDeadlineTs) { Pool = pool; TimePerMailboxTs = pool ? pool->TimePerMailboxTs() : TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX.SecondsFloat() * NHPTimer::GetClockRate(); @@ -78,6 +91,10 @@ namespace NActors { return WorkerContext.IsShared(); } + bool TThreadContext::UseRingQueue() const { + return WorkerContext.UseRingQueue(); + } + ui64 TThreadContext::TimePerMailboxTs() const { return WorkerContext.TimePerMailboxTs; } diff --git a/ydb/library/actors/core/thread_context.h b/ydb/library/actors/core/thread_context.h index 090bfff994..c8048aa5ae 100644 --- a/ydb/library/actors/core/thread_context.h +++ b/ydb/library/actors/core/thread_context.h @@ -50,6 +50,7 @@ namespace NActors { ui64 TimePerMailboxTs = 0; ui32 EventsPerMailbox = 0; ui64 SoftDeadlineTs = ui64(-1); + bool UseRingQueueValue = false; TWorkerContext(TWorkerId workerId, IExecutorPool* pool, IExecutorPool* sharedPool); @@ -57,7 +58,7 @@ namespace NActors { TString PoolName() const; ui32 OwnerPoolId() const; bool IsShared() const; - + bool UseRingQueue() const; void AssignPool(IExecutorPool* pool, ui64 softDeadlineTs = -1); void FreeMailbox(TMailbox* mailbox); }; @@ -118,7 +119,7 @@ namespace NActors { ui32 EventsPerMailbox() const; ui64 SoftDeadlineTs() const; void FreeMailbox(TMailbox* mailbox); - + bool UseRingQueue() const; void AssignPool(IExecutorPool* pool, ui64 softDeadlineTs = Max<ui64>()); bool CheckSendingType(ESendingType type) const; |