diff options
author | kruall <kruall@ydb.tech> | 2022-12-08 13:31:53 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2022-12-08 13:31:53 +0300 |
commit | 2e77ddea288b1279cb28699cdb446e659d97380d (patch) | |
tree | f22b186a28177482ab217f528ca2092aa0aaf2b3 | |
parent | d2f1358e178da6fc852cab295d41f92f8eb2d7a5 (diff) | |
download | ydb-2e77ddea288b1279cb28699cdb446e659d97380d.tar.gz |
AS1.4,
-rw-r--r-- | library/cpp/actors/core/CMakeLists.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/core/actor_ut.cpp | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/actorsystem.h | 41 | ||||
-rw-r--r-- | library/cpp/actors/core/balancer.cpp | 26 | ||||
-rw-r--r-- | library/cpp/actors/core/balancer.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/core/config.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/core/cpu_manager.cpp | 6 | ||||
-rw-r--r-- | library/cpp/actors/core/cpu_manager.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.cpp | 98 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_basic.h | 25 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_united.cpp | 22 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_united.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/core/executor_pool_united_ut.cpp | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 313 | ||||
-rw-r--r-- | library/cpp/actors/core/harmonizer.h | 20 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_stats.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/probes.h | 24 | ||||
-rw-r--r-- | library/cpp/actors/core/worker_context.h | 1 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 5 |
19 files changed, 583 insertions, 23 deletions
diff --git a/library/cpp/actors/core/CMakeLists.txt b/library/cpp/actors/core/CMakeLists.txt index c1c8d82623..7bbf9297c2 100644 --- a/library/cpp/actors/core/CMakeLists.txt +++ b/library/cpp/actors/core/CMakeLists.txt @@ -46,6 +46,7 @@ target_sources(cpp-actors-core PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_io.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_united.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_thread.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/harmonizer.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/interconnect.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/io_dispatcher.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log.cpp diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp index 1d7f236a2e..3081c9305e 100644 --- a/library/cpp/actors/core/actor_ut.cpp +++ b/library/cpp/actors/core/actor_ut.cpp @@ -562,8 +562,12 @@ Y_UNIT_TEST_SUITE(TestDecorator) { setup->NodeId = 0; setup->ExecutorsCount = 1; setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]); + + ui64 ts = GetCycleCountFast(); + THolder<IHarmonizer> harmonizer(MakeHarmonizer(ts)); for (ui32 i = 0; i < setup->ExecutorsCount; ++i) { - setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic"); + setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic", harmonizer.Get()); + harmonizer->AddPool(setup->Executors[i].Get()); } setup->Scheduler = new TBasicSchedulerThread; diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 3a47cc1603..6cb0f93792 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -130,6 +130,47 @@ namespace NActors { virtual TAffinity* Affinity() const = 0; virtual void SetRealTimeMode() const {} + + virtual ui32 GetThreadCount() const { + return 1; + }; + + virtual void SetThreadCount(ui32 threads) { + Y_UNUSED(threads); + } + + virtual i16 GetBlockingThreadCount() const { + return 0; + } + + virtual i16 GetDefaultThreadCount() const { + return 1; + } + + virtual i16 GetMinThreadCount() const { + return 1; + } + + virtual i16 GetMaxThreadCount() const { + return 1; + + } + + virtual bool IsThreadBeingStopped(i16 threadIdx) const { + Y_UNUSED(threadIdx); + return false; + } + + virtual double GetThreadConsumedUs(i16 threadIdx) { + Y_UNUSED(threadIdx); + return 0.0; + } + + virtual double GetThreadBookedUs(i16 threadIdx) { + Y_UNUSED(threadIdx); + return 0.0; + } + }; // could be proxy to in-pool schedulers (for NUMA-aware executors) diff --git a/library/cpp/actors/core/balancer.cpp b/library/cpp/actors/core/balancer.cpp index 3dcc45c56b..d82701bbfb 100644 --- a/library/cpp/actors/core/balancer.cpp +++ b/library/cpp/actors/core/balancer.cpp @@ -2,8 +2,9 @@ #include "probes.h" -#include <library/cpp/actors/util/intrinsics.h> +#include <library/cpp/actors/util/cpu_load_log.h> #include <library/cpp/actors/util/datetime.h> +#include <library/cpp/actors/util/intrinsics.h> #include <util/system/spinlock.h> @@ -27,11 +28,11 @@ namespace NActors { TLevel() {} - TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle) { + TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) { ScaleFactor = double(currentCpus) / cfg.Cpus; - if (cpuIdle > 1.3) { // TODO: add a better underload criterion, based on estimated latency w/o 1 cpu + if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // Uderload criterion, based on estimated latency w/o 1 cpu LoadClass = Underloaded; - } else if (cpuIdle < 0.2) { // TODO: add a better overload criterion, based on latency + } else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // Overload criterion, based on latency LoadClass = Overloaded; } else { LoadClass = Moderate; @@ -82,6 +83,8 @@ namespace NActors { TBalancerConfig Config; public: + + ui64 GetPeriodUs() override; // Setup TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts); bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override; @@ -238,9 +241,12 @@ namespace NActors { } // Compute levels - pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle); - pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle); // we expect taken cpu to became utilized - pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1); + pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle, + pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs); + pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle, + 0, pool.Next.WorstActivationTimeUs); // we expect taken cpu to became utilized + pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1, + pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs); // Prepare for balancing pool.PrevCpus = pool.CurrentCpus; @@ -263,7 +269,7 @@ namespace NActors { TPool& from = **fromIter; if (from.CurrentCpus == from.PrevCpus && // if not balanced yet from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated - from.SubLevel.Importance < to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement + from.SubLevel.Importance <= to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement { MoveCpu(from, to); from.CurrentCpus--; @@ -295,6 +301,10 @@ namespace NActors { Lock.Release(); } + ui64 TBalancer::GetPeriodUs() { + return Config.PeriodUs; + } + IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) { return new TBalancer(config, unitedPools, ts); } diff --git a/library/cpp/actors/core/balancer.h b/library/cpp/actors/core/balancer.h index 9763ec79e1..e1f6f33bf3 100644 --- a/library/cpp/actors/core/balancer.h +++ b/library/cpp/actors/core/balancer.h @@ -10,6 +10,8 @@ namespace NActors { ui64 Ts = 0; // Measurement timestamp ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex + ui64 WorstActivationTimeUs = 0; + ui64 ExpectedLatencyIncreaseUs = 0; }; // Pool cpu balancer @@ -20,6 +22,7 @@ namespace NActors { virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0; virtual void Balance() = 0; virtual void Unlock() = 0; + virtual ui64 GetPeriodUs() = 0; // TODO: add method for reconfiguration on fly }; diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h index 0d65815fd9..1d875f4c12 100644 --- a/library/cpp/actors/core/config.h +++ b/library/cpp/actors/core/config.h @@ -41,6 +41,9 @@ namespace NActors { ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX; int RealtimePriority = 0; ui32 MaxActivityType = 5; + i16 MinThreadCount = 0; + i16 MaxThreadCount = 0; + i16 DefaultThreadCount = 0; }; struct TIOExecutorPoolConfig { diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp index 39089b5d83..d9672272a0 100644 --- a/library/cpp/actors/core/cpu_manager.cpp +++ b/library/cpp/actors/core/cpu_manager.cpp @@ -16,10 +16,14 @@ namespace NActors { UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get())); } + ui64 ts = GetCycleCountFast(); + Harmonizer.Reset(MakeHarmonizer(ts)); + Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]); for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { Executors[excIdx].Reset(CreateExecutorPool(excIdx)); + Harmonizer->AddPool(Executors[excIdx].Get()); } } @@ -89,7 +93,7 @@ namespace NActors { IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) { for (TBasicExecutorPoolConfig& cfg : Config.Basic) { if (cfg.PoolId == poolId) { - return new TBasicExecutorPool(cfg); + return new TBasicExecutorPool(cfg, Harmonizer.Get()); } } for (TIOExecutorPoolConfig& cfg : Config.IO) { diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h index 454035477b..42bede91b8 100644 --- a/library/cpp/actors/core/cpu_manager.h +++ b/library/cpp/actors/core/cpu_manager.h @@ -1,6 +1,7 @@ #pragma once #include "actorsystem.h" +#include "harmonizer.h" #include "executor_pool_basic.h" #include "executor_pool_io.h" #include "executor_pool_united.h" @@ -11,6 +12,7 @@ namespace NActors { TArrayHolder<TAutoPtr<IExecutorPool>> Executors; THolder<TUnitedWorkers> UnitedWorkers; THolder<IBalancer> Balancer; + THolder<IHarmonizer> Harmonizer; TCpuManagerConfig Config; public: explicit TCpuManager(THolder<TActorSystemSetup>& setup) diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 7dff052d3e..36e295c231 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -18,11 +18,15 @@ namespace NActors { ui32 threads, ui64 spinThreshold, const TString& poolName, + IHarmonizer *harmonizer, TAffinity* affinity, TDuration timePerMailbox, ui32 eventsPerMailbox, int realtimePriority, - ui32 maxActivityType) + ui32 maxActivityType, + i16 minThreadCount, + i16 maxThreadCount, + i16 defaultThreadCount) : TExecutorPoolBase(poolId, threads, affinity, maxActivityType) , SpinThreshold(spinThreshold) , SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles @@ -35,22 +39,45 @@ namespace NActors { , MaxUtilizationCounter(0) , MaxUtilizationAccumulator(0) , ThreadCount(threads) + , MinThreadCount(minThreadCount) + , MaxThreadCount(maxThreadCount) + , DefaultThreadCount(defaultThreadCount) + , Harmonizer(harmonizer) { + i16 limit = Min(threads, (ui32)Max<i16>()); + if (DefaultThreadCount) { + DefaultThreadCount = Min(DefaultThreadCount, limit); + } else { + DefaultThreadCount = limit; + } + + MaxThreadCount = Min(Max(MaxThreadCount, DefaultThreadCount), limit); + + if (MinThreadCount) { + MinThreadCount = Max((i16)1, Min(MinThreadCount, DefaultThreadCount)); + } else { + MinThreadCount = DefaultThreadCount; + } + ThreadCount = MaxThreadCount; auto semaphore = TSemaphore(); Semaphore = semaphore.ConverToI64(); } - TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg) + TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer) : TBasicExecutorPool( cfg.PoolId, cfg.Threads, cfg.SpinThreshold, cfg.PoolName, + harmonizer, new TAffinity(cfg.Affinity), cfg.TimePerMailbox, cfg.EventsPerMailbox, cfg.RealtimePriority, - cfg.MaxActivityType + cfg.MaxActivityType, + cfg.MinThreadCount, + cfg.MaxThreadCount, + cfg.DefaultThreadCount ) {} @@ -179,6 +206,11 @@ namespace NActors { TTimers timers; + if (Harmonizer) { + LWPROBE(TryToHarmonize, PoolId, PoolName); + Harmonizer->Harmonize(timers.HPStart); + } + TThreadCtx& threadCtx = Threads[workerId]; AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE); @@ -399,12 +431,66 @@ namespace NActors { with_lock (ChangeThreadsLock) { size_t prevCount = GetThreadCount(); AtomicSet(ThreadCount, threads); - TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore)); i64 oldX = semaphore.ConverToI64(); - semaphore.CurrentSleepThreadCount += threads - prevCount; - semaphore.OldSemaphore -= threads - prevCount; + if (threads > prevCount) { + semaphore.CurrentSleepThreadCount += (i64)threads - prevCount; + semaphore.OldSemaphore -= (i64)threads - prevCount; + } else { + semaphore.CurrentSleepThreadCount -= (i64)prevCount - threads; + semaphore.OldSemaphore += prevCount - threads; + } AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX); + LWPROBE(ThreadCount, PoolId, PoolName, threads, MinThreadCount, MaxThreadCount, DefaultThreadCount); } } + + i16 TBasicExecutorPool::GetDefaultThreadCount() const { + return DefaultThreadCount; + } + + i16 TBasicExecutorPool::GetMinThreadCount() const { + return MinThreadCount; + } + + i16 TBasicExecutorPool::GetMaxThreadCount() const { + return MaxThreadCount; + } + + bool TBasicExecutorPool::IsThreadBeingStopped(i16 threadIdx) const { + if ((ui32)threadIdx >= PoolThreads) { + return false; + } + auto blockedFlag = AtomicGet(Threads[threadIdx].BlockedFlag); + if (blockedFlag == TThreadCtx::BS_BLOCKING) { + return true; + } + return false; + } + + double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) { + if ((ui32)threadIdx >= PoolThreads) { + return 0; + } + TThreadCtx& threadCtx = Threads[threadIdx]; + TExecutorThreadStats stats; + threadCtx.Thread->GetCurrentStats(stats); + return Ts2Us(stats.ElapsedTicks); + } + + double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) { + if ((ui32)threadIdx >= PoolThreads) { + return 0; + } + TThreadCtx& threadCtx = Threads[threadIdx]; + TExecutorThreadStats stats; + threadCtx.Thread->GetCurrentStats(stats); + return stats.CpuNs / 1000.0; + } + + i16 TBasicExecutorPool::GetBlockingThreadCount() const { + TAtomic x = AtomicGet(Semaphore); + TSemaphore semaphore = TSemaphore::GetSemaphore(x); + return -Min<i16>(semaphore.CurrentSleepThreadCount, 0); + } } diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index e65ad20a48..9185ed18f1 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -4,6 +4,7 @@ #include "executor_thread.h" #include "scheduler_queue.h" #include "executor_pool_base.h" +#include "harmonizer.h" #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/actors/util/threadparkpad.h> #include <library/cpp/monlib/dynamic_counters/counters.h> @@ -74,6 +75,11 @@ namespace NActors { TAtomic ThreadCount; TMutex ChangeThreadsLock; + i16 MinThreadCount; + i16 MaxThreadCount; + i16 DefaultThreadCount; + IHarmonizer *Harmonizer; + public: struct TSemaphore { i64 OldSemaphore = 0; // 34 bits @@ -107,12 +113,16 @@ namespace NActors { ui32 threads, ui64 spinThreshold, const TString& poolName = "", + IHarmonizer *harmonizer = nullptr, TAffinity* affinity = nullptr, TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX, ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX, int realtimePriority = 0, - ui32 maxActivityType = 1); - explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg); + ui32 maxActivityType = 1, + i16 minThreadCount = 0, + i16 maxThreadCount = 0, + i16 defaultThreadCount = 0); + explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer); ~TBasicExecutorPool(); ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override; @@ -135,8 +145,15 @@ namespace NActors { void SetRealTimeMode() const override; - ui32 GetThreadCount() const; - void SetThreadCount(ui32 threads); + ui32 GetThreadCount() const override; + void SetThreadCount(ui32 threads) override; + i16 GetDefaultThreadCount() const override; + i16 GetMinThreadCount() const override; + i16 GetMaxThreadCount() const override; + bool IsThreadBeingStopped(i16 threadIdx) const override; + double GetThreadConsumedUs(i16 threadIdx) override; + double GetThreadBookedUs(i16 threadIdx) override; + i16 GetBlockingThreadCount() const override; private: void WakeUpLoop(); diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp index def7a2f335..4910ddf965 100644 --- a/library/cpp/actors/core/executor_pool_united.cpp +++ b/library/cpp/actors/core/executor_pool_united.cpp @@ -7,6 +7,7 @@ #include "mailbox.h" #include "scheduler_queue.h" #include <library/cpp/actors/util/affinity.h> +#include <library/cpp/actors/util/cpu_load_log.h> #include <library/cpp/actors/util/datetime.h> #include <library/cpp/actors/util/futex.h> #include <library/cpp/actors/util/intrinsics.h> @@ -955,6 +956,8 @@ namespace NActors { // Thread-safe per pool stats // NOTE: It's guaranteed that cpu never executes two instance of the same pool TVector<TExecutorThreadStats> PoolStats; + TCpuLoadLog<1024> LoadLog; + // Configuration TCpuId CpuId; @@ -1002,7 +1005,9 @@ namespace NActors { } bool ActiveWait(ui64 spinThresholdTs, TPoolId& result) { - ui64 deadline = GetCycleCountFast() + spinThresholdTs; + ui64 ts = GetCycleCountFast(); + LoadLog.RegisterBusyPeriod(ts); + ui64 deadline = ts + spinThresholdTs; while (GetCycleCountFast() < deadline) { for (ui32 i = 0; i < 12; ++i) { TPoolId current = State.CurrentPool(); @@ -1010,6 +1015,7 @@ namespace NActors { SpinLockPause(); } else { result = current; + LoadLog.RegisterIdlePeriod(GetCycleCountFast()); return true; // wakeup } } @@ -1271,15 +1277,25 @@ namespace NActors { if (Pools[pool].IsUnited()) { ui64 ElapsedTs = 0; ui64 ParkedTs = 0; + TStackVec<TCpuLoadLog<1024>*, 128> logs; + ui64 worstActivationTimeUs = 0; for (TCpu* cpu : Pools[pool].WakeOrderCpus) { - const TExecutorThreadStats& cpuStats = cpu->PoolStats[pool]; + TExecutorThreadStats& cpuStats = cpu->PoolStats[pool]; ElapsedTs += cpuStats.ElapsedTicks; ParkedTs += cpuStats.ParkedTicks; + worstActivationTimeUs = Max(worstActivationTimeUs, cpuStats.WorstActivationTimeUs); + cpuStats.WorstActivationTimeUs = 0; + logs.push_back(&cpu->LoadLog); } + ui64 minPeriodTs = Min(ui64(Us2Ts(Balancer->GetPeriodUs())), ui64((1024ull-2ull)*64ull*128ull*1024ull)); + ui64 estimatedTs = MinusOneCpuEstimator.MaxLatencyIncreaseWithOneLessCpu( + &logs[0], logs.size(), ts, minPeriodTs); TBalancerStats stats; stats.Ts = ts; stats.CpuUs = Ts2Us(ElapsedTs); stats.IdleUs = Ts2Us(ParkedTs); + stats.ExpectedLatencyIncreaseUs = Ts2Us(estimatedTs); + stats.WorstActivationTimeUs = worstActivationTimeUs; Balancer->SetPoolStats(pool, stats); } } @@ -1334,11 +1350,13 @@ namespace NActors { return result; } wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed()); + cpu.LoadLog.RegisterBusyPeriod(GetCycleCountFast()); bool wakeup; do { wakeup = cpu.BlockedWait(result, Config.Balancer.PeriodUs * 1000); wctx.AddParkedCycles(timeTracker.Elapsed()); } while (!wakeup); + cpu.LoadLog.RegisterIdlePeriod(GetCycleCountFast()); return result; } diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h index a090ba2466..0895b06462 100644 --- a/library/cpp/actors/core/executor_pool_united.h +++ b/library/cpp/actors/core/executor_pool_united.h @@ -8,6 +8,7 @@ #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/monlib/dynamic_counters/counters.h> +#include <library/cpp/actors/util/cpu_load_log.h> #include <library/cpp/actors/util/unordered_cache.h> #include <library/cpp/containers/stack_vector/stack_vec.h> @@ -34,6 +35,7 @@ namespace NActors { TCpuAllocationConfig Allocation; volatile bool StopFlag = false; + TMinusOneCpuEstimator<1024> MinusOneCpuEstimator; public: TUnitedWorkers( diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp index 8a59bef974..133e9c5f2a 100644 --- a/library/cpp/actors/core/executor_pool_united_ut.cpp +++ b/library/cpp/actors/core/executor_pool_united_ut.cpp @@ -111,6 +111,10 @@ struct TRoundRobinBalancer: public IBalancer { State->Load(assigned, current); State->AssignPool(NextPool[assigned]); } + + ui64 GetPeriodUs() override { + return 1000; + } }; void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency = 0) { diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp new file mode 100644 index 0000000000..3a0f4109bb --- /dev/null +++ b/library/cpp/actors/core/harmonizer.cpp @@ -0,0 +1,313 @@ +#include "harmonizer.h" + +#include "probes.h" +#include "actorsystem.h" + +#include <library/cpp/actors/util/cpu_load_log.h> +#include <library/cpp/actors/util/datetime.h> +#include <library/cpp/actors/util/intrinsics.h> + +#include <util/system/spinlock.h> + +#include <algorithm> + +namespace NActors { + +LWTRACE_USING(ACTORLIB_PROVIDER); + +struct TValueHistory { + double History[8] = {0.0}; + ui64 HistoryIdx = 0; + ui64 LastTs = Max<ui64>(); + double LastUs = 0.0; + double AccumulatedUs = 0.0; + ui64 AccumulatedTs = 0; + + double GetAvgPart() { + double sum = AccumulatedUs; + for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) { + sum += History[idx]; + } + double duration = 1'000'000.0 * (sizeof(History) / sizeof(History[0])) + Ts2Us(AccumulatedTs); + double avg = sum / duration; + return avg; + } + + void Register(ui64 ts, double valueUs) { + if (ts < LastTs) { + LastTs = ts; + LastUs = valueUs; + AccumulatedUs = 0.0; + AccumulatedTs = 0; + return; + } + ui64 lastTs = std::exchange(LastTs, ts); + ui64 dTs = ts - lastTs; + double lastUs = std::exchange(LastUs, valueUs); + double dUs = valueUs - lastUs; + LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs); + + if (dTs > Us2Ts(8'000'000.0)) { + dUs = dUs * 1'000'000.0 / Ts2Us(dTs); + for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) { + History[idx] = dUs; + } + AccumulatedUs = 0.0; + AccumulatedTs = 0; + return; + } + + while (dTs > 0) { + if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) { + AccumulatedTs += dTs; + AccumulatedUs += dUs; + break; + } else { + ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs; + double addUs = dUs * addTs / dTs; + dTs -= addTs; + dUs -= addUs; + History[HistoryIdx] = AccumulatedUs + addUs; + HistoryIdx = (HistoryIdx + 1) % (sizeof(History) / sizeof(History[0])); + AccumulatedUs = 0.0; + AccumulatedTs = 0; + } + } + } +}; + +struct TThreadInfo { + TValueHistory Consumed; + TValueHistory Booked; +}; + +struct TPoolInfo { + std::vector<TThreadInfo> ThreadInfo; + IExecutorPool* Pool = nullptr; + i16 DefaultThreadCount = 0; + i16 MinThreadCount = 0; + i16 MaxThreadCount = 0; + + bool IsBeingStopped(i16 threadIdx); + double GetBooked(i16 threadIdx); + double GetConsumed(i16 threadIdx); + void PullStats(ui64 ts); + i16 GetThreadCount(); + void SetThreadCount(i16 threadCount); +}; + +bool TPoolInfo::IsBeingStopped(i16 threadIdx) { + return Pool->IsThreadBeingStopped(threadIdx); +} + +double TPoolInfo::GetBooked(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Booked.GetAvgPart(); + } + return 0.0; + //return Pool->GetThreadBooked(threadIdx); +} + +double TPoolInfo::GetConsumed(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Consumed.GetAvgPart(); + } + return 0.0; + //return Pool->GetThreadConsumed(threadIdx); +} + +#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] +void TPoolInfo::PullStats(ui64 ts) { + for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) { + TThreadInfo &threadInfo = ThreadInfo[threadIdx]; + threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx)); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); + threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx)); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); + } +} +#undef UNROLL_HISTORY + +i16 TPoolInfo::GetThreadCount() { + return Pool->GetThreadCount(); +} + +void TPoolInfo::SetThreadCount(i16 threadCount) { + Pool->SetThreadCount(threadCount); +} + +class THarmonizer: public IHarmonizer { +private: + std::atomic<bool> IsDisabled = false; + TSpinLock Lock; + std::atomic<ui64> NextHarmonizeTs = 0; + std::vector<TPoolInfo> Pools; + + void PullStats(ui64 ts); + void HarmonizeImpl(); +public: + THarmonizer(ui64 ts); + virtual ~THarmonizer(); + double Rescale(double value) const; + void Harmonize(ui64 ts) override; + void DeclareEmergency(ui64 ts) override; + void AddPool(IExecutorPool* pool) override; + void Enable(bool enable) override; +}; + +THarmonizer::THarmonizer(ui64 ts) { + NextHarmonizeTs = ts; +} + +THarmonizer::~THarmonizer() { +} + +double THarmonizer::Rescale(double value) const { + return Max(0.0, Min(1.0, value * (1.0/0.9))); +} + +void THarmonizer::PullStats(ui64 ts) { + for (TPoolInfo &pool : Pools) { + pool.PullStats(ts); + } +} + +void THarmonizer::HarmonizeImpl() { + bool isStarvedPresent = false; + double booked = 0.0; + double consumed = 0.0; + i64 beingStopped = 0; + i64 total = 0; + TStackVec<size_t, 8> needyPools; + TStackVec<size_t, 8> hoggishPools; + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = Pools[poolIdx]; + total += pool.DefaultThreadCount; + double poolBooked = 0.0; + double poolConsumed = 0.0; + beingStopped += pool.Pool->GetBlockingThreadCount(); + for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { + poolBooked += Rescale(pool.GetBooked(threadIdx)); + poolConsumed += Rescale(pool.GetConsumed(threadIdx)); + } + bool isStarved = false; + if (Max(consumed, booked) > 0.1 && consumed < booked * 0.7) { + isStarvedPresent = true; + isStarved = true; + } + ui32 currentThreadCount = pool.GetThreadCount(); + bool isNeedy = false; + if (poolBooked >= currentThreadCount) { + needyPools.push_back(poolIdx); + isNeedy = true; + } + bool isHoggish = false; + if (poolBooked < currentThreadCount - 1) { + hoggishPools.push_back(poolIdx); + isHoggish = true; + } + booked += poolBooked; + consumed += poolConsumed; + LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); + } + double budget = total - booked; + double overbooked = consumed - booked; + if (isStarvedPresent) { + // last_starved_at_consumed_value = сумма по всем пулам consumed; + // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, + // использовать вместо total + if (beingStopped && beingStopped >= overbooked) { + // do nothing + } else { + TStackVec<size_t> reorder; + for (size_t i = 0; i < Pools.size(); ++i) { + reorder.push_back(i); + } + while (!reorder.empty()) { + size_t rndIdx = rand() % reorder.size(); + size_t poolIdx = reorder[rndIdx]; + reorder[rndIdx] = reorder.back(); + reorder.pop_back(); + + TPoolInfo &pool = Pools[poolIdx]; + i64 threadCount = pool.GetThreadCount(); + if (threadCount > pool.DefaultThreadCount) { + pool.SetThreadCount(threadCount - 1); + overbooked--; + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + if (overbooked < 1) { + break; + } + } + } + } + } else { + for (size_t needyPoolIdx : needyPools) { + TPoolInfo &pool = Pools[needyPoolIdx]; + if (budget >= 1.0) { + i64 threadCount = pool.GetThreadCount(); + if (threadCount + 1 <= pool.MaxThreadCount) { + pool.SetThreadCount(threadCount + 1); + budget -= 1.0; + LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount); + } + } + } + } + for (size_t hoggishPoolIdx : hoggishPools) { + TPoolInfo &pool = Pools[hoggishPoolIdx]; + i64 threadCount = pool.GetThreadCount(); + if (threadCount > pool.MinThreadCount) { + LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + pool.SetThreadCount(threadCount - 1); + } + } +} + +void THarmonizer::Harmonize(ui64 ts) { + if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false); + return; + } + // Check again under the lock + if (IsDisabled) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true); + Lock.Release(); + return; + } + // Will never reach this line disabled + ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull)); + LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs); + + PullStats(ts); + HarmonizeImpl(); + + Lock.Release(); +} + +void THarmonizer::DeclareEmergency(ui64 ts) { + NextHarmonizeTs = ts; +} + +void THarmonizer::AddPool(IExecutorPool* pool) { + TGuard<TSpinLock> guard(Lock); + TPoolInfo poolInfo; + poolInfo.Pool = pool; + poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount(); + poolInfo.MinThreadCount = pool->GetMinThreadCount(); + poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); + poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount); + pool->SetThreadCount(poolInfo.DefaultThreadCount); + Pools.push_back(poolInfo); +}; + +void THarmonizer::Enable(bool enable) { + TGuard<TSpinLock> guard(Lock); + IsDisabled = enable; +} + +IHarmonizer* MakeHarmonizer(ui64 ts) { + return new THarmonizer(ts); +} + +} diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h new file mode 100644 index 0000000000..dae8d7de0c --- /dev/null +++ b/library/cpp/actors/core/harmonizer.h @@ -0,0 +1,20 @@ +#pragma once + +#include "defs.h" +#include "config.h" + +namespace NActors { + class IExecutorPool; + + // Pool cpu harmonizer + class IHarmonizer { + public: + virtual ~IHarmonizer() {} + virtual void Harmonize(ui64 ts) = 0; + virtual void DeclareEmergency(ui64 ts) = 0; + virtual void AddPool(IExecutorPool* pool) = 0; + virtual void Enable(bool enable) = 0; + }; + + IHarmonizer* MakeHarmonizer(ui64 ts); +} diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 6d482926d1..fb76410590 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -69,6 +69,7 @@ namespace NActors { ui64 NonDeliveredEvents = 0; ui64 EmptyMailboxActivation = 0; ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion) + ui64 WorstActivationTimeUs = 0; NHPTimer::STime ElapsedTicks = 0; NHPTimer::STime ParkedTicks = 0; NHPTimer::STime BlockedTicks = 0; @@ -111,6 +112,9 @@ namespace NActors { NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents); EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation); CpuNs += RelaxedLoad(&other.CpuNs); + RelaxedStore( + &WorstActivationTimeUs, + std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs))); ElapsedTicks += RelaxedLoad(&other.ElapsedTicks); ParkedTicks += RelaxedLoad(&other.ParkedTicks); BlockedTicks += RelaxedLoad(&other.BlockedTicks); diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h index 4912d6dd26..33ac7b0f5e 100644 --- a/library/cpp/actors/core/probes.h +++ b/library/cpp/actors/core/probes.h @@ -166,6 +166,30 @@ PROBE(MoveCpu, GROUPS("PoolCpuBalancer"), \ TYPES(ui32, ui64, TString, TString, ui32), \ NAMES("fromPoolId", "toPoolId", "fromPool", "toPool", "cpu")) \ + PROBE(ThreadCount, GROUPS("BasicThreadPool"), \ + TYPES(ui32, TString, ui32, ui32, ui32, ui32), \ + NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \ + PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \ + TYPES(ui32, TString, double, double, ui32, ui32, bool, bool, bool), \ + NAMES("poolId", "pool", "booked", "consumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \ + PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \ + TYPES(ui32, TString, TString, ui32, ui32, ui32), \ + NAMES("poolId", "pool", "operation", "newCount", "minCount", "maxCount")) \ + PROBE(TryToHarmonize, GROUPS("Harmonizer"), \ + TYPES(ui32, TString), \ + NAMES("poolId", "pool")) \ + PROBE(SavedValues, GROUPS("Harmonizer"), \ + TYPES(ui32, TString, TString, double, double, double, double, double, double, double, double), \ + NAMES("poolId", "pool", "valueName", "[0]", "[1]", "[2]", "[3]", "[4]", "[5]", "[6]", "[7]")) \ + PROBE(RegisterValue, GROUPS("Harmonizer"), \ + TYPES(ui64, ui64, ui64, ui64, double, double, double), \ + NAMES("ts", "lastTs", "dTs", "8sTs", "us", "lastUs", "dUs")) \ + PROBE(TryToHarmonizeFailed, GROUPS("Harmonizer"), \ + TYPES(ui64, ui64, bool, bool), \ + NAMES("ts", "nextHarmonizeTs", "isDisabled", "withLock")) \ + PROBE(TryToHarmonizeSuccess, GROUPS("Harmonizer"), \ + TYPES(ui64, ui64, ui64), \ + NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \ /**/ LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER) diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index b4c37a7629..384a13c5ee 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -95,6 +95,7 @@ namespace NActors { i64 ts = deliveredTs > scheduleTs ? deliveredTs - scheduleTs : 0; double usec = NHPTimer::GetSeconds(ts) * 1000000.0; Stats->ActivationTimeHistogram.Add(usec); + Stats->WorstActivationTimeUs = Max(Stats->WorstActivationTimeUs, (ui64)usec); return usec; } diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index efce0c04b1..e037c7a673 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -249,7 +249,7 @@ void AddExecutorPool( TBasicExecutorPoolConfig basic; basic.PoolId = poolId; basic.PoolName = poolConfig.GetName(); - basic.Threads = poolConfig.GetThreads(); + basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads()); basic.SpinThreshold = poolConfig.GetSpinThreshold(); basic.Affinity = ParseAffinity(poolConfig.GetAffinity()); basic.RealtimePriority = poolConfig.GetRealtimePriority(); @@ -265,6 +265,9 @@ void AddExecutorPool( basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox(); } Y_VERIFY(basic.EventsPerMailbox != 0); + basic.MinThreadCount = poolConfig.GetMinThreads(); + basic.MaxThreadCount = poolConfig.GetMaxThreads(); + basic.DefaultThreadCount = poolConfig.GetThreads(); cpuManager.Basic.emplace_back(std::move(basic)); break; } |