aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2023-09-09 18:09:17 +0300
committerkruall <kruall@ydb.tech>2023-09-09 18:28:22 +0300
commit7082400c058c777817caaadd12743bfe0323cdef (patch)
treeba89a3761000830b2f3382a82130d37d115548d2 /library/cpp
parent76c98524e8489976b437edce988dcd5e672bba11 (diff)
downloadydb-7082400c058c777817caaadd12743bfe0323cdef.tar.gz
Add local activation queues, KIKIMR-19207
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/actor.h13
-rw-r--r--library/cpp/actors/core/executor_pool_base.cpp7
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp65
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h13
-rw-r--r--library/cpp/actors/core/executor_pool_basic_feature_flags.h37
-rw-r--r--library/cpp/actors/core/executor_thread.cpp1
-rw-r--r--library/cpp/actors/core/harmonizer.cpp20
-rw-r--r--library/cpp/actors/core/thread_context.h25
8 files changed, 166 insertions, 15 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index ac6aef78646..3f9a5751a45 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -4,8 +4,9 @@
#include "event.h"
#include "executor_thread.h"
#include "monotonic.h"
-#include <library/cpp/actors/actor_type/indexes.h>
+#include "thread_context.h"
+#include <library/cpp/actors/actor_type/indexes.h>
#include <library/cpp/actors/util/local_process_key.h>
#include <util/system/tls.h>
@@ -25,16 +26,6 @@ namespace NActors {
struct TSettings;
}
- struct TThreadContext {
- IExecutorPool *Pool = nullptr;
- ui32 CapturedActivation = 0;
- ESendingType CapturedType = ESendingType::Lazy;
- ESendingType SendingType = ESendingType::Common;
- bool IsEnoughCpu = true;
- };
-
- extern Y_POD_THREAD(TThreadContext*) TlsThreadContext;
-
struct TActorContext;
struct TActivationContext;
diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp
index 10362a7a2b4..ba7ab0e7bef 100644
--- a/library/cpp/actors/core/executor_pool_base.cpp
+++ b/library/cpp/actors/core/executor_pool_base.cpp
@@ -1,6 +1,7 @@
#include "actorsystem.h"
#include "actor.h"
#include "executor_pool_base.h"
+#include "executor_pool_basic_feature_flags.h"
#include "executor_thread.h"
#include "mailbox.h"
#include "probes.h"
@@ -115,8 +116,12 @@ namespace NActors {
return TlsThreadContext->SendingType != ESendingType::Common;
}
+ Y_FORCE_INLINE bool IsTailSend(IExecutorPool *self) {
+ return TlsThreadContext->Pool == self && TlsThreadContext->SendingType == ESendingType::Tail && TlsThreadContext->CapturedType != ESendingType::Tail;
+ }
+
void TExecutorPoolBase::SpecificScheduleActivation(ui32 activation) {
- if (IsAllowedToCapture(this)) {
+ if (NFeatures::IsCommon() && IsAllowedToCapture(this) || IsTailSend(this)) {
std::swap(TlsThreadContext->CapturedActivation, activation);
TlsThreadContext->CapturedType = TlsThreadContext->SendingType;
}
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index b27f98487f2..54b6e262685 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -1,4 +1,5 @@
#include "executor_pool_basic.h"
+#include "executor_pool_basic_feature_flags.h"
#include "actor.h"
#include "probes.h"
#include "mailbox.h"
@@ -48,6 +49,15 @@ namespace NActors {
, Harmonizer(harmonizer)
, Priority(priority)
{
+ if constexpr (NFeatures::IsLocalQueues()) {
+ LocalQueues.Reset(new NThreading::TPadded<std::queue<ui32>>[threads]);
+ if constexpr (NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE) {
+ LocalQueueSize = *NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE;
+ } else {
+ LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
+ }
+ }
+
Y_UNUSED(maxActivityType);
i16 limit = Min(threads, (ui32)Max<i16>());
if (DefaultThreadCount) {
@@ -230,7 +240,7 @@ namespace NActors {
} while (true);
}
- ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
+ ui32 TBasicExecutorPool::GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingCounter) {
TWorkerId workerId = wctx.WorkerId;
Y_VERIFY_DEBUG(workerId < PoolThreads);
@@ -291,6 +301,33 @@ namespace NActors {
return 0;
}
+ ui32 TBasicExecutorPool::GetReadyActivationLocalQueue(TWorkerContext& wctx, ui64 revolvingCounter) {
+ TWorkerId workerId = wctx.WorkerId;
+ Y_VERIFY_DEBUG(workerId < static_cast<i32>(PoolThreads));
+
+ if (workerId >= 0 && LocalQueues[workerId].size()) {
+ ui32 activation = LocalQueues[workerId].front();
+ LocalQueues[workerId].pop();
+ return activation;
+ } else {
+ TlsThreadContext->WriteTurn = 0;
+ TlsThreadContext->LocalQueueSize = LocalQueueSize.load(std::memory_order_relaxed);
+ }
+ return GetReadyActivationCommon(wctx, revolvingCounter);
+ }
+
+ ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) {
+ if constexpr (NFeatures::IsLocalQueues()) {
+ if (SharedExecutorsCount) {
+ return GetReadyActivationCommon(wctx, revolvingCounter);
+ }
+ return GetReadyActivationLocalQueue(wctx, revolvingCounter);
+ } else {
+ return GetReadyActivationCommon(wctx, revolvingCounter);
+ }
+ return 0;
+ }
+
inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) {
for (i16 i = 0;;) {
TThreadCtx& threadCtx = Threads[i];
@@ -320,7 +357,7 @@ namespace NActors {
}
}
- void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
+ void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter) {
Activations.Push(activation, revolvingCounter);
bool needToWakeUp = false;
@@ -345,6 +382,24 @@ namespace NActors {
}
}
+ void TBasicExecutorPool::ScheduleActivationExLocalQueue(ui32 activation, ui64 revolvingWriteCounter) {
+ if (TlsThreadContext && TlsThreadContext->Pool == this && TlsThreadContext->WorkerId >= 0) {
+ if (++TlsThreadContext->WriteTurn < TlsThreadContext->LocalQueueSize) {
+ LocalQueues[TlsThreadContext->WorkerId].push(activation);
+ return;
+ }
+ }
+ ScheduleActivationExCommon(activation, revolvingWriteCounter);
+ }
+
+ void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) {
+ if constexpr (NFeatures::IsLocalQueues()) {
+ ScheduleActivationExLocalQueue(activation, revolvingCounter);
+ } else {
+ ScheduleActivationExCommon(activation, revolvingCounter);
+ }
+ }
+
void TBasicExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000);
poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount);
@@ -540,4 +595,10 @@ namespace NActors {
void TBasicExecutorPool::SetSharedExecutorsCount(i16 count) {
SharedExecutorsCount = count;
}
+
+ void TBasicExecutorPool::SetLocalQueueSize(ui16 size) {
+ if constexpr (!NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE) {
+ LocalQueueSize.store(std::max(size, NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE), std::memory_order_relaxed);
+ }
+ }
}
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index 8adb5773e2f..9e1d017ce6f 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -10,8 +10,12 @@
#include <library/cpp/actors/util/threadparkpad.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/threading/chunk_queue/queue.h>
+
#include <util/system/mutex.h>
+#include <queue>
+
namespace NActors {
class TBasicExecutorPool: public TExecutorPoolBase {
struct TThreadCtx {
@@ -52,6 +56,9 @@ namespace NActors {
const ui64 SpinThresholdCycles;
TArrayHolder<TThreadCtx> Threads;
+ TArrayHolder<NThreading::TPadded<std::queue<ui32>>> LocalQueues;
+ std::atomic<ui16> LocalQueueSize;
+
TArrayHolder<NSchedulerQueue::TReader> ScheduleReaders;
TArrayHolder<NSchedulerQueue::TWriter> ScheduleWriters;
@@ -126,12 +133,18 @@ namespace NActors {
void SetSharedExecutorsCount(i16 count);
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
+ ui32 GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingReadCounter);
+ ui32 GetReadyActivationLocalQueue(TWorkerContext& wctx, ui64 revolvingReadCounter);
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override;
+ void ScheduleActivationExCommon(ui32 activation, ui64 revolvingWriteCounter);
+ void ScheduleActivationExLocalQueue(ui32 activation, ui64 revolvingWriteCounter);
+
+ void SetLocalQueueSize(ui16 size);
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
void Start() override;
diff --git a/library/cpp/actors/core/executor_pool_basic_feature_flags.h b/library/cpp/actors/core/executor_pool_basic_feature_flags.h
new file mode 100644
index 00000000000..d87b713a07a
--- /dev/null
+++ b/library/cpp/actors/core/executor_pool_basic_feature_flags.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include "defs.h"
+
+#include <optional>
+
+
+namespace NActors::NFeatures {
+
+ enum class EActorSystemOptimizationType {
+ Common,
+ LocalQueues,
+ };
+
+ struct TCommonFeatureFlags {
+ static constexpr EActorSystemOptimizationType OptimizationType = EActorSystemOptimizationType::Common;
+ };
+
+ struct TLocalQueuesFeatureFlags {
+ static constexpr EActorSystemOptimizationType OptimizationType = EActorSystemOptimizationType::LocalQueues;
+
+ static constexpr ui16 MIN_LOCAL_QUEUE_SIZE = 0;
+ static constexpr ui16 MAX_LOCAL_QUEUE_SIZE = 16;
+ static constexpr std::optional<ui16> FIXED_LOCAL_QUEUE_SIZE = std::nullopt;
+ };
+
+ using TFeatureFlags = TCommonFeatureFlags;
+
+ consteval bool IsCommon() {
+ return TFeatureFlags::OptimizationType == EActorSystemOptimizationType::Common;
+ }
+
+ consteval bool IsLocalQueues() {
+ return TFeatureFlags::OptimizationType == EActorSystemOptimizationType::LocalQueues;
+ }
+
+}
diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp
index 061ee938dbe..2cdb54a8741 100644
--- a/library/cpp/actors/core/executor_thread.cpp
+++ b/library/cpp/actors/core/executor_thread.cpp
@@ -352,6 +352,7 @@ namespace NActors {
TThreadContext threadCtx;
TlsThreadContext = &threadCtx;
TlsThreadContext->Pool = static_cast<IExecutorPool*>(ExecutorPool);
+ TlsThreadContext->WorkerId = Ctx.WorkerId;
ExecutorPool->SetRealTimeMode();
TAffinityGuard affinity(ExecutorPool->Affinity());
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index 1a67cf1f24d..ced855c8094 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -2,6 +2,8 @@
#include "probes.h"
#include "actorsystem.h"
+#include "executor_pool_basic.h"
+#include "executor_pool_basic_feature_flags.h"
#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/datetime.h>
@@ -163,6 +165,7 @@ struct TThreadInfo {
struct TPoolInfo {
std::vector<TThreadInfo> ThreadInfo;
IExecutorPool* Pool = nullptr;
+ TBasicExecutorPool* BasicPool = nullptr;
i16 DefaultThreadCount = 0;
i16 MinThreadCount = 0;
i16 MaxThreadCount = 0;
@@ -173,6 +176,7 @@ struct TPoolInfo {
ui64 LastUpdateTs = 0;
ui64 NotEnoughCpuExecutions = 0;
ui64 NewNotEnoughCpuExecutions = 0;
+ ui16 LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
TAtomic IncreasingThreadsByNeedyState = 0;
@@ -445,8 +449,8 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
} else {
for (size_t needyPoolIdx : needyPools) {
TPoolInfo &pool = Pools[needyPoolIdx];
+ i64 threadCount = pool.GetThreadCount();
if (budget >= 1.0) {
- i64 threadCount = pool.GetThreadCount();
if (threadCount + 1 <= pool.MaxThreadCount) {
AtomicIncrement(pool.IncreasingThreadsByNeedyState);
isNeedyByPool[needyPoolIdx] = false;
@@ -456,6 +460,15 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount);
}
}
+ if constexpr (NFeatures::IsLocalQueues()) {
+ bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxThreadCount;
+ needToExpandLocalQueue &= (bool)pool.BasicPool;
+ needToExpandLocalQueue &= (pool.MaxThreadCount > 1);
+ needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE);
+ if (needToExpandLocalQueue) {
+ pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize);
+ }
+ }
}
}
@@ -503,6 +516,10 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
for (size_t hoggishPoolIdx : hoggishPools) {
TPoolInfo &pool = Pools[hoggishPoolIdx];
i64 threadCount = pool.GetThreadCount();
+ if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
+ pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
+ pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
+ }
if (threadCount > pool.MinThreadCount) {
AtomicIncrement(pool.DecreasingThreadsByHoggishState);
LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
@@ -555,6 +572,7 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
TGuard<TSpinLock> guard(Lock);
TPoolInfo poolInfo;
poolInfo.Pool = pool;
+ poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool);
poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount();
poolInfo.MinThreadCount = pool->GetMinThreadCount();
poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
diff --git a/library/cpp/actors/core/thread_context.h b/library/cpp/actors/core/thread_context.h
new file mode 100644
index 00000000000..2c70ccd566d
--- /dev/null
+++ b/library/cpp/actors/core/thread_context.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include "defs.h"
+
+#include <util/system/tls.h>
+
+
+namespace NActors {
+
+ class IExecutorPool;
+
+ struct TThreadContext {
+ IExecutorPool *Pool = nullptr;
+ ui32 CapturedActivation = 0;
+ ESendingType CapturedType = ESendingType::Lazy;
+ ESendingType SendingType = ESendingType::Common;
+ bool IsEnoughCpu = true;
+ ui32 WriteTurn = 0;
+ TWorkerId WorkerId;
+ ui16 LocalQueueSize = 0;
+ };
+
+ extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
+
+}