aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2025-03-12 17:45:41 +0500
committerGitHub <noreply@github.com>2025-03-12 15:45:41 +0300
commit70284e90bceb26efe3a11d069f9d7abd27e63106 (patch)
treef7613d18b69c4bf88a4fc6623de7f83f69fbb14a
parenteda007c0ac9f6a9ea30c558b002ba95bd8391a1f (diff)
downloadydb-70284e90bceb26efe3a11d069f9d7abd27e63106.tar.gz
Improve perfomance of actorsystem (#15610)
-rw-r--r--ydb/library/actors/core/executor_pool_base.cpp15
-rw-r--r--ydb/library/actors/core/executor_pool_base.h9
-rw-r--r--ydb/library/actors/core/executor_pool_basic.cpp94
-rw-r--r--ydb/library/actors/core/executor_pool_basic.h3
-rw-r--r--ydb/library/actors/core/executor_thread.h6
-rw-r--r--ydb/library/actors/core/thread_context.cpp17
-rw-r--r--ydb/library/actors/core/thread_context.h5
7 files changed, 125 insertions, 24 deletions
diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp
index 4e3a54871b..561a6d2572 100644
--- a/ydb/library/actors/core/executor_pool_base.cpp
+++ b/ydb/library/actors/core/executor_pool_base.cpp
@@ -71,7 +71,7 @@ namespace NActors {
TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue)
: TExecutorPoolBaseMailboxed(poolId)
, PoolThreads(threads)
- , UseRingQueue(useRingQueue)
+ , UseRingQueueValue(useRingQueue)
, ThreadsAffinity(affinity)
{
if (useRingQueue) {
@@ -147,7 +147,7 @@ namespace NActors {
}
void TExecutorPoolBase::ScheduleActivation(TMailbox* mailbox) {
- if (UseRingQueue) {
+ if (UseRingQueue()) {
ScheduleActivationEx(mailbox, 0);
} else {
ScheduleActivationEx(mailbox, AtomicIncrement(ActivationsRevolvingCounter));
@@ -169,9 +169,12 @@ namespace NActors {
if (NFeatures::IsCommon() && IsAllowedToCapture(this) || IsTailSend(this)) {
mailbox = TlsThreadContext->CaptureMailbox(mailbox);
}
- if (mailbox && UseRingQueue) {
+ if (!mailbox) {
+ return;
+ }
+ if (UseRingQueueValue) {
ScheduleActivationEx(mailbox, 0);
- } else if (mailbox) {
+ } else {
ScheduleActivationEx(mailbox, AtomicIncrement(ActivationsRevolvingCounter));
}
}
@@ -302,4 +305,8 @@ namespace NActors {
TMailboxTable* TExecutorPoolBaseMailboxed::GetMailboxTable() const {
return MailboxTable;
}
+
+ bool TExecutorPoolBase::UseRingQueue() const {
+ return UseRingQueueValue;
+ }
}
diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h
index 8d6127b9e4..aaf7363f61 100644
--- a/ydb/library/actors/core/executor_pool_base.h
+++ b/ydb/library/actors/core/executor_pool_base.h
@@ -54,10 +54,10 @@ namespace NActors {
using TUnorderedCacheActivationQueue = TUnorderedCache<ui32, 512, 4>;
const i16 PoolThreads;
- const bool UseRingQueue;
- TIntrusivePtr<TAffinity> ThreadsAffinity;
- TAtomic Semaphore = 0;
- std::variant<TUnorderedCacheActivationQueue, TRingActivationQueue> Activations;
+ const bool UseRingQueueValue;
+ alignas(64) TIntrusivePtr<TAffinity> ThreadsAffinity;
+ alignas(64) TAtomic Semaphore = 0;
+ alignas(64) std::variant<TUnorderedCacheActivationQueue, TRingActivationQueue> Activations;
TAtomic ActivationsRevolvingCounter = 0;
std::atomic_bool StopFlag = false;
public:
@@ -67,6 +67,7 @@ namespace NActors {
void SpecificScheduleActivation(TMailbox* mailbox) override;
TAffinity* Affinity() const override;
ui32 GetThreads() const override;
+ bool UseRingQueue() const;
};
void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index 32ab8e75a5..e1182da7d9 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -264,6 +264,63 @@ namespace NActors {
return nullptr;
}
+ TMailbox* TBasicExecutorPool::GetReadyActivationRingQueue(ui64 revolvingCounter) {
+ if (StopFlag.load(std::memory_order_acquire)) {
+ return nullptr;
+ }
+
+ TWorkerId workerId = TlsThreadContext->WorkerId();
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "");
+ NHPTimer::STime hpnow = GetCycleCountFast();
+ TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION, false> activityGuard(hpnow);
+
+ Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount);
+
+ Threads[workerId].UnsetWork();
+ if (Harmonizer) {
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "try to harmonize");
+ LWPROBE(TryToHarmonize, PoolId, PoolName);
+ Harmonizer->Harmonize(hpnow);
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "harmonize done");
+ }
+
+ do {
+ {
+ TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
+ if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) {
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "activation found");
+ Threads[workerId].SetWork();
+ AtomicDecrement(Semaphore);
+ return MailboxTable->Get(activation);
+ }
+ }
+
+ TAtomic semaphoreRaw = AtomicGet(Semaphore);
+ TSemaphore semaphore = TSemaphore::GetSemaphore(semaphoreRaw);
+ if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) {
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == 0 or workerId >= 0 && semaphore.CurrentSleepThreadCount < 0");
+ if (!TlsThreadContext->ExecutionContext.IsNeededToWaitNextActivation) {
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "wctx.ExecutionContext.IsNeededToWaitNextActivation == false");
+ return nullptr;
+ }
+
+ bool needToWait = false;
+ bool needToBlock = false;
+ AskToGoToSleep(&needToWait, &needToBlock);
+ if (needToWait) {
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "go to sleep");
+ if (Threads[workerId].Wait(SpinThresholdCycles, &StopFlag)) {
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "sleep interrupted");
+ return nullptr;
+ }
+ }
+ }
+ SpinLockPause();
+ } while (!StopFlag.load(std::memory_order_acquire));
+
+ return nullptr;
+ }
+
TMailbox* TBasicExecutorPool::GetReadyActivationLocalQueue(ui64 revolvingCounter) {
TWorkerId workerId = TlsThreadContext->WorkerId();
Y_DEBUG_ABORT_UNLESS(workerId < static_cast<i32>(MaxFullThreadCount));
@@ -278,6 +335,9 @@ namespace NActors {
TlsThreadContext->LocalQueueContext.LocalQueueSize = LocalQueueSize.load(std::memory_order_relaxed);
}
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue done; moving to common");
+ if (TlsThreadContext->UseRingQueue()) {
+ return GetReadyActivationRingQueue(revolvingCounter);
+ }
return GetReadyActivationCommon(revolvingCounter);
}
@@ -285,6 +345,9 @@ namespace NActors {
if (MaxLocalQueueSize) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue");
return GetReadyActivationLocalQueue(revolvingCounter);
+ } else if (TlsThreadContext->UseRingQueue()) {
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "ring queue");
+ return GetReadyActivationRingQueue(revolvingCounter);
} else {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "");
return GetReadyActivationCommon(revolvingCounter);
@@ -305,23 +368,29 @@ namespace NActors {
}
}
- void TBasicExecutorPool::ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingCounter, TAtomic x) {
- TSemaphore semaphore = TSemaphore::GetSemaphore(x);
- EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == ", semaphore.OldSemaphore, " semaphore.CurrentSleepThreadCount == ", semaphore.CurrentSleepThreadCount);
- std::visit([mailbox, revolvingCounter](auto &x) {
- x.Push(mailbox->Hint, revolvingCounter);
+ void TBasicExecutorPool::ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingCounter, std::optional<TAtomic> initSemaphore) {
+ std::visit([mailbox, revolvingCounter](auto &queue) {
+ queue.Push(mailbox->Hint, revolvingCounter);
}, Activations);
bool needToWakeUp = false;
bool needToChangeOldSemaphore = true;
- if (SharedPool) {
+ TAtomic x;
+ TSemaphore semaphore;
+ if (!initSemaphore || SharedPool) {
x = AtomicIncrement(Semaphore);
needToChangeOldSemaphore = false;
+ semaphore = TSemaphore::GetSemaphore(x);
+ } else {
+ x = *initSemaphore;
+ semaphore = TSemaphore::GetSemaphore(x);
+ }
+ EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == ", semaphore.OldSemaphore, " semaphore.CurrentSleepThreadCount == ", semaphore.CurrentSleepThreadCount);
+ if (SharedPool) {
if (SharedPool->WakeUpLocalThreads(PoolId)) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "shared pool wake up local threads");
return;
}
- semaphore = TSemaphore::GetSemaphore(x);
}
i16 sleepThreads = 0;
@@ -329,13 +398,18 @@ namespace NActors {
do {
needToWakeUp = semaphore.CurrentSleepThreadCount > 0;
i64 oldX = semaphore.ConvertToI64();
+ bool changed = false;
if (needToChangeOldSemaphore) {
semaphore.OldSemaphore++;
+ changed = true;
}
if (needToWakeUp) {
sleepThreads = semaphore.CurrentSleepThreadCount--;
+ changed = true;
+ }
+ if (changed) {
+ x = AtomicGetAndCas(&Semaphore, semaphore.ConvertToI64(), oldX);
}
- x = AtomicGetAndCas(&Semaphore, semaphore.ConvertToI64(), oldX);
if (x == oldX) {
break;
}
@@ -383,14 +457,14 @@ namespace NActors {
return;
}
}
- ScheduleActivationExCommon(mailbox, revolvingWriteCounter, AtomicGet(Semaphore));
+ ScheduleActivationExCommon(mailbox, revolvingWriteCounter, std::nullopt);
}
void TBasicExecutorPool::ScheduleActivationEx(TMailbox* mailbox, ui64 revolvingCounter) {
if (MaxLocalQueueSize) {
ScheduleActivationExLocalQueue(mailbox, revolvingCounter);
} else {
- ScheduleActivationExCommon(mailbox, revolvingCounter, AtomicGet(Semaphore));
+ ScheduleActivationExCommon(mailbox, revolvingCounter, std::nullopt);
}
}
diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h
index 5daf392184..c44509ea23 100644
--- a/ydb/library/actors/core/executor_pool_basic.h
+++ b/ydb/library/actors/core/executor_pool_basic.h
@@ -234,6 +234,7 @@ namespace NActors {
TMailbox* GetReadyActivation(ui64 revolvingReadCounter) override;
TMailbox* GetReadyActivationCommon(ui64 revolvingReadCounter);
TMailbox* GetReadyActivationShared(ui64 revolvingReadCounter);
+ TMailbox* GetReadyActivationRingQueue(ui64 revolvingReadCounter);
TMailbox* GetReadyActivationLocalQueue(ui64 revolvingReadCounter);
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
@@ -241,7 +242,7 @@ namespace NActors {
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void ScheduleActivationEx(TMailbox* mailbox, ui64 revolvingWriteCounter) override;
- void ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingWriteCounter, TAtomic semaphoreValue);
+ void ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingWriteCounter, std::optional<TAtomic> semaphoreValue);
void ScheduleActivationExLocalQueue(TMailbox* mailbox, ui64 revolvingWriteCounter);
void SetLocalQueueSize(ui16 size);
diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h
index f6fa0f9fb0..7091bcd8bb 100644
--- a/ydb/library/actors/core/executor_thread.h
+++ b/ydb/library/actors/core/executor_thread.h
@@ -110,9 +110,9 @@ namespace NActors {
ui64 CurrentActorScheduledEventsCounter = 0;
// Thread-specific
- mutable TThreadContext ThreadCtx;
- mutable TExecutionStats ExecutionStats;
- ui64 RevolvingReadCounter = 0;
+ alignas(64) mutable TThreadContext ThreadCtx;
+ alignas(64) mutable TExecutionStats ExecutionStats;
+ alignas(64) ui64 RevolvingReadCounter = 0;
ui64 RevolvingWriteCounter = 0;
const TString ThreadName;
volatile TThreadId ThreadId = UnknownThreadId;
diff --git a/ydb/library/actors/core/thread_context.cpp b/ydb/library/actors/core/thread_context.cpp
index a6d640335a..75acb418b6 100644
--- a/ydb/library/actors/core/thread_context.cpp
+++ b/ydb/library/actors/core/thread_context.cpp
@@ -1,14 +1,23 @@
#include "thread_context.h"
#include "executor_pool.h"
+#include "executor_pool_base.h"
namespace NActors {
+ bool UseRingQueue(IExecutorPool* pool) {
+ if (auto* basePool = dynamic_cast<TExecutorPoolBase*>(pool)) {
+ return basePool->UseRingQueue();
+ }
+ return false;
+ }
+
TWorkerContext::TWorkerContext(TWorkerId workerId, IExecutorPool* pool, IExecutorPool* sharedPool)
: WorkerId(workerId)
, Pool(pool)
, OwnerPool(pool)
, SharedPool(sharedPool)
+ , UseRingQueueValue(::NActors::UseRingQueue(pool))
{
AssignPool(pool);
}
@@ -37,6 +46,10 @@ namespace NActors {
return SharedPool != nullptr;
}
+ bool TWorkerContext::UseRingQueue() const {
+ return UseRingQueueValue;
+ }
+
void TWorkerContext::AssignPool(IExecutorPool* pool, ui64 softDeadlineTs) {
Pool = pool;
TimePerMailboxTs = pool ? pool->TimePerMailboxTs() : TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX.SecondsFloat() * NHPTimer::GetClockRate();
@@ -78,6 +91,10 @@ namespace NActors {
return WorkerContext.IsShared();
}
+ bool TThreadContext::UseRingQueue() const {
+ return WorkerContext.UseRingQueue();
+ }
+
ui64 TThreadContext::TimePerMailboxTs() const {
return WorkerContext.TimePerMailboxTs;
}
diff --git a/ydb/library/actors/core/thread_context.h b/ydb/library/actors/core/thread_context.h
index 090bfff994..c8048aa5ae 100644
--- a/ydb/library/actors/core/thread_context.h
+++ b/ydb/library/actors/core/thread_context.h
@@ -50,6 +50,7 @@ namespace NActors {
ui64 TimePerMailboxTs = 0;
ui32 EventsPerMailbox = 0;
ui64 SoftDeadlineTs = ui64(-1);
+ bool UseRingQueueValue = false;
TWorkerContext(TWorkerId workerId, IExecutorPool* pool, IExecutorPool* sharedPool);
@@ -57,7 +58,7 @@ namespace NActors {
TString PoolName() const;
ui32 OwnerPoolId() const;
bool IsShared() const;
-
+ bool UseRingQueue() const;
void AssignPool(IExecutorPool* pool, ui64 softDeadlineTs = -1);
void FreeMailbox(TMailbox* mailbox);
};
@@ -118,7 +119,7 @@ namespace NActors {
ui32 EventsPerMailbox() const;
ui64 SoftDeadlineTs() const;
void FreeMailbox(TMailbox* mailbox);
-
+ bool UseRingQueue() const;
void AssignPool(IExecutorPool* pool, ui64 softDeadlineTs = Max<ui64>());
bool CheckSendingType(ESendingType type) const;