diff options
author | kruall <kruall@ydb.tech> | 2023-09-09 18:09:17 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-09-09 18:28:22 +0300 |
commit | 7082400c058c777817caaadd12743bfe0323cdef (patch) | |
tree | ba89a3761000830b2f3382a82130d37d115548d2 /library/cpp | |
parent | 76c98524e8489976b437edce988dcd5e672bba11 (diff) | |
download | ydb-7082400c058c777817caaadd12743bfe0323cdef.tar.gz |
Add local activation queues, KIKIMR-19207
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/actor.h | 13 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_base.cpp | 7 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 65 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.h | 13 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic_feature_flags.h | 37 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_thread.cpp | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 20 | ||||
-rw-r--r-- | library/cpp/actors/core/thread_context.h | 25 |
8 files changed, 166 insertions, 15 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index ac6aef78646..3f9a5751a45 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -4,8 +4,9 @@ #include "event.h" #include "executor_thread.h" #include "monotonic.h" -#include <library/cpp/actors/actor_type/indexes.h> +#include "thread_context.h" +#include <library/cpp/actors/actor_type/indexes.h> #include <library/cpp/actors/util/local_process_key.h> #include <util/system/tls.h> @@ -25,16 +26,6 @@ namespace NActors { struct TSettings; } - struct TThreadContext { - IExecutorPool *Pool = nullptr; - ui32 CapturedActivation = 0; - ESendingType CapturedType = ESendingType::Lazy; - ESendingType SendingType = ESendingType::Common; - bool IsEnoughCpu = true; - }; - - extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; - struct TActorContext; struct TActivationContext; diff --git a/library/cpp/actors/core/executor_pool_base.cpp b/library/cpp/actors/core/executor_pool_base.cpp index 10362a7a2b4..ba7ab0e7bef 100644 --- a/library/cpp/actors/core/executor_pool_base.cpp +++ b/library/cpp/actors/core/executor_pool_base.cpp @@ -1,6 +1,7 @@ #include "actorsystem.h" #include "actor.h" #include "executor_pool_base.h" +#include "executor_pool_basic_feature_flags.h" #include "executor_thread.h" #include "mailbox.h" #include "probes.h" @@ -115,8 +116,12 @@ namespace NActors { return TlsThreadContext->SendingType != ESendingType::Common; } + Y_FORCE_INLINE bool IsTailSend(IExecutorPool *self) { + return TlsThreadContext->Pool == self && TlsThreadContext->SendingType == ESendingType::Tail && TlsThreadContext->CapturedType != ESendingType::Tail; + } + void TExecutorPoolBase::SpecificScheduleActivation(ui32 activation) { - if (IsAllowedToCapture(this)) { + if (NFeatures::IsCommon() && IsAllowedToCapture(this) || IsTailSend(this)) { std::swap(TlsThreadContext->CapturedActivation, activation); TlsThreadContext->CapturedType = TlsThreadContext->SendingType; } diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index b27f98487f2..54b6e262685 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -1,4 +1,5 @@ #include "executor_pool_basic.h" +#include "executor_pool_basic_feature_flags.h" #include "actor.h" #include "probes.h" #include "mailbox.h" @@ -48,6 +49,15 @@ namespace NActors { , Harmonizer(harmonizer) , Priority(priority) { + if constexpr (NFeatures::IsLocalQueues()) { + LocalQueues.Reset(new NThreading::TPadded<std::queue<ui32>>[threads]); + if constexpr (NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE) { + LocalQueueSize = *NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE; + } else { + LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE; + } + } + Y_UNUSED(maxActivityType); i16 limit = Min(threads, (ui32)Max<i16>()); if (DefaultThreadCount) { @@ -230,7 +240,7 @@ namespace NActors { } while (true); } - ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { + ui32 TBasicExecutorPool::GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingCounter) { TWorkerId workerId = wctx.WorkerId; Y_VERIFY_DEBUG(workerId < PoolThreads); @@ -291,6 +301,33 @@ namespace NActors { return 0; } + ui32 TBasicExecutorPool::GetReadyActivationLocalQueue(TWorkerContext& wctx, ui64 revolvingCounter) { + TWorkerId workerId = wctx.WorkerId; + Y_VERIFY_DEBUG(workerId < static_cast<i32>(PoolThreads)); + + if (workerId >= 0 && LocalQueues[workerId].size()) { + ui32 activation = LocalQueues[workerId].front(); + LocalQueues[workerId].pop(); + return activation; + } else { + TlsThreadContext->WriteTurn = 0; + TlsThreadContext->LocalQueueSize = LocalQueueSize.load(std::memory_order_relaxed); + } + return GetReadyActivationCommon(wctx, revolvingCounter); + } + + ui32 TBasicExecutorPool::GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) { + if constexpr (NFeatures::IsLocalQueues()) { + if (SharedExecutorsCount) { + return GetReadyActivationCommon(wctx, revolvingCounter); + } + return GetReadyActivationLocalQueue(wctx, revolvingCounter); + } else { + return GetReadyActivationCommon(wctx, revolvingCounter); + } + return 0; + } + inline void TBasicExecutorPool::WakeUpLoop(i16 currentThreadCount) { for (i16 i = 0;;) { TThreadCtx& threadCtx = Threads[i]; @@ -320,7 +357,7 @@ namespace NActors { } } - void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) { + void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter) { Activations.Push(activation, revolvingCounter); bool needToWakeUp = false; @@ -345,6 +382,24 @@ namespace NActors { } } + void TBasicExecutorPool::ScheduleActivationExLocalQueue(ui32 activation, ui64 revolvingWriteCounter) { + if (TlsThreadContext && TlsThreadContext->Pool == this && TlsThreadContext->WorkerId >= 0) { + if (++TlsThreadContext->WriteTurn < TlsThreadContext->LocalQueueSize) { + LocalQueues[TlsThreadContext->WorkerId].push(activation); + return; + } + } + ScheduleActivationExCommon(activation, revolvingWriteCounter); + } + + void TBasicExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) { + if constexpr (NFeatures::IsLocalQueues()) { + ScheduleActivationExLocalQueue(activation, revolvingCounter); + } else { + ScheduleActivationExCommon(activation, revolvingCounter); + } + } + void TBasicExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000); poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount); @@ -540,4 +595,10 @@ namespace NActors { void TBasicExecutorPool::SetSharedExecutorsCount(i16 count) { SharedExecutorsCount = count; } + + void TBasicExecutorPool::SetLocalQueueSize(ui16 size) { + if constexpr (!NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE) { + LocalQueueSize.store(std::max(size, NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE), std::memory_order_relaxed); + } + } } diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index 8adb5773e2f..9e1d017ce6f 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -10,8 +10,12 @@ #include <library/cpp/actors/util/threadparkpad.h> #include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/threading/chunk_queue/queue.h> + #include <util/system/mutex.h> +#include <queue> + namespace NActors { class TBasicExecutorPool: public TExecutorPoolBase { struct TThreadCtx { @@ -52,6 +56,9 @@ namespace NActors { const ui64 SpinThresholdCycles; TArrayHolder<TThreadCtx> Threads; + TArrayHolder<NThreading::TPadded<std::queue<ui32>>> LocalQueues; + std::atomic<ui16> LocalQueueSize; + TArrayHolder<NSchedulerQueue::TReader> ScheduleReaders; TArrayHolder<NSchedulerQueue::TWriter> ScheduleWriters; @@ -126,12 +133,18 @@ namespace NActors { void SetSharedExecutorsCount(i16 count); ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override; + ui32 GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingReadCounter); + ui32 GetReadyActivationLocalQueue(TWorkerContext& wctx, ui64 revolvingReadCounter); void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override; void ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) override; + void ScheduleActivationExCommon(ui32 activation, ui64 revolvingWriteCounter); + void ScheduleActivationExLocalQueue(ui32 activation, ui64 revolvingWriteCounter); + + void SetLocalQueueSize(ui16 size); void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; void Start() override; diff --git a/library/cpp/actors/core/executor_pool_basic_feature_flags.h b/library/cpp/actors/core/executor_pool_basic_feature_flags.h new file mode 100644 index 00000000000..d87b713a07a --- /dev/null +++ b/library/cpp/actors/core/executor_pool_basic_feature_flags.h @@ -0,0 +1,37 @@ +#pragma once + +#include "defs.h" + +#include <optional> + + +namespace NActors::NFeatures { + + enum class EActorSystemOptimizationType { + Common, + LocalQueues, + }; + + struct TCommonFeatureFlags { + static constexpr EActorSystemOptimizationType OptimizationType = EActorSystemOptimizationType::Common; + }; + + struct TLocalQueuesFeatureFlags { + static constexpr EActorSystemOptimizationType OptimizationType = EActorSystemOptimizationType::LocalQueues; + + static constexpr ui16 MIN_LOCAL_QUEUE_SIZE = 0; + static constexpr ui16 MAX_LOCAL_QUEUE_SIZE = 16; + static constexpr std::optional<ui16> FIXED_LOCAL_QUEUE_SIZE = std::nullopt; + }; + + using TFeatureFlags = TCommonFeatureFlags; + + consteval bool IsCommon() { + return TFeatureFlags::OptimizationType == EActorSystemOptimizationType::Common; + } + + consteval bool IsLocalQueues() { + return TFeatureFlags::OptimizationType == EActorSystemOptimizationType::LocalQueues; + } + +} diff --git a/library/cpp/actors/core/executor_thread.cpp b/library/cpp/actors/core/executor_thread.cpp index 061ee938dbe..2cdb54a8741 100644 --- a/library/cpp/actors/core/executor_thread.cpp +++ b/library/cpp/actors/core/executor_thread.cpp @@ -352,6 +352,7 @@ namespace NActors { TThreadContext threadCtx; TlsThreadContext = &threadCtx; TlsThreadContext->Pool = static_cast<IExecutorPool*>(ExecutorPool); + TlsThreadContext->WorkerId = Ctx.WorkerId; ExecutorPool->SetRealTimeMode(); TAffinityGuard affinity(ExecutorPool->Affinity()); diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index 1a67cf1f24d..ced855c8094 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -2,6 +2,8 @@ #include "probes.h" #include "actorsystem.h" +#include "executor_pool_basic.h" +#include "executor_pool_basic_feature_flags.h" #include <library/cpp/actors/util/cpu_load_log.h> #include <library/cpp/actors/util/datetime.h> @@ -163,6 +165,7 @@ struct TThreadInfo { struct TPoolInfo { std::vector<TThreadInfo> ThreadInfo; IExecutorPool* Pool = nullptr; + TBasicExecutorPool* BasicPool = nullptr; i16 DefaultThreadCount = 0; i16 MinThreadCount = 0; i16 MaxThreadCount = 0; @@ -173,6 +176,7 @@ struct TPoolInfo { ui64 LastUpdateTs = 0; ui64 NotEnoughCpuExecutions = 0; ui64 NewNotEnoughCpuExecutions = 0; + ui16 LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE; TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish TAtomic IncreasingThreadsByNeedyState = 0; @@ -445,8 +449,8 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { } else { for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = Pools[needyPoolIdx]; + i64 threadCount = pool.GetThreadCount(); if (budget >= 1.0) { - i64 threadCount = pool.GetThreadCount(); if (threadCount + 1 <= pool.MaxThreadCount) { AtomicIncrement(pool.IncreasingThreadsByNeedyState); isNeedyByPool[needyPoolIdx] = false; @@ -456,6 +460,15 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount); } } + if constexpr (NFeatures::IsLocalQueues()) { + bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxThreadCount; + needToExpandLocalQueue &= (bool)pool.BasicPool; + needToExpandLocalQueue &= (pool.MaxThreadCount > 1); + needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE); + if (needToExpandLocalQueue) { + pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize); + } + } } } @@ -503,6 +516,10 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { for (size_t hoggishPoolIdx : hoggishPools) { TPoolInfo &pool = Pools[hoggishPoolIdx]; i64 threadCount = pool.GetThreadCount(); + if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) { + pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2); + pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize); + } if (threadCount > pool.MinThreadCount) { AtomicIncrement(pool.DecreasingThreadsByHoggishState); LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); @@ -555,6 +572,7 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { TGuard<TSpinLock> guard(Lock); TPoolInfo poolInfo; poolInfo.Pool = pool; + poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool); poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount(); poolInfo.MinThreadCount = pool->GetMinThreadCount(); poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); diff --git a/library/cpp/actors/core/thread_context.h b/library/cpp/actors/core/thread_context.h new file mode 100644 index 00000000000..2c70ccd566d --- /dev/null +++ b/library/cpp/actors/core/thread_context.h @@ -0,0 +1,25 @@ +#pragma once + +#include "defs.h" + +#include <util/system/tls.h> + + +namespace NActors { + + class IExecutorPool; + + struct TThreadContext { + IExecutorPool *Pool = nullptr; + ui32 CapturedActivation = 0; + ESendingType CapturedType = ESendingType::Lazy; + ESendingType SendingType = ESendingType::Common; + bool IsEnoughCpu = true; + ui32 WriteTurn = 0; + TWorkerId WorkerId; + ui16 LocalQueueSize = 0; + }; + + extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp + +} |