aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2022-12-08 13:31:53 +0300
committerkruall <kruall@ydb.tech>2022-12-08 13:31:53 +0300
commit2e77ddea288b1279cb28699cdb446e659d97380d (patch)
treef22b186a28177482ab217f528ca2092aa0aaf2b3
parentd2f1358e178da6fc852cab295d41f92f8eb2d7a5 (diff)
downloadydb-2e77ddea288b1279cb28699cdb446e659d97380d.tar.gz
AS1.4,
-rw-r--r--library/cpp/actors/core/CMakeLists.txt1
-rw-r--r--library/cpp/actors/core/actor_ut.cpp6
-rw-r--r--library/cpp/actors/core/actorsystem.h41
-rw-r--r--library/cpp/actors/core/balancer.cpp26
-rw-r--r--library/cpp/actors/core/balancer.h3
-rw-r--r--library/cpp/actors/core/config.h3
-rw-r--r--library/cpp/actors/core/cpu_manager.cpp6
-rw-r--r--library/cpp/actors/core/cpu_manager.h2
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp98
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h25
-rw-r--r--library/cpp/actors/core/executor_pool_united.cpp22
-rw-r--r--library/cpp/actors/core/executor_pool_united.h2
-rw-r--r--library/cpp/actors/core/executor_pool_united_ut.cpp4
-rw-r--r--library/cpp/actors/core/harmonizer.cpp313
-rw-r--r--library/cpp/actors/core/harmonizer.h20
-rw-r--r--library/cpp/actors/core/mon_stats.h4
-rw-r--r--library/cpp/actors/core/probes.h24
-rw-r--r--library/cpp/actors/core/worker_context.h1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp5
19 files changed, 583 insertions, 23 deletions
diff --git a/library/cpp/actors/core/CMakeLists.txt b/library/cpp/actors/core/CMakeLists.txt
index c1c8d82623..7bbf9297c2 100644
--- a/library/cpp/actors/core/CMakeLists.txt
+++ b/library/cpp/actors/core/CMakeLists.txt
@@ -46,6 +46,7 @@ target_sources(cpp-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_io.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_united.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_thread.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/harmonizer.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/interconnect.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/io_dispatcher.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log.cpp
diff --git a/library/cpp/actors/core/actor_ut.cpp b/library/cpp/actors/core/actor_ut.cpp
index 1d7f236a2e..3081c9305e 100644
--- a/library/cpp/actors/core/actor_ut.cpp
+++ b/library/cpp/actors/core/actor_ut.cpp
@@ -562,8 +562,12 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
setup->NodeId = 0;
setup->ExecutorsCount = 1;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
+
+ ui64 ts = GetCycleCountFast();
+ THolder<IHarmonizer> harmonizer(MakeHarmonizer(ts));
for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
- setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic");
+ setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic", harmonizer.Get());
+ harmonizer->AddPool(setup->Executors[i].Get());
}
setup->Scheduler = new TBasicSchedulerThread;
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 3a47cc1603..6cb0f93792 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -130,6 +130,47 @@ namespace NActors {
virtual TAffinity* Affinity() const = 0;
virtual void SetRealTimeMode() const {}
+
+ virtual ui32 GetThreadCount() const {
+ return 1;
+ };
+
+ virtual void SetThreadCount(ui32 threads) {
+ Y_UNUSED(threads);
+ }
+
+ virtual i16 GetBlockingThreadCount() const {
+ return 0;
+ }
+
+ virtual i16 GetDefaultThreadCount() const {
+ return 1;
+ }
+
+ virtual i16 GetMinThreadCount() const {
+ return 1;
+ }
+
+ virtual i16 GetMaxThreadCount() const {
+ return 1;
+
+ }
+
+ virtual bool IsThreadBeingStopped(i16 threadIdx) const {
+ Y_UNUSED(threadIdx);
+ return false;
+ }
+
+ virtual double GetThreadConsumedUs(i16 threadIdx) {
+ Y_UNUSED(threadIdx);
+ return 0.0;
+ }
+
+ virtual double GetThreadBookedUs(i16 threadIdx) {
+ Y_UNUSED(threadIdx);
+ return 0.0;
+ }
+
};
// could be proxy to in-pool schedulers (for NUMA-aware executors)
diff --git a/library/cpp/actors/core/balancer.cpp b/library/cpp/actors/core/balancer.cpp
index 3dcc45c56b..d82701bbfb 100644
--- a/library/cpp/actors/core/balancer.cpp
+++ b/library/cpp/actors/core/balancer.cpp
@@ -2,8 +2,9 @@
#include "probes.h"
-#include <library/cpp/actors/util/intrinsics.h>
+#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/intrinsics.h>
#include <util/system/spinlock.h>
@@ -27,11 +28,11 @@ namespace NActors {
TLevel() {}
- TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle) {
+ TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) {
ScaleFactor = double(currentCpus) / cfg.Cpus;
- if (cpuIdle > 1.3) { // TODO: add a better underload criterion, based on estimated latency w/o 1 cpu
+ if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // Uderload criterion, based on estimated latency w/o 1 cpu
LoadClass = Underloaded;
- } else if (cpuIdle < 0.2) { // TODO: add a better overload criterion, based on latency
+ } else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // Overload criterion, based on latency
LoadClass = Overloaded;
} else {
LoadClass = Moderate;
@@ -82,6 +83,8 @@ namespace NActors {
TBalancerConfig Config;
public:
+
+ ui64 GetPeriodUs() override;
// Setup
TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override;
@@ -238,9 +241,12 @@ namespace NActors {
}
// Compute levels
- pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle);
- pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle); // we expect taken cpu to became utilized
- pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1);
+ pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle,
+ pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
+ pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle,
+ 0, pool.Next.WorstActivationTimeUs); // we expect taken cpu to became utilized
+ pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1,
+ pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
// Prepare for balancing
pool.PrevCpus = pool.CurrentCpus;
@@ -263,7 +269,7 @@ namespace NActors {
TPool& from = **fromIter;
if (from.CurrentCpus == from.PrevCpus && // if not balanced yet
from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated
- from.SubLevel.Importance < to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
+ from.SubLevel.Importance <= to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
{
MoveCpu(from, to);
from.CurrentCpus--;
@@ -295,6 +301,10 @@ namespace NActors {
Lock.Release();
}
+ ui64 TBalancer::GetPeriodUs() {
+ return Config.PeriodUs;
+ }
+
IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) {
return new TBalancer(config, unitedPools, ts);
}
diff --git a/library/cpp/actors/core/balancer.h b/library/cpp/actors/core/balancer.h
index 9763ec79e1..e1f6f33bf3 100644
--- a/library/cpp/actors/core/balancer.h
+++ b/library/cpp/actors/core/balancer.h
@@ -10,6 +10,8 @@ namespace NActors {
ui64 Ts = 0; // Measurement timestamp
ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start
ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex
+ ui64 WorstActivationTimeUs = 0;
+ ui64 ExpectedLatencyIncreaseUs = 0;
};
// Pool cpu balancer
@@ -20,6 +22,7 @@ namespace NActors {
virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0;
virtual void Balance() = 0;
virtual void Unlock() = 0;
+ virtual ui64 GetPeriodUs() = 0;
// TODO: add method for reconfiguration on fly
};
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 0d65815fd9..1d875f4c12 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -41,6 +41,9 @@ namespace NActors {
ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX;
int RealtimePriority = 0;
ui32 MaxActivityType = 5;
+ i16 MinThreadCount = 0;
+ i16 MaxThreadCount = 0;
+ i16 DefaultThreadCount = 0;
};
struct TIOExecutorPoolConfig {
diff --git a/library/cpp/actors/core/cpu_manager.cpp b/library/cpp/actors/core/cpu_manager.cpp
index 39089b5d83..d9672272a0 100644
--- a/library/cpp/actors/core/cpu_manager.cpp
+++ b/library/cpp/actors/core/cpu_manager.cpp
@@ -16,10 +16,14 @@ namespace NActors {
UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get()));
}
+ ui64 ts = GetCycleCountFast();
+ Harmonizer.Reset(MakeHarmonizer(ts));
+
Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx].Reset(CreateExecutorPool(excIdx));
+ Harmonizer->AddPool(Executors[excIdx].Get());
}
}
@@ -89,7 +93,7 @@ namespace NActors {
IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
- return new TBasicExecutorPool(cfg);
+ return new TBasicExecutorPool(cfg, Harmonizer.Get());
}
}
for (TIOExecutorPoolConfig& cfg : Config.IO) {
diff --git a/library/cpp/actors/core/cpu_manager.h b/library/cpp/actors/core/cpu_manager.h
index 454035477b..42bede91b8 100644
--- a/library/cpp/actors/core/cpu_manager.h
+++ b/library/cpp/actors/core/cpu_manager.h
@@ -1,6 +1,7 @@
#pragma once
#include "actorsystem.h"
+#include "harmonizer.h"
#include "executor_pool_basic.h"
#include "executor_pool_io.h"
#include "executor_pool_united.h"
@@ -11,6 +12,7 @@ namespace NActors {
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
THolder<TUnitedWorkers> UnitedWorkers;
THolder<IBalancer> Balancer;
+ THolder<IHarmonizer> Harmonizer;
TCpuManagerConfig Config;
public:
explicit TCpuManager(THolder<TActorSystemSetup>& setup)
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 7dff052d3e..36e295c231 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -18,11 +18,15 @@ namespace NActors {
ui32 threads,
ui64 spinThreshold,
const TString& poolName,
+ IHarmonizer *harmonizer,
TAffinity* affinity,
TDuration timePerMailbox,
ui32 eventsPerMailbox,
int realtimePriority,
- ui32 maxActivityType)
+ ui32 maxActivityType,
+ i16 minThreadCount,
+ i16 maxThreadCount,
+ i16 defaultThreadCount)
: TExecutorPoolBase(poolId, threads, affinity, maxActivityType)
, SpinThreshold(spinThreshold)
, SpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
@@ -35,22 +39,45 @@ namespace NActors {
, MaxUtilizationCounter(0)
, MaxUtilizationAccumulator(0)
, ThreadCount(threads)
+ , MinThreadCount(minThreadCount)
+ , MaxThreadCount(maxThreadCount)
+ , DefaultThreadCount(defaultThreadCount)
+ , Harmonizer(harmonizer)
{
+ i16 limit = Min(threads, (ui32)Max<i16>());
+ if (DefaultThreadCount) {
+ DefaultThreadCount = Min(DefaultThreadCount, limit);
+ } else {
+ DefaultThreadCount = limit;
+ }
+
+ MaxThreadCount = Min(Max(MaxThreadCount, DefaultThreadCount), limit);
+
+ if (MinThreadCount) {
+ MinThreadCount = Max((i16)1, Min(MinThreadCount, DefaultThreadCount));
+ } else {
+ MinThreadCount = DefaultThreadCount;
+ }
+ ThreadCount = MaxThreadCount;
auto semaphore = TSemaphore();
Semaphore = semaphore.ConverToI64();
}
- TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg)
+ TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer)
: TBasicExecutorPool(
cfg.PoolId,
cfg.Threads,
cfg.SpinThreshold,
cfg.PoolName,
+ harmonizer,
new TAffinity(cfg.Affinity),
cfg.TimePerMailbox,
cfg.EventsPerMailbox,
cfg.RealtimePriority,
- cfg.MaxActivityType
+ cfg.MaxActivityType,
+ cfg.MinThreadCount,
+ cfg.MaxThreadCount,
+ cfg.DefaultThreadCount
)
{}
@@ -179,6 +206,11 @@ namespace NActors {
TTimers timers;
+ if (Harmonizer) {
+ LWPROBE(TryToHarmonize, PoolId, PoolName);
+ Harmonizer->Harmonize(timers.HPStart);
+ }
+
TThreadCtx& threadCtx = Threads[workerId];
AtomicSet(threadCtx.WaitingFlag, TThreadCtx::WS_NONE);
@@ -399,12 +431,66 @@ namespace NActors {
with_lock (ChangeThreadsLock) {
size_t prevCount = GetThreadCount();
AtomicSet(ThreadCount, threads);
-
TSemaphore semaphore = TSemaphore::GetSemaphore(AtomicGet(Semaphore));
i64 oldX = semaphore.ConverToI64();
- semaphore.CurrentSleepThreadCount += threads - prevCount;
- semaphore.OldSemaphore -= threads - prevCount;
+ if (threads > prevCount) {
+ semaphore.CurrentSleepThreadCount += (i64)threads - prevCount;
+ semaphore.OldSemaphore -= (i64)threads - prevCount;
+ } else {
+ semaphore.CurrentSleepThreadCount -= (i64)prevCount - threads;
+ semaphore.OldSemaphore += prevCount - threads;
+ }
AtomicAdd(Semaphore, semaphore.ConverToI64() - oldX);
+ LWPROBE(ThreadCount, PoolId, PoolName, threads, MinThreadCount, MaxThreadCount, DefaultThreadCount);
}
}
+
+ i16 TBasicExecutorPool::GetDefaultThreadCount() const {
+ return DefaultThreadCount;
+ }
+
+ i16 TBasicExecutorPool::GetMinThreadCount() const {
+ return MinThreadCount;
+ }
+
+ i16 TBasicExecutorPool::GetMaxThreadCount() const {
+ return MaxThreadCount;
+ }
+
+ bool TBasicExecutorPool::IsThreadBeingStopped(i16 threadIdx) const {
+ if ((ui32)threadIdx >= PoolThreads) {
+ return false;
+ }
+ auto blockedFlag = AtomicGet(Threads[threadIdx].BlockedFlag);
+ if (blockedFlag == TThreadCtx::BS_BLOCKING) {
+ return true;
+ }
+ return false;
+ }
+
+ double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) {
+ if ((ui32)threadIdx >= PoolThreads) {
+ return 0;
+ }
+ TThreadCtx& threadCtx = Threads[threadIdx];
+ TExecutorThreadStats stats;
+ threadCtx.Thread->GetCurrentStats(stats);
+ return Ts2Us(stats.ElapsedTicks);
+ }
+
+ double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) {
+ if ((ui32)threadIdx >= PoolThreads) {
+ return 0;
+ }
+ TThreadCtx& threadCtx = Threads[threadIdx];
+ TExecutorThreadStats stats;
+ threadCtx.Thread->GetCurrentStats(stats);
+ return stats.CpuNs / 1000.0;
+ }
+
+ i16 TBasicExecutorPool::GetBlockingThreadCount() const {
+ TAtomic x = AtomicGet(Semaphore);
+ TSemaphore semaphore = TSemaphore::GetSemaphore(x);
+ return -Min<i16>(semaphore.CurrentSleepThreadCount, 0);
+ }
}
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index e65ad20a48..9185ed18f1 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -4,6 +4,7 @@
#include "executor_thread.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
+#include "harmonizer.h"
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/actors/util/threadparkpad.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
@@ -74,6 +75,11 @@ namespace NActors {
TAtomic ThreadCount;
TMutex ChangeThreadsLock;
+ i16 MinThreadCount;
+ i16 MaxThreadCount;
+ i16 DefaultThreadCount;
+ IHarmonizer *Harmonizer;
+
public:
struct TSemaphore {
i64 OldSemaphore = 0; // 34 bits
@@ -107,12 +113,16 @@ namespace NActors {
ui32 threads,
ui64 spinThreshold,
const TString& poolName = "",
+ IHarmonizer *harmonizer = nullptr,
TAffinity* affinity = nullptr,
TDuration timePerMailbox = DEFAULT_TIME_PER_MAILBOX,
ui32 eventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX,
int realtimePriority = 0,
- ui32 maxActivityType = 1);
- explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg);
+ ui32 maxActivityType = 1,
+ i16 minThreadCount = 0,
+ i16 maxThreadCount = 0,
+ i16 defaultThreadCount = 0);
+ explicit TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer);
~TBasicExecutorPool();
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingReadCounter) override;
@@ -135,8 +145,15 @@ namespace NActors {
void SetRealTimeMode() const override;
- ui32 GetThreadCount() const;
- void SetThreadCount(ui32 threads);
+ ui32 GetThreadCount() const override;
+ void SetThreadCount(ui32 threads) override;
+ i16 GetDefaultThreadCount() const override;
+ i16 GetMinThreadCount() const override;
+ i16 GetMaxThreadCount() const override;
+ bool IsThreadBeingStopped(i16 threadIdx) const override;
+ double GetThreadConsumedUs(i16 threadIdx) override;
+ double GetThreadBookedUs(i16 threadIdx) override;
+ i16 GetBlockingThreadCount() const override;
private:
void WakeUpLoop();
diff --git a/library/cpp/actors/core/executor_pool_united.cpp b/library/cpp/actors/core/executor_pool_united.cpp
index def7a2f335..4910ddf965 100644
--- a/library/cpp/actors/core/executor_pool_united.cpp
+++ b/library/cpp/actors/core/executor_pool_united.cpp
@@ -7,6 +7,7 @@
#include "mailbox.h"
#include "scheduler_queue.h"
#include <library/cpp/actors/util/affinity.h>
+#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/futex.h>
#include <library/cpp/actors/util/intrinsics.h>
@@ -955,6 +956,8 @@ namespace NActors {
// Thread-safe per pool stats
// NOTE: It's guaranteed that cpu never executes two instance of the same pool
TVector<TExecutorThreadStats> PoolStats;
+ TCpuLoadLog<1024> LoadLog;
+
// Configuration
TCpuId CpuId;
@@ -1002,7 +1005,9 @@ namespace NActors {
}
bool ActiveWait(ui64 spinThresholdTs, TPoolId& result) {
- ui64 deadline = GetCycleCountFast() + spinThresholdTs;
+ ui64 ts = GetCycleCountFast();
+ LoadLog.RegisterBusyPeriod(ts);
+ ui64 deadline = ts + spinThresholdTs;
while (GetCycleCountFast() < deadline) {
for (ui32 i = 0; i < 12; ++i) {
TPoolId current = State.CurrentPool();
@@ -1010,6 +1015,7 @@ namespace NActors {
SpinLockPause();
} else {
result = current;
+ LoadLog.RegisterIdlePeriod(GetCycleCountFast());
return true; // wakeup
}
}
@@ -1271,15 +1277,25 @@ namespace NActors {
if (Pools[pool].IsUnited()) {
ui64 ElapsedTs = 0;
ui64 ParkedTs = 0;
+ TStackVec<TCpuLoadLog<1024>*, 128> logs;
+ ui64 worstActivationTimeUs = 0;
for (TCpu* cpu : Pools[pool].WakeOrderCpus) {
- const TExecutorThreadStats& cpuStats = cpu->PoolStats[pool];
+ TExecutorThreadStats& cpuStats = cpu->PoolStats[pool];
ElapsedTs += cpuStats.ElapsedTicks;
ParkedTs += cpuStats.ParkedTicks;
+ worstActivationTimeUs = Max(worstActivationTimeUs, cpuStats.WorstActivationTimeUs);
+ cpuStats.WorstActivationTimeUs = 0;
+ logs.push_back(&cpu->LoadLog);
}
+ ui64 minPeriodTs = Min(ui64(Us2Ts(Balancer->GetPeriodUs())), ui64((1024ull-2ull)*64ull*128ull*1024ull));
+ ui64 estimatedTs = MinusOneCpuEstimator.MaxLatencyIncreaseWithOneLessCpu(
+ &logs[0], logs.size(), ts, minPeriodTs);
TBalancerStats stats;
stats.Ts = ts;
stats.CpuUs = Ts2Us(ElapsedTs);
stats.IdleUs = Ts2Us(ParkedTs);
+ stats.ExpectedLatencyIncreaseUs = Ts2Us(estimatedTs);
+ stats.WorstActivationTimeUs = worstActivationTimeUs;
Balancer->SetPoolStats(pool, stats);
}
}
@@ -1334,11 +1350,13 @@ namespace NActors {
return result;
}
wctx.AddElapsedCycles(IActor::ACTOR_SYSTEM, timeTracker.Elapsed());
+ cpu.LoadLog.RegisterBusyPeriod(GetCycleCountFast());
bool wakeup;
do {
wakeup = cpu.BlockedWait(result, Config.Balancer.PeriodUs * 1000);
wctx.AddParkedCycles(timeTracker.Elapsed());
} while (!wakeup);
+ cpu.LoadLog.RegisterIdlePeriod(GetCycleCountFast());
return result;
}
diff --git a/library/cpp/actors/core/executor_pool_united.h b/library/cpp/actors/core/executor_pool_united.h
index a090ba2466..0895b06462 100644
--- a/library/cpp/actors/core/executor_pool_united.h
+++ b/library/cpp/actors/core/executor_pool_united.h
@@ -8,6 +8,7 @@
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/unordered_cache.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -34,6 +35,7 @@ namespace NActors {
TCpuAllocationConfig Allocation;
volatile bool StopFlag = false;
+ TMinusOneCpuEstimator<1024> MinusOneCpuEstimator;
public:
TUnitedWorkers(
diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp
index 8a59bef974..133e9c5f2a 100644
--- a/library/cpp/actors/core/executor_pool_united_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_united_ut.cpp
@@ -111,6 +111,10 @@ struct TRoundRobinBalancer: public IBalancer {
State->Load(assigned, current);
State->AssignPool(NextPool[assigned]);
}
+
+ ui64 GetPeriodUs() override {
+ return 1000;
+ }
};
void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency = 0) {
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
new file mode 100644
index 0000000000..3a0f4109bb
--- /dev/null
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -0,0 +1,313 @@
+#include "harmonizer.h"
+
+#include "probes.h"
+#include "actorsystem.h"
+
+#include <library/cpp/actors/util/cpu_load_log.h>
+#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/actors/util/intrinsics.h>
+
+#include <util/system/spinlock.h>
+
+#include <algorithm>
+
+namespace NActors {
+
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
+struct TValueHistory {
+ double History[8] = {0.0};
+ ui64 HistoryIdx = 0;
+ ui64 LastTs = Max<ui64>();
+ double LastUs = 0.0;
+ double AccumulatedUs = 0.0;
+ ui64 AccumulatedTs = 0;
+
+ double GetAvgPart() {
+ double sum = AccumulatedUs;
+ for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) {
+ sum += History[idx];
+ }
+ double duration = 1'000'000.0 * (sizeof(History) / sizeof(History[0])) + Ts2Us(AccumulatedTs);
+ double avg = sum / duration;
+ return avg;
+ }
+
+ void Register(ui64 ts, double valueUs) {
+ if (ts < LastTs) {
+ LastTs = ts;
+ LastUs = valueUs;
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ return;
+ }
+ ui64 lastTs = std::exchange(LastTs, ts);
+ ui64 dTs = ts - lastTs;
+ double lastUs = std::exchange(LastUs, valueUs);
+ double dUs = valueUs - lastUs;
+ LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs);
+
+ if (dTs > Us2Ts(8'000'000.0)) {
+ dUs = dUs * 1'000'000.0 / Ts2Us(dTs);
+ for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) {
+ History[idx] = dUs;
+ }
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ return;
+ }
+
+ while (dTs > 0) {
+ if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) {
+ AccumulatedTs += dTs;
+ AccumulatedUs += dUs;
+ break;
+ } else {
+ ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs;
+ double addUs = dUs * addTs / dTs;
+ dTs -= addTs;
+ dUs -= addUs;
+ History[HistoryIdx] = AccumulatedUs + addUs;
+ HistoryIdx = (HistoryIdx + 1) % (sizeof(History) / sizeof(History[0]));
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ }
+ }
+ }
+};
+
+struct TThreadInfo {
+ TValueHistory Consumed;
+ TValueHistory Booked;
+};
+
+struct TPoolInfo {
+ std::vector<TThreadInfo> ThreadInfo;
+ IExecutorPool* Pool = nullptr;
+ i16 DefaultThreadCount = 0;
+ i16 MinThreadCount = 0;
+ i16 MaxThreadCount = 0;
+
+ bool IsBeingStopped(i16 threadIdx);
+ double GetBooked(i16 threadIdx);
+ double GetConsumed(i16 threadIdx);
+ void PullStats(ui64 ts);
+ i16 GetThreadCount();
+ void SetThreadCount(i16 threadCount);
+};
+
+bool TPoolInfo::IsBeingStopped(i16 threadIdx) {
+ return Pool->IsThreadBeingStopped(threadIdx);
+}
+
+double TPoolInfo::GetBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Booked.GetAvgPart();
+ }
+ return 0.0;
+ //return Pool->GetThreadBooked(threadIdx);
+}
+
+double TPoolInfo::GetConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Consumed.GetAvgPart();
+ }
+ return 0.0;
+ //return Pool->GetThreadConsumed(threadIdx);
+}
+
+#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
+void TPoolInfo::PullStats(ui64 ts) {
+ for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) {
+ TThreadInfo &threadInfo = ThreadInfo[threadIdx];
+ threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx));
+ LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
+ threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx));
+ LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
+ }
+}
+#undef UNROLL_HISTORY
+
+i16 TPoolInfo::GetThreadCount() {
+ return Pool->GetThreadCount();
+}
+
+void TPoolInfo::SetThreadCount(i16 threadCount) {
+ Pool->SetThreadCount(threadCount);
+}
+
+class THarmonizer: public IHarmonizer {
+private:
+ std::atomic<bool> IsDisabled = false;
+ TSpinLock Lock;
+ std::atomic<ui64> NextHarmonizeTs = 0;
+ std::vector<TPoolInfo> Pools;
+
+ void PullStats(ui64 ts);
+ void HarmonizeImpl();
+public:
+ THarmonizer(ui64 ts);
+ virtual ~THarmonizer();
+ double Rescale(double value) const;
+ void Harmonize(ui64 ts) override;
+ void DeclareEmergency(ui64 ts) override;
+ void AddPool(IExecutorPool* pool) override;
+ void Enable(bool enable) override;
+};
+
+THarmonizer::THarmonizer(ui64 ts) {
+ NextHarmonizeTs = ts;
+}
+
+THarmonizer::~THarmonizer() {
+}
+
+double THarmonizer::Rescale(double value) const {
+ return Max(0.0, Min(1.0, value * (1.0/0.9)));
+}
+
+void THarmonizer::PullStats(ui64 ts) {
+ for (TPoolInfo &pool : Pools) {
+ pool.PullStats(ts);
+ }
+}
+
+void THarmonizer::HarmonizeImpl() {
+ bool isStarvedPresent = false;
+ double booked = 0.0;
+ double consumed = 0.0;
+ i64 beingStopped = 0;
+ i64 total = 0;
+ TStackVec<size_t, 8> needyPools;
+ TStackVec<size_t, 8> hoggishPools;
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = Pools[poolIdx];
+ total += pool.DefaultThreadCount;
+ double poolBooked = 0.0;
+ double poolConsumed = 0.0;
+ beingStopped += pool.Pool->GetBlockingThreadCount();
+ for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
+ poolBooked += Rescale(pool.GetBooked(threadIdx));
+ poolConsumed += Rescale(pool.GetConsumed(threadIdx));
+ }
+ bool isStarved = false;
+ if (Max(consumed, booked) > 0.1 && consumed < booked * 0.7) {
+ isStarvedPresent = true;
+ isStarved = true;
+ }
+ ui32 currentThreadCount = pool.GetThreadCount();
+ bool isNeedy = false;
+ if (poolBooked >= currentThreadCount) {
+ needyPools.push_back(poolIdx);
+ isNeedy = true;
+ }
+ bool isHoggish = false;
+ if (poolBooked < currentThreadCount - 1) {
+ hoggishPools.push_back(poolIdx);
+ isHoggish = true;
+ }
+ booked += poolBooked;
+ consumed += poolConsumed;
+ LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
+ }
+ double budget = total - booked;
+ double overbooked = consumed - booked;
+ if (isStarvedPresent) {
+ // last_starved_at_consumed_value = сумма по всем пулам consumed;
+ // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
+ // использовать вместо total
+ if (beingStopped && beingStopped >= overbooked) {
+ // do nothing
+ } else {
+ TStackVec<size_t> reorder;
+ for (size_t i = 0; i < Pools.size(); ++i) {
+ reorder.push_back(i);
+ }
+ while (!reorder.empty()) {
+ size_t rndIdx = rand() % reorder.size();
+ size_t poolIdx = reorder[rndIdx];
+ reorder[rndIdx] = reorder.back();
+ reorder.pop_back();
+
+ TPoolInfo &pool = Pools[poolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ if (threadCount > pool.DefaultThreadCount) {
+ pool.SetThreadCount(threadCount - 1);
+ overbooked--;
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ }
+ }
+ } else {
+ for (size_t needyPoolIdx : needyPools) {
+ TPoolInfo &pool = Pools[needyPoolIdx];
+ if (budget >= 1.0) {
+ i64 threadCount = pool.GetThreadCount();
+ if (threadCount + 1 <= pool.MaxThreadCount) {
+ pool.SetThreadCount(threadCount + 1);
+ budget -= 1.0;
+ LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ }
+ }
+ }
+ }
+ for (size_t hoggishPoolIdx : hoggishPools) {
+ TPoolInfo &pool = Pools[hoggishPoolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ if (threadCount > pool.MinThreadCount) {
+ LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ pool.SetThreadCount(threadCount - 1);
+ }
+ }
+}
+
+void THarmonizer::Harmonize(ui64 ts) {
+ if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) {
+ LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false);
+ return;
+ }
+ // Check again under the lock
+ if (IsDisabled) {
+ LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true);
+ Lock.Release();
+ return;
+ }
+ // Will never reach this line disabled
+ ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
+ LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);
+
+ PullStats(ts);
+ HarmonizeImpl();
+
+ Lock.Release();
+}
+
+void THarmonizer::DeclareEmergency(ui64 ts) {
+ NextHarmonizeTs = ts;
+}
+
+void THarmonizer::AddPool(IExecutorPool* pool) {
+ TGuard<TSpinLock> guard(Lock);
+ TPoolInfo poolInfo;
+ poolInfo.Pool = pool;
+ poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount();
+ poolInfo.MinThreadCount = pool->GetMinThreadCount();
+ poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
+ poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount);
+ pool->SetThreadCount(poolInfo.DefaultThreadCount);
+ Pools.push_back(poolInfo);
+};
+
+void THarmonizer::Enable(bool enable) {
+ TGuard<TSpinLock> guard(Lock);
+ IsDisabled = enable;
+}
+
+IHarmonizer* MakeHarmonizer(ui64 ts) {
+ return new THarmonizer(ts);
+}
+
+}
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
new file mode 100644
index 0000000000..dae8d7de0c
--- /dev/null
+++ b/library/cpp/actors/core/harmonizer.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include "defs.h"
+#include "config.h"
+
+namespace NActors {
+ class IExecutorPool;
+
+ // Pool cpu harmonizer
+ class IHarmonizer {
+ public:
+ virtual ~IHarmonizer() {}
+ virtual void Harmonize(ui64 ts) = 0;
+ virtual void DeclareEmergency(ui64 ts) = 0;
+ virtual void AddPool(IExecutorPool* pool) = 0;
+ virtual void Enable(bool enable) = 0;
+ };
+
+ IHarmonizer* MakeHarmonizer(ui64 ts);
+}
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 6d482926d1..fb76410590 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -69,6 +69,7 @@ namespace NActors {
ui64 NonDeliveredEvents = 0;
ui64 EmptyMailboxActivation = 0;
ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion)
+ ui64 WorstActivationTimeUs = 0;
NHPTimer::STime ElapsedTicks = 0;
NHPTimer::STime ParkedTicks = 0;
NHPTimer::STime BlockedTicks = 0;
@@ -111,6 +112,9 @@ namespace NActors {
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
CpuNs += RelaxedLoad(&other.CpuNs);
+ RelaxedStore(
+ &WorstActivationTimeUs,
+ std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
ElapsedTicks += RelaxedLoad(&other.ElapsedTicks);
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
BlockedTicks += RelaxedLoad(&other.BlockedTicks);
diff --git a/library/cpp/actors/core/probes.h b/library/cpp/actors/core/probes.h
index 4912d6dd26..33ac7b0f5e 100644
--- a/library/cpp/actors/core/probes.h
+++ b/library/cpp/actors/core/probes.h
@@ -166,6 +166,30 @@
PROBE(MoveCpu, GROUPS("PoolCpuBalancer"), \
TYPES(ui32, ui64, TString, TString, ui32), \
NAMES("fromPoolId", "toPoolId", "fromPool", "toPool", "cpu")) \
+ PROBE(ThreadCount, GROUPS("BasicThreadPool"), \
+ TYPES(ui32, TString, ui32, ui32, ui32, ui32), \
+ NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \
+ PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, double, double, ui32, ui32, bool, bool, bool), \
+ NAMES("poolId", "pool", "booked", "consumed", "threadCount", "maxThreadCount", "isStarved", "isNeedy", "isHoggish")) \
+ PROBE(HarmonizeOperation, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, TString, ui32, ui32, ui32), \
+ NAMES("poolId", "pool", "operation", "newCount", "minCount", "maxCount")) \
+ PROBE(TryToHarmonize, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString), \
+ NAMES("poolId", "pool")) \
+ PROBE(SavedValues, GROUPS("Harmonizer"), \
+ TYPES(ui32, TString, TString, double, double, double, double, double, double, double, double), \
+ NAMES("poolId", "pool", "valueName", "[0]", "[1]", "[2]", "[3]", "[4]", "[5]", "[6]", "[7]")) \
+ PROBE(RegisterValue, GROUPS("Harmonizer"), \
+ TYPES(ui64, ui64, ui64, ui64, double, double, double), \
+ NAMES("ts", "lastTs", "dTs", "8sTs", "us", "lastUs", "dUs")) \
+ PROBE(TryToHarmonizeFailed, GROUPS("Harmonizer"), \
+ TYPES(ui64, ui64, bool, bool), \
+ NAMES("ts", "nextHarmonizeTs", "isDisabled", "withLock")) \
+ PROBE(TryToHarmonizeSuccess, GROUPS("Harmonizer"), \
+ TYPES(ui64, ui64, ui64), \
+ NAMES("ts", "nextHarmonizeTs", "previousNextHarmonizeTs")) \
/**/
LWTRACE_DECLARE_PROVIDER(ACTORLIB_PROVIDER)
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index b4c37a7629..384a13c5ee 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -95,6 +95,7 @@ namespace NActors {
i64 ts = deliveredTs > scheduleTs ? deliveredTs - scheduleTs : 0;
double usec = NHPTimer::GetSeconds(ts) * 1000000.0;
Stats->ActivationTimeHistogram.Add(usec);
+ Stats->WorstActivationTimeUs = Max(Stats->WorstActivationTimeUs, (ui64)usec);
return usec;
}
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index efce0c04b1..e037c7a673 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -249,7 +249,7 @@ void AddExecutorPool(
TBasicExecutorPoolConfig basic;
basic.PoolId = poolId;
basic.PoolName = poolConfig.GetName();
- basic.Threads = poolConfig.GetThreads();
+ basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads());
basic.SpinThreshold = poolConfig.GetSpinThreshold();
basic.Affinity = ParseAffinity(poolConfig.GetAffinity());
basic.RealtimePriority = poolConfig.GetRealtimePriority();
@@ -265,6 +265,9 @@ void AddExecutorPool(
basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox();
}
Y_VERIFY(basic.EventsPerMailbox != 0);
+ basic.MinThreadCount = poolConfig.GetMinThreads();
+ basic.MaxThreadCount = poolConfig.GetMaxThreads();
+ basic.DefaultThreadCount = poolConfig.GetThreads();
cpuManager.Basic.emplace_back(std::move(basic));
break;
}