summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <[email protected]>2024-12-09 22:53:49 +0300
committerGitHub <[email protected]>2024-12-09 22:53:49 +0300
commitf9e79db6a8e464ea6f5675a277f0643d2a0f9098 (patch)
tree481640e3d55257ecf6671c12887c4fd2815b45d1
parenta15d8708109b977cfb16c47d1e53f35266ca9651 (diff)
Fix miscalculated budget in THarmonizer (#12422)
-rw-r--r--ydb/library/actors/core/cpu_manager.cpp10
-rw-r--r--ydb/library/actors/core/cpu_manager.h4
-rw-r--r--ydb/library/actors/core/executor_pool.h19
-rw-r--r--ydb/library/actors/core/executor_pool_base.cpp1
-rw-r--r--ydb/library/actors/core/executor_pool_basic.cpp8
-rw-r--r--ydb/library/actors/core/executor_pool_basic.h9
-rw-r--r--ydb/library/actors/core/executor_pool_io.cpp2
-rw-r--r--ydb/library/actors/core/executor_pool_io.h2
-rw-r--r--ydb/library/actors/core/executor_pool_shared.cpp158
-rw-r--r--ydb/library/actors/core/executor_pool_shared.h56
-rw-r--r--ydb/library/actors/core/executor_thread_ctx.h7
-rw-r--r--ydb/library/actors/core/harmonizer.cpp861
-rw-r--r--ydb/library/actors/core/harmonizer/cpu_consumption.cpp171
-rw-r--r--ydb/library/actors/core/harmonizer/cpu_consumption.h45
-rw-r--r--ydb/library/actors/core/harmonizer/debug.h54
-rw-r--r--ydb/library/actors/core/harmonizer/defs.h3
-rw-r--r--ydb/library/actors/core/harmonizer/harmonizer.cpp487
-rw-r--r--ydb/library/actors/core/harmonizer/harmonizer.h (renamed from ydb/library/actors/core/harmonizer.h)40
-rw-r--r--ydb/library/actors/core/harmonizer/history.h183
-rw-r--r--ydb/library/actors/core/harmonizer/pool.cpp150
-rw-r--r--ydb/library/actors/core/harmonizer/pool.h93
-rw-r--r--ydb/library/actors/core/harmonizer/shared_info.cpp55
-rw-r--r--ydb/library/actors/core/harmonizer/shared_info.h20
-rw-r--r--ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp673
-rw-r--r--ydb/library/actors/core/harmonizer/ut/history_ut.cpp345
-rw-r--r--ydb/library/actors/core/harmonizer/ut/ya.make22
-rw-r--r--ydb/library/actors/core/harmonizer/waiting_stats.cpp48
-rw-r--r--ydb/library/actors/core/harmonizer/waiting_stats.h21
-rw-r--r--ydb/library/actors/core/harmonizer/ya.make44
-rw-r--r--ydb/library/actors/core/mon_stats.h8
-rw-r--r--ydb/library/actors/core/probes.h4
-rw-r--r--ydb/library/actors/core/ya.make7
-rw-r--r--ydb/library/actors/helpers/pool_stats_collector.h48
33 files changed, 2644 insertions, 1014 deletions
diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp
index 1c286d9bd79..047388ce906 100644
--- a/ydb/library/actors/core/cpu_manager.cpp
+++ b/ydb/library/actors/core/cpu_manager.cpp
@@ -37,8 +37,8 @@ namespace NActors {
poolsWithSharedThreads.push_back(cfg.PoolId);
}
}
- Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
- auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
+ Shared.reset(CreateSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
+ auto sharedPool = static_cast<ISharedExecutorPool*>(Shared.get());
ui64 ts = GetCycleCountFast();
Harmonizer.reset(MakeHarmonizer(ts));
@@ -135,11 +135,9 @@ namespace NActors {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
if (cfg.HasSharedThread) {
- auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
+ auto *sharedPool = Shared.get();
auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
- if (pool) {
- pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
- }
+ pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
return pool;
} else {
return new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h
index 1fee9467987..0bd0994f4b6 100644
--- a/ydb/library/actors/core/cpu_manager.h
+++ b/ydb/library/actors/core/cpu_manager.h
@@ -2,10 +2,10 @@
#include "config.h"
#include "executor_pool_jail.h"
-#include "harmonizer.h"
#include "executor_pool.h"
#include "executor_pool_shared.h"
#include "mon_stats.h"
+#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <memory>
namespace NActors {
@@ -16,7 +16,7 @@ namespace NActors {
const ui32 ExecutorPoolCount;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
std::unique_ptr<IHarmonizer> Harmonizer;
- std::unique_ptr<TSharedExecutorPool> Shared;
+ std::unique_ptr<ISharedExecutorPool> Shared;
std::unique_ptr<TExecutorPoolJail> Jail;
TCpuManagerConfig Config;
diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h
index 67297b428ba..8fd79113391 100644
--- a/ydb/library/actors/core/executor_pool.h
+++ b/ydb/library/actors/core/executor_pool.h
@@ -14,15 +14,16 @@ namespace NActors {
struct TExecutorThreadStats;
class TExecutorPoolJail;
class ISchedulerCookie;
+ struct TSharedExecutorThreadCtx;
struct TCpuConsumption {
- double ConsumedUs = 0;
- double BookedUs = 0;
+ double CpuUs = 0;
+ double ElapsedUs = 0;
ui64 NotEnoughCpuExecutions = 0;
void Add(const TCpuConsumption& other) {
- ConsumedUs += other.ConsumedUs;
- BookedUs += other.BookedUs;
+ CpuUs += other.CpuUs;
+ ElapsedUs += other.ElapsedUs;
NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
}
};
@@ -176,6 +177,16 @@ namespace NActors {
return 1;
}
+ virtual TSharedExecutorThreadCtx* ReleaseSharedThread() {
+ return nullptr;
+ }
+ virtual void AddSharedThread(TSharedExecutorThreadCtx*) {
+ }
+
+ virtual bool IsSharedThreadEnabled() const {
+ return false;
+ }
+
virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
Y_UNUSED(threadIdx);
return TCpuConsumption{0.0, 0.0};
diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp
index fe41e946233..e8e504abe20 100644
--- a/ydb/library/actors/core/executor_pool_base.cpp
+++ b/ydb/library/actors/core/executor_pool_base.cpp
@@ -1,4 +1,5 @@
#include "actorsystem.h"
+#include "activity_guard.h"
#include "actor.h"
#include "executor_pool_base.h"
#include "executor_pool_basic_feature_flags.h"
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index bfb72f02b36..fce778ef3be 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -407,10 +407,6 @@ namespace NActors {
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange;
poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
- poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
- poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
- poolStats.MaxBookedCpuUs = stats.MaxBookedCpu;
- poolStats.MinBookedCpuUs = stats.MinBookedCpu;
}
statsCopy.resize(MaxFullThreadCount + 1);
@@ -429,7 +425,7 @@ namespace NActors {
void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
- poolState.UsedCpu = stats.AvgConsumedCpu;
+ poolState.ElapsedCpu = stats.AvgElapsedCpu;
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
} else {
poolState.PossibleMaxLimit = poolState.MaxLimit;
@@ -625,7 +621,7 @@ namespace NActors {
TExecutorThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStatsForHarmonizer(stats);
- return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
+ return {static_cast<double>(stats.CpuUs), Ts2Us(stats.SafeElapsedTicks), stats.NotEnoughCpuExecutions};
}
i16 TBasicExecutorPool::GetBlockingThreadCount() const {
diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h
index c60e2ab8334..76909997519 100644
--- a/ydb/library/actors/core/executor_pool_basic.h
+++ b/ydb/library/actors/core/executor_pool_basic.h
@@ -7,8 +7,8 @@
#include "executor_pool_basic_feature_flags.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
-#include "harmonizer.h"
#include <memory>
+#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <ydb/library/actors/actor_type/indexes.h>
#include <ydb/library/actors/util/unordered_cache.h>
#include <ydb/library/actors/util/threadparkpad.h>
@@ -278,8 +278,11 @@ namespace NActors {
void CalcSpinPerThread(ui64 wakingUpConsumption);
void ClearWaitingStats() const;
- TSharedExecutorThreadCtx* ReleaseSharedThread();
- void AddSharedThread(TSharedExecutorThreadCtx* thread);
+ TSharedExecutorThreadCtx* ReleaseSharedThread() override;
+ void AddSharedThread(TSharedExecutorThreadCtx* thread) override;
+ bool IsSharedThreadEnabled() const override {
+ return true;
+ }
private:
void AskToGoToSleep(bool *needToWait, bool *needToBlock);
diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp
index 310a6f83b35..c6555f2c488 100644
--- a/ydb/library/actors/core/executor_pool_io.cpp
+++ b/ydb/library/actors/core/executor_pool_io.cpp
@@ -156,7 +156,7 @@ namespace NActors {
void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
- poolState.UsedCpu = stats.AvgConsumedCpu;
+ poolState.ElapsedCpu = stats.AvgElapsedCpu;
}
poolState.CurrentLimit = PoolThreads;
poolState.MaxLimit = PoolThreads;
diff --git a/ydb/library/actors/core/executor_pool_io.h b/ydb/library/actors/core/executor_pool_io.h
index 316b21d4d2f..16b17127d4c 100644
--- a/ydb/library/actors/core/executor_pool_io.h
+++ b/ydb/library/actors/core/executor_pool_io.h
@@ -3,9 +3,9 @@
#include "actorsystem.h"
#include "executor_thread.h"
#include "executor_thread_ctx.h"
-#include "harmonizer.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
+#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <ydb/library/actors/actor_type/indexes.h>
#include <ydb/library/actors/util/ticket_lock.h>
#include <ydb/library/actors/util/unordered_cache.h>
diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp
index 104e02812cc..7d1327f8454 100644
--- a/ydb/library/actors/core/executor_pool_shared.cpp
+++ b/ydb/library/actors/core/executor_pool_shared.cpp
@@ -12,6 +12,50 @@
namespace NActors {
+class TSharedExecutorPool: public ISharedExecutorPool {
+public:
+ TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
+
+ // IThreadPool
+ void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
+ void Start() override;
+ void PrepareStop() override;
+ void Shutdown() override;
+ bool Cleanup() override;
+
+ TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) override;
+ void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
+ void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
+ TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override;
+ std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override;
+
+ i16 ReturnOwnHalfThread(i16 pool) override;
+ i16 ReturnBorrowedHalfThread(i16 pool) override;
+ void GiveHalfThread(i16 from, i16 to) override;
+
+ i16 GetSharedThreadCount() const override;
+
+ TSharedPoolState GetState() const override;
+
+ void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) override;
+
+private:
+ TSharedPoolState State;
+
+ std::vector<IExecutorPool*> Pools;
+
+ i16 PoolCount;
+ i16 SharedThreadCount;
+ std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
+
+ std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
+ std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
+
+ TDuration TimePerMailbox;
+ ui32 EventsPerMailbox;
+ ui64 SoftProcessingDurationTs;
+}; // class TSharedExecutorPool
+
TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads)
: State(poolCount, poolsWithThreads.size())
, Pools(poolCount)
@@ -29,40 +73,47 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
}
}
-void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
- // ActorSystem = actorSystem;
-
- ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
- ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
-
- std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
- std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
- for (IExecutorPool* pool : poolsBasic) {
- Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool);
- i16 threadIdx = State.ThreadByPool[pool->PoolId];
- if (threadIdx >= 0) {
- poolByThread[threadIdx] = pool;
- }
+void TSharedExecutorPool::Init(const std::vector<IExecutorPool*>& pools, bool withThreads) {
+ std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
+ for (IExecutorPool* pool : pools) {
+ Pools[pool->PoolId] = pool;
+ i16 threadIdx = State.ThreadByPool[pool->PoolId];
+ if (threadIdx >= 0) {
+ poolByThread[threadIdx] = pool;
}
+ }
- for (i16 i = 0; i != SharedThreadCount; ++i) {
- // !TODO
- Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
+ for (i16 i = 0; i != SharedThreadCount; ++i) {
+ // !TODO
+ Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
+ if (withThreads) {
Threads[i].Thread.reset(
new TSharedExecutorThread(
-1,
- actorSystem,
- &Threads[i],
- PoolCount,
- "SharedThread",
- SoftProcessingDurationTs,
- TimePerMailbox,
+ nullptr,
+ &Threads[i],
+ PoolCount,
+ "SharedThread",
+ SoftProcessingDurationTs,
+ TimePerMailbox,
EventsPerMailbox));
- ScheduleWriters[i].Init(ScheduleReaders[i]);
}
+ }
+}
+
+void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
+ ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
+ ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
+
+ std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
+ Init(poolsBasic, true);
- *scheduleReaders = ScheduleReaders.get();
- *scheduleSz = SharedThreadCount;
+ for (i16 i = 0; i != SharedThreadCount; ++i) {
+ ScheduleWriters[i].Init(ScheduleReaders[i]);
+ }
+
+ *scheduleReaders = ScheduleReaders.get();
+ *scheduleSz = SharedThreadCount;
}
void TSharedExecutorPool::Start() {
@@ -99,24 +150,27 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
return &Threads[threadIdx];
}
-void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
+i16 TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
i16 threadIdx = State.ThreadByPool[pool];
- TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
+ IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
Y_ABORT_UNLESS(borrowingPool);
- State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
+ i16 borrowedPool = State.PoolByBorrowedThread[threadIdx];
+ State.BorrowedThreadByPool[borrowedPool] = -1;
State.PoolByBorrowedThread[threadIdx] = -1;
// TODO(kruall): Check on race
borrowingPool->ReleaseSharedThread();
+ return borrowedPool;
}
-void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
+i16 TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
i16 threadIdx = State.BorrowedThreadByPool[pool];
- TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
+ IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
Y_ABORT_UNLESS(borrowingPool);
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
State.PoolByBorrowedThread[threadIdx] = -1;
// TODO(kruall): Check on race
borrowingPool->ReleaseSharedThread();
+ return State.PoolByThread[threadIdx];
}
void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
@@ -127,14 +181,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
if (borrowedThreadIdx != -1) {
i16 originalPool = State.PoolByThread[borrowedThreadIdx];
if (originalPool == to) {
- return ReturnOwnHalfThread(to);
+ ReturnOwnHalfThread(to);
} else {
ReturnOwnHalfThread(originalPool);
}
from = originalPool;
}
i16 threadIdx = State.ThreadByPool[from];
- TBasicExecutorPool* borrowingPool = Pools[to];
+ IExecutorPool* borrowingPool = Pools[to];
Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
State.BorrowedThreadByPool[to] = threadIdx;
State.PoolByBorrowedThread[threadIdx] = to;
@@ -143,16 +197,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
}
void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
- statsCopy.resize(SharedThreadCount + 1);
+ statsCopy.resize(SharedThreadCount);
for (i16 i = 0; i < SharedThreadCount; ++i) {
- Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]);
+ Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]);
}
}
void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
- statsCopy.resize(SharedThreadCount + 1);
+ statsCopy.resize(SharedThreadCount);
for (i16 i = 0; i < SharedThreadCount; ++i) {
- Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]);
+ Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]);
}
}
@@ -181,4 +235,34 @@ TSharedPoolState TSharedExecutorPool::GetState() const {
return State;
}
+ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) {
+ return new TSharedExecutorPool(config, poolCount, poolsWithThreads);
+}
+
+TString TSharedPoolState::ToString() const {
+ TStringBuilder builder;
+ builder << '{';
+ builder << "ThreadByPool: [";
+ for (ui32 i = 0; i < ThreadByPool.size(); ++i) {
+ builder << ThreadByPool[i] << (i == ThreadByPool.size() - 1 ? "" : ", ");
+ }
+ builder << "], ";
+ builder << "PoolByThread: [";
+ for (ui32 i = 0; i < PoolByThread.size(); ++i) {
+ builder << PoolByThread[i] << (i == PoolByThread.size() - 1 ? "" : ", ");
+ }
+ builder << "], ";
+ builder << "BorrowedThreadByPool: [";
+ for (ui32 i = 0; i < BorrowedThreadByPool.size(); ++i) {
+ builder << BorrowedThreadByPool[i] << (i == BorrowedThreadByPool.size() - 1 ? "" : ", ");
+ }
+ builder << "], ";
+ builder << "PoolByBorrowedThread: [";
+ for (ui32 i = 0; i < PoolByBorrowedThread.size(); ++i) {
+ builder << PoolByBorrowedThread[i] << (i == PoolByBorrowedThread.size() - 1 ? "" : ", ");
+ }
+ builder << ']';
+ return builder << '}';
+}
+
}
diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h
index c083c21654a..f6c79c07a58 100644
--- a/ydb/library/actors/core/executor_pool_shared.h
+++ b/ydb/library/actors/core/executor_pool_shared.h
@@ -1,14 +1,12 @@
#pragma once
#include "executor_pool.h"
-#include "executor_thread_ctx.h"
namespace NActors {
- struct TExecutorThreadCtx;
struct TSharedExecutorPoolConfig;
- class TBasicExecutorPool;
+ struct TSharedExecutorThreadCtx;
struct TSharedPoolState {
std::vector<i16> ThreadByPool;
@@ -22,48 +20,30 @@ namespace NActors {
, BorrowedThreadByPool(poolCount, -1)
, PoolByBorrowedThread(threadCount, -1)
{}
+
+ TString ToString() const;
};
- class TSharedExecutorPool: public IActorThreadPool {
+ class ISharedExecutorPool : public IActorThreadPool {
public:
- TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
-
- // IThreadPool
- void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
- void Start() override;
- void PrepareStop() override;
- void Shutdown() override;
- bool Cleanup() override;
-
- TSharedExecutorThreadCtx *GetSharedThread(i16 poolId);
- void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy);
- void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy);
- TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx);
- std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId);
-
- void ReturnOwnHalfThread(i16 pool);
- void ReturnBorrowedHalfThread(i16 pool);
- void GiveHalfThread(i16 from, i16 to);
+ virtual ~ISharedExecutorPool() = default;
- i16 GetSharedThreadCount() const;
+ virtual TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) = 0;
+ virtual void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) = 0;
+ virtual void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) = 0;
+ virtual TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) = 0;
+ virtual std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) = 0;
+ virtual void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) = 0;
- TSharedPoolState GetState() const;
+ virtual i16 ReturnOwnHalfThread(i16 pool) = 0;
+ virtual i16 ReturnBorrowedHalfThread(i16 pool) = 0;
+ virtual void GiveHalfThread(i16 from, i16 to) = 0;
- private:
- TSharedPoolState State;
-
- std::vector<TBasicExecutorPool*> Pools;
+ virtual i16 GetSharedThreadCount() const = 0;
- i16 PoolCount;
- i16 SharedThreadCount;
- std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
-
- std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
- std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
-
- TDuration TimePerMailbox;
- ui32 EventsPerMailbox;
- ui64 SoftProcessingDurationTs;
+ virtual TSharedPoolState GetState() const = 0;
};
+ ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
+
} \ No newline at end of file
diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h
index 6a5bfaa34be..c7d1caa5ed6 100644
--- a/ydb/library/actors/core/executor_thread_ctx.h
+++ b/ydb/library/actors/core/executor_thread_ctx.h
@@ -11,8 +11,7 @@
namespace NActors {
class TGenericExecutorThread;
- class TBasicExecutorPool;
- class TIOExecutorPool;
+ class IExecutorPool;
enum class EThreadState : ui64 {
None,
@@ -123,7 +122,7 @@ namespace NActors {
struct TExecutorThreadCtx : public TGenericExecutorThreadCtx {
using TBase = TGenericExecutorThreadCtx;
- TBasicExecutorPool *OwnerExecutorPool = nullptr;
+ IExecutorPool *OwnerExecutorPool = nullptr;
void SetWork() {
ExchangeState(EThreadState::Work);
@@ -186,7 +185,7 @@ namespace NActors {
}
};
- std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
+ std::atomic<IExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
std::atomic<i64> RequestsForWakeUp = 0;
ui32 NextPool = 0;
diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp
deleted file mode 100644
index df651642169..00000000000
--- a/ydb/library/actors/core/harmonizer.cpp
+++ /dev/null
@@ -1,861 +0,0 @@
-#include "harmonizer.h"
-
-#include "executor_thread_ctx.h"
-#include "executor_thread.h"
-#include "probes.h"
-
-#include "activity_guard.h"
-#include "actorsystem.h"
-#include "executor_pool_basic.h"
-#include "executor_pool_basic_feature_flags.h"
-#include "executor_pool_shared.h"
-
-#include <atomic>
-#include <ydb/library/actors/util/cpu_load_log.h>
-#include <ydb/library/actors/util/datetime.h>
-#include <ydb/library/actors/util/intrinsics.h>
-
-#include <util/system/spinlock.h>
-
-#include <algorithm>
-
-namespace NActors {
-
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
-constexpr bool CheckBinaryPower(ui64 value) {
- return !(value & (value - 1));
-}
-
-template <ui8 HistoryBufferSize = 8>
-struct TValueHistory {
- static_assert(CheckBinaryPower(HistoryBufferSize));
-
- double History[HistoryBufferSize] = {0.0};
- ui64 HistoryIdx = 0;
- ui64 LastTs = Max<ui64>();
- double LastUs = 0.0;
- double AccumulatedUs = 0.0;
- ui64 AccumulatedTs = 0;
-
- template <bool WithTail=false>
- double Accumulate(auto op, auto comb, ui8 seconds) {
- double acc = AccumulatedUs;
- size_t idx = HistoryIdx;
- ui8 leftSeconds = seconds;
- if constexpr (!WithTail) {
- idx--;
- leftSeconds--;
- if (idx >= HistoryBufferSize) {
- idx = HistoryBufferSize - 1;
- }
- acc = History[idx];
- }
- do {
- idx--;
- leftSeconds--;
- if (idx >= HistoryBufferSize) {
- idx = HistoryBufferSize - 1;
- }
- if constexpr (WithTail) {
- acc = op(acc, History[idx]);
- } else if (leftSeconds) {
- acc = op(acc, History[idx]);
- } else {
- ui64 tsInSecond = Us2Ts(1'000'000.0);
- acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond);
- }
- } while (leftSeconds);
- double duration = 1'000'000.0 * seconds;
- if constexpr (WithTail) {
- duration += Ts2Us(AccumulatedTs);
- }
- return comb(acc, duration);
- }
-
- template <bool WithTail=false>
- double GetAvgPartForLastSeconds(ui8 seconds) {
- auto sum = [](double acc, double value) {
- return acc + value;
- };
- auto avg = [](double sum, double duration) {
- return sum / duration;
- };
- return Accumulate<WithTail>(sum, avg, seconds);
- }
-
- double GetAvgPart() {
- return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
- }
-
- double GetMaxForLastSeconds(ui8 seconds) {
- auto max = [](const double& acc, const double& value) {
- return Max(acc, value);
- };
- auto fst = [](const double& value, const double&) { return value; };
- return Accumulate<false>(max, fst, seconds);
- }
-
- double GetMax() {
- return GetMaxForLastSeconds(HistoryBufferSize);
- }
-
- i64 GetMaxInt() {
- return static_cast<i64>(GetMax());
- }
-
- double GetMinForLastSeconds(ui8 seconds) {
- auto min = [](const double& acc, const double& value) {
- return Min(acc, value);
- };
- auto fst = [](const double& value, const double&) { return value; };
- return Accumulate<false>(min, fst, seconds);
- }
-
- double GetMin() {
- return GetMinForLastSeconds(HistoryBufferSize);
- }
-
- i64 GetMinInt() {
- return static_cast<i64>(GetMin());
- }
-
- void Register(ui64 ts, double valueUs) {
- if (ts < LastTs) {
- LastTs = ts;
- LastUs = valueUs;
- AccumulatedUs = 0.0;
- AccumulatedTs = 0;
- return;
- }
- ui64 lastTs = std::exchange(LastTs, ts);
- ui64 dTs = ts - lastTs;
- double lastUs = std::exchange(LastUs, valueUs);
- double dUs = valueUs - lastUs;
- LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs);
-
- if (dTs > Us2Ts(8'000'000.0)) {
- dUs = dUs * 1'000'000.0 / Ts2Us(dTs);
- for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
- History[idx] = dUs;
- }
- AccumulatedUs = 0.0;
- AccumulatedTs = 0;
- return;
- }
-
- while (dTs > 0) {
- if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) {
- AccumulatedTs += dTs;
- AccumulatedUs += dUs;
- break;
- } else {
- ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs;
- double addUs = dUs * addTs / dTs;
- dTs -= addTs;
- dUs -= addUs;
- History[HistoryIdx] = AccumulatedUs + addUs;
- HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize;
- AccumulatedUs = 0.0;
- AccumulatedTs = 0;
- }
- }
- }
-};
-
-struct TThreadInfo {
- TValueHistory<8> Consumed;
- TValueHistory<8> Booked;
-};
-
-struct TPoolInfo {
- std::vector<TThreadInfo> ThreadInfo;
- std::vector<TThreadInfo> SharedInfo;
- TSharedExecutorPool* Shared = nullptr;
- IExecutorPool* Pool = nullptr;
- TBasicExecutorPool* BasicPool = nullptr;
-
- i16 DefaultFullThreadCount = 0;
- i16 MinFullThreadCount = 0;
- i16 MaxFullThreadCount = 0;
-
- float DefaultThreadCount = 0;
- float MinThreadCount = 0;
- float MaxThreadCount = 0;
-
- i16 Priority = 0;
- NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
- NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
- ui32 MaxAvgPingUs = 0;
- ui64 LastUpdateTs = 0;
- ui64 NotEnoughCpuExecutions = 0;
- ui64 NewNotEnoughCpuExecutions = 0;
- ui16 LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
-
- TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
- TAtomic IncreasingThreadsByNeedyState = 0;
- TAtomic IncreasingThreadsByExchange = 0;
- TAtomic DecreasingThreadsByStarvedState = 0;
- TAtomic DecreasingThreadsByHoggishState = 0;
- TAtomic DecreasingThreadsByExchange = 0;
- TAtomic PotentialMaxThreadCount = 0;
-
- TValueHistory<16> Consumed;
- TValueHistory<16> Booked;
-
- std::atomic<float> MaxConsumedCpu = 0;
- std::atomic<float> MinConsumedCpu = 0;
- std::atomic<float> AvgConsumedCpu = 0;
- std::atomic<float> MaxBookedCpu = 0;
- std::atomic<float> MinBookedCpu = 0;
-
- std::unique_ptr<TWaitingStats<ui64>> WaitingStats;
- std::unique_ptr<TWaitingStats<double>> MovingWaitingStats;
-
- double GetBooked(i16 threadIdx);
- double GetSharedBooked(i16 threadIdx);
- double GetLastSecondBooked(i16 threadIdx);
- double GetLastSecondSharedBooked(i16 threadIdx);
- double GetConsumed(i16 threadIdx);
- double GetSharedConsumed(i16 threadIdx);
- double GetLastSecondConsumed(i16 threadIdx);
- double GetLastSecondSharedConsumed(i16 threadIdx);
- TCpuConsumption PullStats(ui64 ts);
- i16 GetFullThreadCount();
- float GetThreadCount();
- void SetFullThreadCount(i16 threadCount);
- bool IsAvgPingGood();
-};
-
-double TPoolInfo::GetBooked(i16 threadIdx) {
- if ((size_t)threadIdx < ThreadInfo.size()) {
- return ThreadInfo[threadIdx].Booked.GetAvgPart();
- }
- return 0.0;
-}
-
-double TPoolInfo::GetSharedBooked(i16 threadIdx) {
- if ((size_t)threadIdx < SharedInfo.size()) {
- return SharedInfo[threadIdx].Booked.GetAvgPart();
- }
- return 0.0;
-}
-
-double TPoolInfo::GetLastSecondBooked(i16 threadIdx) {
- if ((size_t)threadIdx < ThreadInfo.size()) {
- return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
- }
- return 0.0;
-}
-
-double TPoolInfo::GetLastSecondSharedBooked(i16 threadIdx) {
- if ((size_t)threadIdx < SharedInfo.size()) {
- return SharedInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
- }
- return 0.0;
-}
-
-double TPoolInfo::GetConsumed(i16 threadIdx) {
- if ((size_t)threadIdx < ThreadInfo.size()) {
- return ThreadInfo[threadIdx].Consumed.GetAvgPart();
- }
- return 0.0;
-}
-
-double TPoolInfo::GetSharedConsumed(i16 threadIdx) {
- if ((size_t)threadIdx < SharedInfo.size()) {
- return SharedInfo[threadIdx].Consumed.GetAvgPart();
- }
- return 0.0;
-}
-
-double TPoolInfo::GetLastSecondConsumed(i16 threadIdx) {
- if ((size_t)threadIdx < ThreadInfo.size()) {
- return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
- }
- return 0.0;
-}
-
-double TPoolInfo::GetLastSecondSharedConsumed(i16 threadIdx) {
- if ((size_t)threadIdx < SharedInfo.size()) {
- return SharedInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
- }
- return 0.0;
-}
-
-// Expands to the first eight entries of `history` as separate arguments for LWPROBE.
-#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
-// Samples the CPU consumption of every thread serving this pool at timestamp `ts`.
-//
-// Each per-thread sample is registered in that thread's Consumed/Booked history,
-// the sum is registered in the pool-wide histories, and the published
-// Max/Min/Avg atomics are refreshed (divided by 1'000'000).
-// Also updates the NotEnoughCpuExecutions delta and, for basic pools, the waiting stats.
-//
-// Returns the consumption summed over the pool's own and shared threads.
-TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
-    TCpuConsumption acc;
-    for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) {
-        TThreadInfo &threadInfo = ThreadInfo[threadIdx];
-        TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
-        acc.Add(cpuConsumption);
-        threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs);
-        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
-        threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
-        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
-    }
-    TVector<TExecutorThreadStats> sharedStats;
-    if (Shared) {
-        Shared->GetSharedStatsForHarmonizer(Pool->PoolId, sharedStats);
-    }
-
-    // SharedInfo is sized when the pool is added, while sharedStats is filled only when
-    // a shared pool is attached; never index past whichever of the two is shorter.
-    const size_t sharedCount = Min<size_t>(SharedInfo.size(), sharedStats.size());
-    for (size_t sharedIdx = 0; sharedIdx < sharedCount; ++sharedIdx) {
-        // Bind by reference: the stats are only read here.
-        const auto& stat = sharedStats[sharedIdx];
-        TCpuConsumption sharedConsumption{
-            Ts2Us(stat.SafeElapsedTicks),
-            static_cast<double>(stat.CpuUs),
-            stat.NotEnoughCpuExecutions
-        };
-        acc.Add(sharedConsumption);
-        SharedInfo[sharedIdx].Consumed.Register(ts, sharedConsumption.ConsumedUs);
-        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_consumed", UNROLL_HISTORY(SharedInfo[sharedIdx].Consumed.History));
-        SharedInfo[sharedIdx].Booked.Register(ts, sharedConsumption.BookedUs);
-        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_booked", UNROLL_HISTORY(SharedInfo[sharedIdx].Booked.History));
-    }
-
-    Consumed.Register(ts, acc.ConsumedUs);
-    MaxConsumedCpu.store(Consumed.GetMax() / 1'000'000, std::memory_order_relaxed);
-    MinConsumedCpu.store(Consumed.GetMin() / 1'000'000, std::memory_order_relaxed);
-    AvgConsumedCpu.store(Consumed.GetAvgPart() / 1'000'000, std::memory_order_relaxed);
-    Booked.Register(ts, acc.BookedUs);
-    MaxBookedCpu.store(Booked.GetMax() / 1'000'000, std::memory_order_relaxed);
-    MinBookedCpu.store(Booked.GetMin() / 1'000'000, std::memory_order_relaxed);
-    // Keep both the running total and the increase since the previous pull.
-    NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
-    NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
-    if (WaitingStats && BasicPool) {
-        WaitingStats->Clear();
-        BasicPool->GetWaitingStats(*WaitingStats);
-        if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) {
-            // Blend the fresh sample into the moving stats with weights 0.8 and 0.2.
-            MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2);
-        }
-    }
-    return acc;
-}
-#undef UNROLL_HISTORY
-
-// Forwards to the underlying executor pool's GetThreadCount().
-float TPoolInfo::GetThreadCount() {
-    const float threadCount = Pool->GetThreadCount();
-    return threadCount;
-}
-
-// Forwards to the underlying executor pool's GetFullThreadCount().
-i16 TPoolInfo::GetFullThreadCount() {
-    const i16 fullThreadCount = Pool->GetFullThreadCount();
-    return fullThreadCount;
-}
-
-// Forwards the requested number of full threads to the underlying executor pool.
-void TPoolInfo::SetFullThreadCount(i16 threadCount) {
-    auto* const pool = Pool;
-    pool->SetFullThreadCount(threadCount);
-}
-
-// Returns true when every attached ping counter currently exceeds MaxAvgPingUs.
-// A counter that is not attached is ignored, so with no counters at all the result is true.
-// NOTE(review): despite the name, true means the ping is ABOVE the limit; the harmonizer
-// uses it as one of the signals that a pool is needy.
-bool TPoolInfo::IsAvgPingGood() {
-    if (AvgPingCounter && !(*AvgPingCounter > MaxAvgPingUs)) {
-        return false;
-    }
-    if (AvgPingCounterWithSmallWindow && !(*AvgPingCounterWithSmallWindow > MaxAvgPingUs)) {
-        return false;
-    }
-    return true;
-}
-
-// IHarmonizer implementation that periodically pulls per-pool CPU statistics and
-// redistributes threads between the registered executor pools.
-class THarmonizer: public IHarmonizer {
-private:
- // Written by Enable(); checked by Harmonize() before and after taking Lock.
- std::atomic<bool> IsDisabled = false;
- // Taken by Harmonize() (TryAcquire), AddPool() and Enable().
- TSpinLock Lock;
- // Harmonize() does nothing while this is greater than the passed timestamp.
- std::atomic<ui64> NextHarmonizeTs = 0;
- // One entry per AddPool() call, in registration order.
- std::vector<std::unique_ptr<TPoolInfo>> Pools;
- // Pool indexes sorted by CalculatePriorityOrder(); cleared by AddPool() and rebuilt lazily.
- std::vector<ui16> PriorityOrder;
-
- // Histories of the consumption summed over all pools, fed by PullStats().
- TValueHistory<16> Consumed;
- TValueHistory<16> Booked;
-
- // Extremes of the histories above, published for GetStats().
- TAtomic MaxConsumedCpu = 0;
- TAtomic MinConsumedCpu = 0;
- TAtomic MaxBookedCpu = 0;
- TAtomic MinBookedCpu = 0;
-
- // Set by SetSharedPool(); copied into each TPoolInfo at AddPool() time.
- TSharedExecutorPool* Shared = nullptr;
-
- // Averages computed at the start of HarmonizeImpl(), published for GetStats().
- std::atomic<double> AvgAwakeningTimeUs = 0;
- std::atomic<double> AvgWakingUpTimeUs = 0;
-
- void PullStats(ui64 ts);
- void HarmonizeImpl(ui64 ts);
- void CalculatePriorityOrder();
-public:
- THarmonizer(ui64 ts);
- virtual ~THarmonizer();
- // Scales value by 1/0.9 and clamps the result to [0, 1].
- double Rescale(double value) const;
- void Harmonize(ui64 ts) override;
- void DeclareEmergency(ui64 ts) override;
- void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
- void Enable(bool enable) override;
- TPoolHarmonizerStats GetPoolStats(i16 poolId) const override;
- THarmonizerStats GetStats() const override;
- void SetSharedPool(TSharedExecutorPool* pool) override;
-};
-
-// The first harmonization becomes possible once Harmonize() sees a timestamp >= ts.
-THarmonizer::THarmonizer(ui64 ts)
-    : NextHarmonizeTs(ts)
-{
-}
-
-// Nothing to release by hand: every member cleans up after itself.
-THarmonizer::~THarmonizer() = default;
-
-// Scales `value` by 1/0.9 and clamps the product to the range [0.0, 1.0].
-double THarmonizer::Rescale(double value) const {
-    const double scaled = value * (1.0/0.9);
-    const double capped = Min(1.0, scaled);
-    return Max(0.0, capped);
-}
-
-// Pulls fresh statistics from every pool, registers their sum in the harmonizer-wide
-// histories and republishes the history extremes.
-void THarmonizer::PullStats(ui64 ts) {
-    TCpuConsumption total;
-    for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TCpuConsumption poolConsumption = Pools[poolIdx]->PullStats(ts);
-        total.Add(poolConsumption);
-    }
-
-    Consumed.Register(ts, total.ConsumedUs);
-    RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt());
-    RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt());
-
-    Booked.Register(ts, total.BookedUs);
-    RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt());
-    RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
-}
-
-// A pool counts as starved when its load is noticeable (the larger of the two values
-// is above 0.1) and `consumed` is below 70% of `booked`.
-Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
-    const bool hasLoad = Max(consumed, booked) > 0.1;
-    if (!hasLoad) {
-        return false;
-    }
-    return consumed < booked * 0.7;
-}
-
-// A pool counts as hoggish when more than half a thread of its current capacity is not booked.
-Y_FORCE_INLINE bool IsHoggish(double booked, double currentThreadCount) {
-    const double hoggishLimit = currentThreadCount - 0.5;
-    return booked < hoggishLimit;
-}
-
-// Runs one harmonization pass over all registered pools:
-//   1. averages the wake-up timings of the pools and updates spin thresholds;
-//   2. snapshots which pools own or borrow a shared half-thread;
-//   3. classifies every pool as starved / needy / hoggish from its rescaled
-//      booked and consumed CPU and computes the free budget;
-//   4. takes threads away when starvation is present, otherwise hands threads
-//      (or shared half-threads) to needy pools, exchanges threads when the budget
-//      is exhausted, and finally shrinks hoggish pools.
-//
-// ts is the timestamp of this pass; it is compared with Us2Ts() values.
-void THarmonizer::HarmonizeImpl(ui64 ts) {
-    bool isStarvedPresent = false;
-    double booked = 0.0;
-    double consumed = 0.0;
-    double lastSecondBooked = 0.0;
-    i64 beingStopped = 0;
-    double total = 0;
-    TStackVec<size_t, 8> needyPools;
-    TStackVec<std::pair<size_t, double>, 8> hoggishPools;
-    TStackVec<bool, 8> isNeedyByPool;
-
-    size_t sumOfAdditionalThreads = 0;
-
-    // Sum the wake-up statistics of every pool that collects them.
-    ui64 TotalWakingUpTime = 0;
-    ui64 TotalWakingUps = 0;
-    ui64 TotalAwakeningTime = 0;
-    ui64 TotalAwakenings = 0;
-    for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TPoolInfo& pool = *Pools[poolIdx];
-        if (pool.WaitingStats) {
-            TotalWakingUpTime += pool.WaitingStats->WakingUpTotalTime;
-            TotalWakingUps += pool.WaitingStats->WakingUpCount;
-            TotalAwakeningTime += pool.WaitingStats->AwakingTotalTime;
-            TotalAwakenings += pool.WaitingStats->AwakingCount;
-        }
-    }
-
-    constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime;
-    constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime;
-
-    // Fall back to the known constant when there are no samples, the measured
-    // average is zero, or it is more than twice the known value.
-    ui64 realAvgWakingUpTime = (TotalWakingUps ? TotalWakingUpTime / TotalWakingUps : knownAvgWakingUpTime);
-    ui64 avgWakingUpTime = realAvgWakingUpTime;
-    if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) {
-        avgWakingUpTime = knownAvgWakingUpTime;
-    }
-    AvgWakingUpTimeUs = Ts2Us(avgWakingUpTime);
-
-    ui64 realAvgAwakeningTime = (TotalAwakenings ? TotalAwakeningTime / TotalAwakenings : knownAvgAwakeningUpTime);
-    ui64 avgAwakeningTime = realAvgAwakeningTime;
-    if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) {
-        avgAwakeningTime = knownAvgAwakeningUpTime;
-    }
-    AvgAwakeningTimeUs = Ts2Us(avgAwakeningTime);
-
-    ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime;
-    // Report the effective and the measured value for both timings (the second
-    // argument used to repeat the effective waking-up time).
-    LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(realAvgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption));
-
-    // Recompute spin thresholds for basic pools running a non-default profile.
-    for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TPoolInfo& pool = *Pools[poolIdx];
-        if (!pool.BasicPool) {
-            continue;
-        }
-        if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) {
-            if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
-                pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption);
-            } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
-                ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
-                pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
-            } else {
-                ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
-                pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
-            }
-            pool.BasicPool->ClearWaitingStats();
-        }
-    }
-
-    // Snapshot of the shared-thread ownership; all flags stay false without a shared pool.
-    std::vector<bool> hasSharedThread(Pools.size());
-    std::vector<bool> hasSharedThreadWhichWasNotBorrowed(Pools.size());
-    std::vector<bool> hasBorrowedSharedThread(Pools.size());
-    std::vector<i16> freeHalfThread;
-    if (Shared) {
-        auto sharedState = Shared->GetState();
-        for (ui32 poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-            i16 threadIdx = sharedState.ThreadByPool[poolIdx];
-            if (threadIdx != -1) {
-                hasSharedThread[poolIdx] = true;
-                if (sharedState.PoolByBorrowedThread[threadIdx] == -1) {
-                    hasSharedThreadWhichWasNotBorrowed[poolIdx] = true;
-                }
-
-            }
-            if (sharedState.BorrowedThreadByPool[poolIdx] != -1) {
-                hasBorrowedSharedThread[poolIdx] = true;
-            }
-        }
-    }
-
-    // Classify every pool and accumulate the harmonizer-wide totals.
-    for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TPoolInfo& pool = *Pools[poolIdx];
-        total += pool.DefaultThreadCount;
-
-        i16 currentFullThreadCount = pool.GetFullThreadCount();
-        sumOfAdditionalThreads += Max(0, currentFullThreadCount - pool.DefaultFullThreadCount);
-        float currentThreadCount = pool.GetThreadCount();
-
-        double poolBooked = 0.0;
-        double poolConsumed = 0.0;
-        double lastSecondPoolBooked = 0.0;
-        double lastSecondPoolConsumed = 0.0;
-        beingStopped += pool.Pool->GetBlockingThreadCount();
-
-        for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
-            double threadBooked = Rescale(pool.GetBooked(threadIdx));
-            double threadLastSecondBooked = Rescale(pool.GetLastSecondBooked(threadIdx));
-            double threadConsumed = Rescale(pool.GetConsumed(threadIdx));
-            double threadLastSecondConsumed = Rescale(pool.GetLastSecondConsumed(threadIdx));
-            poolBooked += threadBooked;
-            lastSecondPoolBooked += threadLastSecondBooked;
-            poolConsumed += threadConsumed;
-            lastSecondPoolConsumed += threadLastSecondConsumed;
-            LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), threadIdx, threadBooked, threadConsumed, threadLastSecondBooked, threadLastSecondConsumed);
-        }
-
-        for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) {
-            double sharedBooked = Rescale(pool.GetSharedBooked(sharedIdx));
-            double sharedLastSecondBooked = Rescale(pool.GetLastSecondSharedBooked(sharedIdx));
-            double sharedConsumed = Rescale(pool.GetSharedConsumed(sharedIdx));
-            double sharedLastSecondConsumed = Rescale(pool.GetLastSecondSharedConsumed(sharedIdx));
-            poolBooked += sharedBooked;
-            lastSecondPoolBooked += sharedLastSecondBooked;
-            poolConsumed += sharedConsumed;
-            lastSecondPoolConsumed += sharedLastSecondConsumed;
-            // Shared slots are reported with negative indexes: -1 - sharedIdx.
-            LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), -1 - sharedIdx, sharedBooked, sharedConsumed, sharedLastSecondBooked, sharedLastSecondConsumed);
-        }
-
-        bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked);
-        if (isStarved) {
-            isStarvedPresent = true;
-        }
-
-        bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (poolBooked >= currentThreadCount);
-        if (pool.AvgPingCounter) {
-            // With a ping counter attached, a pool may be treated as needy at most once per 3 seconds.
-            if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
-                isNeedy = false;
-            } else {
-                pool.LastUpdateTs = ts;
-            }
-        }
-        if (currentThreadCount - poolBooked > 0.5) {
-            if (hasBorrowedSharedThread[poolIdx] || hasSharedThreadWhichWasNotBorrowed[poolIdx]) {
-                freeHalfThread.push_back(poolIdx);
-            }
-        }
-        isNeedyByPool.push_back(isNeedy);
-        if (isNeedy) {
-            needyPools.push_back(poolIdx);
-        }
-        bool isHoggish = IsHoggish(poolBooked, currentThreadCount)
-                || IsHoggish(lastSecondPoolBooked, currentThreadCount);
-        if (isHoggish) {
-            // Free CPU is capacity minus booked load. The reversed subtraction was never
-            // positive for a hoggish pool, so the "freeCpu >= 1" check below could not pass.
-            hoggishPools.push_back({poolIdx, std::max(currentThreadCount - poolBooked, currentThreadCount - lastSecondPoolBooked)});
-        }
-        booked += poolBooked;
-        // Accumulate the last-second window too; otherwise the budget below only sees
-        // the long-window average.
-        lastSecondBooked += lastSecondPoolBooked;
-        consumed += poolConsumed;
-        AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2));
-        LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, currentThreadCount, pool.MaxFullThreadCount, isStarved, isNeedy, isHoggish);
-    }
-
-    double budget = total - Max(booked, lastSecondBooked);
-    i16 budgetInt = static_cast<i16>(Max(budget, 0.0));
-    if (budget < -0.1) {
-        isStarvedPresent = true;
-    }
-    double overbooked = consumed - booked;
-    if (overbooked < 0) {
-        isStarvedPresent = false;
-    }
-
-    // Needy pools and half-thread donors: higher priority first, then lower pool id.
-    if (needyPools.size()) {
-        Sort(needyPools.begin(), needyPools.end(), [&] (i16 lhs, i16 rhs) {
-            if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
-                return Pools[lhs]->Priority > Pools[rhs]->Priority;
-            }
-            return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId;
-        });
-    }
-
-    if (freeHalfThread.size()) {
-        Sort(freeHalfThread.begin(), freeHalfThread.end(), [&] (i16 lhs, i16 rhs) {
-            if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
-                return Pools[lhs]->Priority > Pools[rhs]->Priority;
-            }
-            return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId;
-        });
-    }
-
-    if (isStarvedPresent) {
-        // last_starved_at_consumed_value = sum of consumed over all pools;
-        // TODO(cthulhu): use it as a limit, smoothly drive that limit towards total,
-        //                and use it instead of total
-        if (beingStopped && beingStopped >= overbooked) {
-            // do nothing
-        } else {
-            for (ui16 poolIdx : PriorityOrder) {
-                TPoolInfo &pool = *Pools[poolIdx];
-                i64 threadCount = pool.GetFullThreadCount();
-                if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) {
-                    Shared->ReturnOwnHalfThread(poolIdx);
-                }
-                while (threadCount > pool.DefaultFullThreadCount) {
-                    pool.SetFullThreadCount(--threadCount);
-                    AtomicIncrement(pool.DecreasingThreadsByStarvedState);
-                    overbooked--;
-                    sumOfAdditionalThreads--;
-
-                    // threadCount already holds the new value, as in the other probes.
-                    LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
-                    if (overbooked < 1) {
-                        break;
-                    }
-                }
-                if (overbooked < 1) {
-                    break;
-                }
-            }
-        }
-    } else {
-        for (size_t needyPoolIdx : needyPools) {
-            TPoolInfo &pool = *Pools[needyPoolIdx];
-            i64 threadCount = pool.GetFullThreadCount();
-            if (budget >= 1.0) {
-                if (threadCount + 1 <= pool.MaxFullThreadCount) {
-                    AtomicIncrement(pool.IncreasingThreadsByNeedyState);
-                    isNeedyByPool[needyPoolIdx] = false;
-                    sumOfAdditionalThreads++;
-                    pool.SetFullThreadCount(threadCount + 1);
-                    budget -= 1.0;
-                    LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
-                }
-            } else if (Shared && budget >= 0.5 && !hasBorrowedSharedThread[needyPoolIdx] && freeHalfThread.size()) {
-                Shared->GiveHalfThread(freeHalfThread.back(), needyPoolIdx);
-                freeHalfThread.pop_back();
-                isNeedyByPool[needyPoolIdx] = false;
-                budget -= 0.5;
-            }
-            if constexpr (NFeatures::IsLocalQueues()) {
-                bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxFullThreadCount;
-                needToExpandLocalQueue &= (bool)pool.BasicPool;
-                needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1);
-                needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE);
-                if (needToExpandLocalQueue) {
-                    pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize);
-                }
-            }
-        }
-    }
-
-    if (budget < 1.0) {
-        // No whole free core is left: give still-needy pools a thread each and take the
-        // same number of additional threads away, walking PriorityOrder.
-        size_t takingAwayThreads = 0;
-        for (size_t needyPoolIdx : needyPools) {
-            TPoolInfo &pool = *Pools[needyPoolIdx];
-            i64 threadCount = pool.GetFullThreadCount();
-            sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount;
-            if (sumOfAdditionalThreads < takingAwayThreads + 1) {
-                break;
-            }
-            if (!isNeedyByPool[needyPoolIdx]) {
-                continue;
-            }
-            AtomicIncrement(pool.IncreasingThreadsByExchange);
-            isNeedyByPool[needyPoolIdx] = false;
-            takingAwayThreads++;
-            pool.SetFullThreadCount(threadCount + 1);
-
-            LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
-        }
-
-        for (ui16 poolIdx : PriorityOrder) {
-            if (takingAwayThreads <= 0) {
-                break;
-            }
-
-            TPoolInfo &pool = *Pools[poolIdx];
-            size_t threadCount = pool.GetFullThreadCount();
-            // Compute the surplus in signed arithmetic: a pool running below its default
-            // count has no additional threads, and the unsigned subtraction would wrap.
-            const i64 surplus = static_cast<i64>(threadCount) - pool.DefaultFullThreadCount;
-            size_t additionalThreadsCount = surplus > 0 ? static_cast<size_t>(surplus) : 0;
-            size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads);
-
-            if (!currentTakingAwayThreads) {
-                continue;
-            }
-            takingAwayThreads -= currentTakingAwayThreads;
-            pool.SetFullThreadCount(threadCount - currentTakingAwayThreads);
-
-            // Count what was taken from THIS pool, not what is still left to take.
-            AtomicAdd(pool.DecreasingThreadsByExchange, currentTakingAwayThreads);
-            LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
-        }
-    }
-
-    for (auto &[hoggishPoolIdx, freeCpu] : hoggishPools) {
-        TPoolInfo &pool = *Pools[hoggishPoolIdx];
-        i64 threadCount = pool.GetFullThreadCount();
-        if (hasBorrowedSharedThread[hoggishPoolIdx]) {
-            // Give the borrowed half-thread back first; nothing else changes in this pass.
-            Shared->ReturnBorrowedHalfThread(hoggishPoolIdx);
-            freeCpu -= 0.5;
-            continue;
-        }
-        if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
-            // Halve the local queue but never drop below the minimum size.
-            pool.LocalQueueSize = std::max<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
-            pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
-        }
-        if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) {
-            AtomicIncrement(pool.DecreasingThreadsByHoggishState);
-            LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
-            pool.SetFullThreadCount(threadCount - 1);
-        }
-    }
-
-    // Publish how far each pool could grow with the whole-core budget computed above.
-    for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TPoolInfo& pool = *Pools[poolIdx];
-        AtomicSet(pool.PotentialMaxThreadCount, std::min<i64>(pool.MaxThreadCount, pool.GetThreadCount() + budgetInt));
-    }
-}
-
-// Rebuilds PriorityOrder as the list of pool indexes sorted by ascending Priority;
-// pools with equal priority are ordered by descending PoolId.
-void THarmonizer::CalculatePriorityOrder() {
-    PriorityOrder.resize(Pools.size());
-    Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
-
-    const auto lowerPriorityFirst = [&] (i16 lhs, i16 rhs) {
-        const TPoolInfo& left = *Pools[lhs];
-        const TPoolInfo& right = *Pools[rhs];
-        if (left.Priority == right.Priority) {
-            return left.Pool->PoolId > right.Pool->PoolId;
-        }
-        return left.Priority < right.Priority;
-    };
-    Sort(PriorityOrder.begin(), PriorityOrder.end(), lowerPriorityFirst);
-}
-
-// Entry point called with the current timestamp. Does nothing when the harmonizer is
-// disabled, when the next harmonization time has not come yet, or when another caller
-// holds the lock. Otherwise schedules the next pass one second ahead, pulls the
-// statistics and runs HarmonizeImpl() under the lock.
-void THarmonizer::Harmonize(ui64 ts) {
-    const bool rejected = IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire();
-    if (rejected) {
-        LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false);
-        return;
-    }
-
-    // The lock is held from here on; look at the flag once more under it.
-    if (!IsDisabled) {
-        ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
-        LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);
-
-        {
-            TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_HARMONIZER> activityGuard;
-
-            // AddPool() clears the order, so it is rebuilt lazily here.
-            if (PriorityOrder.empty()) {
-                CalculatePriorityOrder();
-            }
-
-            PullStats(ts);
-            HarmonizeImpl(ts);
-        }
-    } else {
-        LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true);
-    }
-
-    Lock.Release();
-}
-
-// Moves the next harmonization time to `ts`, so a Harmonize() call with a timestamp
-// of at least `ts` is no longer rejected by the time check.
-void THarmonizer::DeclareEmergency(ui64 ts) {
-    NextHarmonizeTs.store(ts);
-}
-
-// Registers `pool` with the harmonizer under the lock.
-//
-// Captures the pool's default/min/max thread limits, sizes the per-thread and
-// per-shared-thread history arrays, resets the pool to its default full thread
-// count and, when `pingInfo` is given, attaches its ping counters.
-// pingInfo may be null. PriorityOrder is cleared so Harmonize() rebuilds it.
-void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
-    TGuard<TSpinLock> guard(Lock);
-    // Create the object as a unique_ptr right away: handing a raw `new` to emplace_back
-    // would leak it if the vector failed to grow.
-    Pools.push_back(std::make_unique<TPoolInfo>());
-    TPoolInfo &poolInfo = *Pools.back();
-    poolInfo.Pool = pool;
-    // The shared pool known at this moment; a later SetSharedPool() does not update it.
-    poolInfo.Shared = Shared;
-    // Null for executor pools that are not TBasicExecutorPool.
-    poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool);
-    poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount();
-    poolInfo.MinThreadCount = pool->GetMinThreadCount();
-    poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
-
-    poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount();
-    poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount();
-    poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount();
-    poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount);
-    poolInfo.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0);
-    poolInfo.Priority = pool->GetPriority();
-    pool->SetFullThreadCount(poolInfo.DefaultFullThreadCount);
-    if (pingInfo) {
-        poolInfo.AvgPingCounter = pingInfo->AvgPingCounter;
-        poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
-        poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
-    }
-    if (poolInfo.BasicPool) {
-        // Waiting statistics are collected for basic pools only.
-        poolInfo.WaitingStats.reset(new TWaitingStats<ui64>());
-        poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>());
-    }
-    PriorityOrder.clear();
-}
-
-// Turns harmonization on (enable == true) or off (enable == false).
-// The flag stored is IsDisabled, so the argument has to be negated; storing it
-// as-is made Enable(true) switch the harmonizer off.
-// NOTE(review): callers that relied on the inverted behaviour must be checked.
-void THarmonizer::Enable(bool enable) {
-    TGuard<TSpinLock> guard(Lock);
-    IsDisabled = !enable;
-}
-
-// Creates a THarmonizer whose first pass may run once the timestamp reaches `ts`.
-// The returned object is allocated with `new`; the caller is responsible for it.
-IHarmonizer* MakeHarmonizer(ui64 ts) {
-    IHarmonizer* harmonizer = new THarmonizer(ts);
-    return harmonizer;
-}
-
-// Returns a snapshot of the harmonizer counters of the pool registered at index `poolId`.
-// poolId is used as an index into Pools without a range check.
-// LastFlags bits: 1 - needy, 2 - starved, 4 - hoggish.
-TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
-    const TPoolInfo &pool = *Pools[poolId];
-    const ui64 flags = RelaxedLoad(&pool.LastFlags);
-
-    TPoolHarmonizerStats stats{};
-    stats.IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState));
-    stats.IncreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByExchange));
-    stats.DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState));
-    stats.DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState));
-    stats.DecreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByExchange));
-    stats.MaxConsumedCpu = pool.MaxConsumedCpu.load(std::memory_order_relaxed);
-    stats.MinConsumedCpu = pool.MinConsumedCpu.load(std::memory_order_relaxed);
-    stats.AvgConsumedCpu = pool.AvgConsumedCpu.load(std::memory_order_relaxed);
-    stats.MaxBookedCpu = pool.MaxBookedCpu.load(std::memory_order_relaxed);
-    stats.MinBookedCpu = pool.MinBookedCpu.load(std::memory_order_relaxed);
-    stats.PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount));
-    stats.IsNeedy = (flags & 1) != 0;
-    stats.IsStarved = (flags & 2) != 0;
-    stats.IsHoggish = (flags & 4) != 0;
-    return stats;
-}
-
-// Returns a snapshot of the harmonizer-wide values: the extremes of the consumed and
-// booked histories and the latest average wake-up timings.
-THarmonizerStats THarmonizer::GetStats() const {
-    THarmonizerStats stats{};
-    stats.MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&MaxConsumedCpu));
-    stats.MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu));
-    stats.MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu));
-    stats.MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu));
-    stats.AvgAwakeningTimeUs = AvgAwakeningTimeUs;
-    stats.AvgWakingUpTimeUs = AvgWakingUpTimeUs;
-    return stats;
-}
-
-// Remembers the shared executor pool. AddPool() copies this pointer into each pool's
-// info, so pools registered before this call keep the value they were given.
-void THarmonizer::SetSharedPool(TSharedExecutorPool* pool) {
-    this->Shared = pool;
-}
-
-}
diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp
new file mode 100644
index 00000000000..4cb858e435a
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp
@@ -0,0 +1,171 @@
+#include "cpu_consumption.h"
+#include "debug.h"
+
+namespace NActors {
+
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
+// Resets all four accumulated values to zero.
+void TCpuConsumptionInfo::Clear() {
+    Elapsed = Cpu = LastSecondElapsed = LastSecondCpu = 0.0;
+}
+
+// Prepares the containers for `poolCount` pools: the classification lists only get
+// capacity reserved, while PoolConsumption is resized so it can be indexed by pool.
+void THarmonizerCpuConsumption::Init(i16 poolCount) {
+    HoggishPools.reserve(poolCount);
+    NeedyPools.reserve(poolCount);
+    IsNeedyByPool.reserve(poolCount);
+    PoolConsumption.resize(poolCount);
+}
+
+namespace {
+
+    // Scales `value` by 1/0.9 in double precision, clamps the product to [0.0, 1.0]
+    // and returns it converted back to float.
+    float Rescale(float value) {
+        const double scaled = value * (1.0/0.9);
+        const double capped = Min(1.0, scaled);
+        return Max(0.0, capped);
+    }
+
+    // Recomputes *poolConsumption for `pool`: clears it, then adds the rescaled
+    // elapsed/cpu values (whole-history and last-second) of every pool-owned thread
+    // and of every shared-thread slot. Each contribution is also reported through
+    // the HarmonizeCheckPoolByThread probe; shared slots use the index -1 - sharedIdx.
+    void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption) {
+        TCpuConsumptionInfo& totals = *poolConsumption;
+        totals.Clear();
+
+        for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
+            const float elapsed = Rescale(pool.GetElapsed(threadIdx));
+            const float lastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx));
+            const float cpu = Rescale(pool.GetCpu(threadIdx));
+            const float lastSecondCpu = Rescale(pool.GetLastSecondCpu(threadIdx));
+
+            totals.Elapsed += elapsed;
+            totals.LastSecondElapsed += lastSecondElapsed;
+            totals.Cpu += cpu;
+            totals.LastSecondCpu += lastSecondCpu;
+            LWPROBE_WITH_DEBUG(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), threadIdx, elapsed, cpu, lastSecondElapsed, lastSecondCpu);
+        }
+
+        for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) {
+            const float elapsed = Rescale(pool.GetSharedElapsed(sharedIdx));
+            const float lastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx));
+            const float cpu = Rescale(pool.GetSharedCpu(sharedIdx));
+            const float lastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx));
+
+            totals.Elapsed += elapsed;
+            totals.LastSecondElapsed += lastSecondElapsed;
+            totals.Cpu += cpu;
+            totals.LastSecondCpu += lastSecondCpu;
+            LWPROBE_WITH_DEBUG(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), -1 - sharedIdx, elapsed, cpu, lastSecondElapsed, lastSecondCpu);
+        }
+    }
+
+    // A pool counts as starved when its load is noticeable (the larger of the two values
+    // is above 0.1) and cpu lags behind elapsed: either cpu is below 70% of elapsed or
+    // the gap between them exceeds 0.5.
+    bool IsStarved(double elapsed, double cpu) {
+        const bool hasLoad = Max(elapsed, cpu) > 0.1;
+        if (!hasLoad) {
+            return false;
+        }
+        const bool relativeLag = cpu < elapsed * 0.7;
+        const bool absoluteLag = elapsed - cpu > 0.5;
+        return relativeLag || absoluteLag;
+    }
+
+    // A pool counts as hoggish when more than half a thread of its current capacity is unused.
+    bool IsHoggish(double elapsed, double currentThreadCount) {
+        const double hoggishLimit = currentThreadCount - 0.5;
+        return elapsed < hoggishLimit;
+    }
+
+} // namespace
+
+
+// Recomputes the harmonizer-wide CPU picture from the current state of `pools`.
+//
+// For every pool it refreshes PoolConsumption[poolIdx], classifies the pool as
+// starved / needy / hoggish, records pools that can donate a shared half-thread, and
+// stores the flags in pool.LastFlags (bit 0 - needy, bit 1 - starved, bit 2 - hoggish).
+// Afterwards the needy and donor lists are sorted (higher priority first, then lower
+// pool id) and Budget, BudgetInt, Overbooked and IsStarvedPresent are derived from the
+// totals. PoolConsumption must already hold an entry per pool (see Init()).
+void THarmonizerCpuConsumption::Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools, const TSharedInfo& sharedInfo) {
+    // Start from a clean slate: lists first, then the scalar totals.
+    IsNeedyByPool.clear();
+    HoggishPools.clear();
+    NeedyPools.clear();
+    FreeHalfThread.clear();
+
+    LastSecondCpu = 0.0;
+    LastSecondElapsed = 0.0;
+    Cpu = 0.0;
+    Elapsed = 0.0;
+    IsStarvedPresent = false;
+    StoppingThreads = 0;
+    AdditionalThreads = 0;
+    TotalCores = 0;
+
+    for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx) {
+        TPoolInfo& pool = *pools[poolIdx];
+        TCpuConsumptionInfo& consumption = PoolConsumption[poolIdx];
+
+        TotalCores += pool.DefaultThreadCount;
+        AdditionalThreads += Max(0, pool.GetFullThreadCount() - pool.DefaultFullThreadCount);
+        const float currentThreadCount = pool.GetThreadCount();
+        StoppingThreads += pool.Pool->GetBlockingThreadCount();
+        HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "current thread count", currentThreadCount, "stopping threads", StoppingThreads, "default thread count", pool.DefaultThreadCount);
+
+        UpdatePoolConsumption(pool, &consumption);
+
+        HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "elapsed", consumption.Elapsed, "cpu", consumption.Cpu, "last second elapsed", consumption.LastSecondElapsed, "last second cpu", consumption.LastSecondCpu);
+
+        // Starvation is checked on both the whole-history and the last-second values.
+        const bool isStarved = IsStarved(consumption.Elapsed, consumption.Cpu)
+                || IsStarved(consumption.LastSecondElapsed, consumption.LastSecondCpu);
+        if (isStarved) {
+            IsStarvedPresent = true;
+        }
+
+        const bool wantsCpu = pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions;
+        const bool isNeedy = wantsCpu && (consumption.Cpu >= currentThreadCount);
+        IsNeedyByPool.push_back(isNeedy);
+        if (isNeedy) {
+            NeedyPools.push_back(poolIdx);
+        }
+
+        // More than half a thread is idle and the pool holds a shared half-thread.
+        const bool hasSpareHalf = currentThreadCount - consumption.Elapsed > 0.5;
+        if (hasSpareHalf && (sharedInfo.HasBorrowedSharedThread[poolIdx] || sharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx])) {
+            FreeHalfThread.push_back(poolIdx);
+        }
+
+        const bool isHoggish = IsHoggish(consumption.Elapsed, currentThreadCount)
+                || IsHoggish(consumption.LastSecondElapsed, currentThreadCount);
+        if (isHoggish) {
+            const float freeByHistory = currentThreadCount - consumption.Elapsed;
+            const float freeByLastSecond = currentThreadCount - consumption.LastSecondElapsed;
+            const float freeCpu = std::max(freeByHistory, freeByLastSecond);
+            HoggishPools.push_back({poolIdx, freeCpu});
+        }
+
+        Elapsed += consumption.Elapsed;
+        Cpu += consumption.Cpu;
+        LastSecondElapsed += consumption.LastSecondElapsed;
+        LastSecondCpu += consumption.LastSecondCpu;
+        pool.LastFlags.store((i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2), std::memory_order_relaxed);
+        LWPROBE_WITH_DEBUG(
+            HarmonizeCheckPool,
+            poolIdx,
+            pool.Pool->GetName(),
+            consumption.Elapsed,
+            consumption.Cpu,
+            consumption.LastSecondElapsed,
+            consumption.LastSecondCpu,
+            currentThreadCount,
+            pool.MaxFullThreadCount,
+            isStarved,
+            isNeedy,
+            isHoggish
+        );
+    }
+
+    // Higher priority first; equal priorities are ordered by ascending pool id.
+    const auto byPriorityThenPoolId = [&] (i16 lhs, i16 rhs) {
+        if (pools[lhs]->Priority != pools[rhs]->Priority) {
+            return pools[lhs]->Priority > pools[rhs]->Priority;
+        }
+        return pools[lhs]->Pool->PoolId < pools[rhs]->Pool->PoolId;
+    };
+    if (NeedyPools.size()) {
+        Sort(NeedyPools.begin(), NeedyPools.end(), byPriorityThenPoolId);
+    }
+    if (FreeHalfThread.size()) {
+        Sort(FreeHalfThread.begin(), FreeHalfThread.end(), byPriorityThenPoolId);
+    }
+
+    HARMONIZER_DEBUG_PRINT("NeedyPools", NeedyPools.size(), "FreeHalfThread", FreeHalfThread.size(), "HoggishPools", HoggishPools.size());
+
+    // The budget is what remains of the default cores after the larger of the two
+    // elapsed totals; a clearly negative budget is treated as starvation.
+    Budget = TotalCores - Max(Elapsed, LastSecondElapsed);
+    BudgetInt = static_cast<i16>(Max(Budget, 0.0f));
+    if (Budget < -0.1) {
+        IsStarvedPresent = true;
+    }
+    // When cpu is not behind elapsed in total, the starvation verdict is withdrawn.
+    Overbooked = Elapsed - Cpu;
+    if (Overbooked < 0) {
+        IsStarvedPresent = false;
+    }
+    HARMONIZER_DEBUG_PRINT("IsStarvedPresent", IsStarvedPresent, "Budget", Budget, "BudgetInt", BudgetInt, "Overbooked", Overbooked, "TotalCores", TotalCores, "Elapsed", Elapsed, "Cpu", Cpu, "LastSecondElapsed", LastSecondElapsed, "LastSecondCpu", LastSecondCpu);
+}
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.h b/ydb/library/actors/core/harmonizer/cpu_consumption.h
new file mode 100644
index 00000000000..0fbfd85b6ee
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/cpu_consumption.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include "defs.h"
+#include "pool.h"
+#include "shared_info.h"
+namespace NActors {
+
+struct TPoolInfo;
+
+// Rescaled CPU usage accumulated for one pool: whole-history and last-second values.
+// Members are zero-initialized so a default-constructed object is already in the same
+// state that Clear() produces.
+struct TCpuConsumptionInfo {
+    float Elapsed = 0.0f;
+    float Cpu = 0.0f;
+    float LastSecondElapsed = 0.0f;
+    float LastSecondCpu = 0.0f;
+
+    // Resets all four values to zero.
+    void Clear();
+}; // struct TCpuConsumptionInfo
+
+// Harmonizer-wide CPU picture, rebuilt from scratch by every Pull() call.
+struct THarmonizerCpuConsumption {
+ // Per-pool consumption, indexed by pool index; sized by Init().
+ std::vector<TCpuConsumptionInfo> PoolConsumption;
+
+ // Sum of DefaultThreadCount over all pools.
+ float TotalCores = 0;
+ // Sum over pools of full threads above DefaultFullThreadCount.
+ i16 AdditionalThreads = 0;
+ // Sum of GetBlockingThreadCount() over all pools.
+ i16 StoppingThreads = 0;
+ bool IsStarvedPresent = false;
+ // TotalCores minus the larger of Elapsed and LastSecondElapsed.
+ float Budget = 0.0;
+ // Budget clamped at zero and truncated to an integer.
+ i16 BudgetInt = 0;
+ // Elapsed minus Cpu.
+ float Overbooked = 0.0;
+
+ // Totals of the per-pool values.
+ float Elapsed = 0.0;
+ float Cpu = 0.0;
+ float LastSecondElapsed = 0.0;
+ float LastSecondCpu = 0.0;
+ // Indexes of needy pools, sorted by Pull().
+ TStackVec<i16, 8> NeedyPools;
+ // (pool index, free cpu) for every hoggish pool.
+ TStackVec<std::pair<i16, float>, 8> HoggishPools;
+ // Needy flag per pool, in pool order.
+ TStackVec<bool, 8> IsNeedyByPool;
+ // Indexes of pools that hold a shared half-thread and have more than 0.5 of a thread idle.
+ std::vector<i16> FreeHalfThread;
+
+ // Sizes PoolConsumption and reserves space in the lists for poolCount pools.
+ void Init(i16 poolCount);
+
+ // Recomputes every field above from the current state of the pools.
+ void Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools, const TSharedInfo& sharedInfo);
+
+}; // struct THarmonizerCpuConsumption
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/debug.h b/ydb/library/actors/core/harmonizer/debug.h
new file mode 100644
index 00000000000..e4c95639ccc
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/debug.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include <ydb/library/actors/core/probes.h>
+
+namespace NActors {
+
+// Verbosity of the harmonizer's compile-time debug output. The enumerators are
+// ordered, so each level also enables the output of every level before it.
+enum class EDebugHarmonizerLevel {
+    None,     // 0
+    Debug,    // 1
+    Trace,    // 2
+    History,  // 3
+};
+
+// Compile-time debug level: HARMONIZER_DEBUG selects Debug, otherwise
+// HARMONIZER_HISTORY_DEBUG selects History, otherwise the output is off.
+constexpr EDebugHarmonizerLevel DebugHarmonizerLevel =
+#if defined(HARMONIZER_DEBUG)
+    EDebugHarmonizerLevel::Debug;
+#elif defined(HARMONIZER_HISTORY_DEBUG)
+    EDebugHarmonizerLevel::History;
+#else
+    EDebugHarmonizerLevel::None;
+#endif
+
+// Writes every argument to Cerr followed by a single space, then ends the line.
+// The line ending is written as a separate statement so the function is also
+// well-formed for an empty argument pack.
+template <typename ... TArgs>
+void Print(TArgs&& ... args) {
+    ((Cerr << std::forward<TArgs>(args) << " "), ...);
+    Cerr << Endl;
+}
+
+// Debug helpers. Every macro expands to one do { ... } while (false) statement, so it
+// is safe inside an unbraced if/else and must be followed by a semicolon at the call site.
+// The *_PRINT macros print through Print() when the compile-time level is high enough;
+// the LWPROBE_WITH_* macros fire the probe and then print the probe name with the
+// same arguments.
+
+#define HARMONIZER_DEBUG_PRINT(...) \
+    do { \
+        if constexpr (DebugHarmonizerLevel >= EDebugHarmonizerLevel::Debug) { Print(__VA_ARGS__); } \
+    } while (false)
+
+#define LWPROBE_WITH_DEBUG(probe, ...) \
+    do { \
+        LWPROBE(probe, __VA_ARGS__); \
+        HARMONIZER_DEBUG_PRINT(#probe, __VA_ARGS__); \
+    } while (false)
+
+#define HARMONIZER_TRACE_PRINT(...) \
+    do { \
+        if constexpr (DebugHarmonizerLevel >= EDebugHarmonizerLevel::Trace) { Print(__VA_ARGS__); } \
+    } while (false)
+
+#define LWPROBE_WITH_TRACE(probe, ...) \
+    do { \
+        LWPROBE(probe, __VA_ARGS__); \
+        HARMONIZER_TRACE_PRINT(#probe, __VA_ARGS__); \
+    } while (false)
+
+#define HARMONIZER_HISTORY_PRINT(...) \
+    do { \
+        if constexpr (DebugHarmonizerLevel >= EDebugHarmonizerLevel::History) { Print(__VA_ARGS__); } \
+    } while (false)
+
+#define LWPROBE_WITH_HISTORY(probe, ...) \
+    do { \
+        LWPROBE(probe, __VA_ARGS__); \
+        HARMONIZER_HISTORY_PRINT(#probe, __VA_ARGS__); \
+    } while (false)
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/defs.h b/ydb/library/actors/core/harmonizer/defs.h
new file mode 100644
index 00000000000..4e185067403
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/defs.h
@@ -0,0 +1,3 @@
+#pragma once
+
+#include <ydb/library/actors/core/defs.h>
diff --git a/ydb/library/actors/core/harmonizer/harmonizer.cpp b/ydb/library/actors/core/harmonizer/harmonizer.cpp
new file mode 100644
index 00000000000..0c92e172308
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/harmonizer.cpp
@@ -0,0 +1,487 @@
+#include "harmonizer.h"
+#include "history.h"
+#include "pool.h"
+#include "waiting_stats.h"
+#include "cpu_consumption.h"
+#include "shared_info.h"
+#include "debug.h"
+#include <ydb/library/actors/core/executor_pool.h>
+
+#include <ydb/library/actors/core/executor_thread_ctx.h>
+#include <ydb/library/actors/core/executor_thread.h>
+#include <ydb/library/actors/core/probes.h>
+
+#include <ydb/library/actors/core/activity_guard.h>
+#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/library/actors/core/executor_pool_basic.h>
+#include <ydb/library/actors/core/executor_pool_basic_feature_flags.h>
+#include <ydb/library/actors/core/executor_pool_shared.h>
+
+#include <atomic>
+#include <ydb/library/actors/util/cpu_load_log.h>
+#include <ydb/library/actors/util/datetime.h>
+#include <ydb/library/actors/util/intrinsics.h>
+
+#include <util/system/spinlock.h>
+
+#include <algorithm>
+
+namespace NActors {
+
+
+
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
+
+// IHarmonizer implementation: on each pass it pulls CPU statistics from the registered
+// pools and changes their full thread counts (and shared half threads) accordingly.
+class THarmonizer: public IHarmonizer {
+private:
+ // While true, Harmonize() returns without doing any work.
+ std::atomic<bool> IsDisabled = false;
+ // Taken by Harmonize() (via TryAcquire), AddPool() and Enable().
+ TSpinLock Lock;
+ // Harmonize() does nothing for timestamps below this value.
+ std::atomic<ui64> NextHarmonizeTs = 0;
+ // One entry per AddPool() call, in registration order.
+ std::vector<std::unique_ptr<TPoolInfo>> Pools;
+ // Indices into Pools ordered by CalculatePriorityOrder(); cleared by AddPool()
+ // so that the next Harmonize() pass rebuilds it.
+ std::vector<ui16> PriorityOrder;
+
+ // Histories of the consumption summed over all pools; PullStats() registers
+ // CpuUs / 1'000'000 and ElapsedUs / 1'000'000 respectively.
+ TValueHistory<16> UsedCpu;
+ TValueHistory<16> ElapsedCpu;
+
+ // Max/min of the histories above, published for GetStats().
+ std::atomic<float> MaxUsedCpu = 0;
+ std::atomic<float> MinUsedCpu = 0;
+ std::atomic<float> MaxElapsedCpu = 0;
+ std::atomic<float> MinElapsedCpu = 0;
+
+ // Optional shared pool, set through SetSharedPool(); may stay nullptr.
+ ISharedExecutorPool* Shared = nullptr;
+ TSharedInfo SharedInfo;
+
+ TWaitingInfo WaitingInfo;
+ THarmonizerCpuConsumption CpuConsumption;
+ // NOTE(review): not referenced by any method visible in this file section.
+ THarmonizerStats Stats;
+ // Copy of CpuConsumption.Budget taken by PullStats(); ProcessNeedyState() spends it.
+ float ProcessingBudget = 0.0;
+
+ void PullStats(ui64 ts);
+ void PullSharedInfo();
+ void ProcessWaitingStats();
+ void HarmonizeImpl(ui64 ts);
+ void CalculatePriorityOrder();
+ void ProcessStarvedState();
+ void ProcessNeedyState();
+ void ProcessExchange();
+ void ProcessHoggishState();
+public:
+ THarmonizer(ui64 ts);
+ virtual ~THarmonizer();
+ double Rescale(double value) const;
+ void Harmonize(ui64 ts) override;
+ void DeclareEmergency(ui64 ts) override;
+ void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
+ void Enable(bool enable) override;
+ TPoolHarmonizerStats GetPoolStats(i16 poolId) const override;
+ THarmonizerStats GetStats() const override;
+ void SetSharedPool(ISharedExecutorPool* pool) override;
+};
+
+// Creates a harmonizer whose first pass is allowed from timestamp `ts` onwards.
+THarmonizer::THarmonizer(ui64 ts) {
+    NextHarmonizeTs.store(ts);
+}
+
+// Nothing to release by hand: all members clean up after themselves.
+THarmonizer::~THarmonizer() = default;
+
+// Refreshes all statistics used by one harmonizer pass:
+//  - sums the per-pool consumption and registers it in the UsedCpu / ElapsedCpu histories,
+//  - publishes the history extremes for GetStats(),
+//  - pulls waiting, shared-pool and CPU-consumption info,
+//  - resets ProcessingBudget from the freshly computed CpuConsumption.Budget.
+void THarmonizer::PullStats(ui64 ts) {
+    HARMONIZER_DEBUG_PRINT("PullStats");
+
+    TCpuConsumption total;
+    for (auto& poolInfo : Pools) {
+        TCpuConsumption poolConsumption = poolInfo->PullStats(ts);
+        total.Add(poolConsumption);
+    }
+
+    constexpr auto relaxed = std::memory_order_relaxed;
+
+    UsedCpu.Register(ts, total.CpuUs / 1'000'000.0);
+    MaxUsedCpu.store(UsedCpu.GetMax(), relaxed);
+    MinUsedCpu.store(UsedCpu.GetMin(), relaxed);
+
+    ElapsedCpu.Register(ts, total.ElapsedUs / 1'000'000.0);
+    MaxElapsedCpu.store(ElapsedCpu.GetMax(), relaxed);
+    MinElapsedCpu.store(ElapsedCpu.GetMin(), relaxed);
+
+    WaitingInfo.Pull(Pools);
+    if (Shared != nullptr) {
+        SharedInfo.Pull(*Shared);
+    }
+    CpuConsumption.Pull(Pools, SharedInfo);
+
+    ProcessingBudget = CpuConsumption.Budget;
+}
+
+// Recomputes the spin threshold of every basic pool whose actor-system profile is not
+// Default, using the strategy chosen by the spin feature flags, and then clears that
+// pool's waiting statistics.
+void THarmonizer::ProcessWaitingStats() {
+    HARMONIZER_DEBUG_PRINT("ProcessWaitingStats");
+    for (auto& poolPtr : Pools) {
+        TPoolInfo& info = *poolPtr;
+        // Only basic pools with a non-default profile are tuned.
+        if (!info.BasicPool || info.BasicPool->ActorSystemProfile == EASProfile::Default) {
+            continue;
+        }
+
+        if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
+            info.BasicPool->CalcSpinPerThread(WaitingInfo.AvgWakingUpTimeUs);
+        } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
+            ui64 threshold = info.MovingWaitingStats->CalculateGoodSpinThresholdCycles(WaitingInfo.AvgWakingUpTimeUs);
+            info.BasicPool->SetSpinThresholdCycles(threshold);
+        } else {
+            ui64 threshold = info.WaitingStats->CalculateGoodSpinThresholdCycles(WaitingInfo.AvgWakingUpTimeUs);
+            info.BasicPool->SetSpinThresholdCycles(threshold);
+        }
+        info.BasicPool->ClearWaitingStats();
+    }
+}
+
+// Handles the starved state. Walks the pools in PriorityOrder and, for each pool:
+//  - asks the shared pool to return the pool's own half thread when SharedInfo reports
+//    a shared thread for it but none that was not borrowed;
+//  - drops full threads above DefaultFullThreadCount one at a time.
+// The walk stops as soon as CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads.
+void THarmonizer::ProcessStarvedState() {
+    HARMONIZER_DEBUG_PRINT("ProcessStarvedState");
+    HARMONIZER_DEBUG_PRINT("shared info", SharedInfo.ToString());
+    for (ui16 poolIdx : PriorityOrder) {
+        TPoolInfo &pool = *Pools[poolIdx];
+        i64 threadCount = pool.GetFullThreadCount();
+        if (SharedInfo.HasSharedThread[poolIdx] && !SharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]) {
+            HARMONIZER_DEBUG_PRINT("return own half thread", poolIdx);
+            i16 borrowedPool = Shared->ReturnOwnHalfThread(poolIdx);
+            pool.ReturnedHalfThreadByStarvedState.fetch_add(1, std::memory_order_relaxed);
+            // -1 would be used as an index into Pools on the next line.
+            Y_ABORT_UNLESS(borrowedPool != -1);
+            Pools[borrowedPool]->GivenHalfThreadByOtherStarvedState.fetch_add(1, std::memory_order_relaxed);
+        } else {
+            HARMONIZER_DEBUG_PRINT("no own half thread", poolIdx, "has shared thread", SharedInfo.HasSharedThread[poolIdx], "has shared thread which was not borrowed", SharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]);
+        }
+        HARMONIZER_DEBUG_PRINT("poolIdx", poolIdx, "threadCount", threadCount, "pool.DefaultFullThreadCount", pool.DefaultFullThreadCount);
+        while (threadCount > pool.DefaultFullThreadCount) {
+            pool.SetFullThreadCount(--threadCount);
+            pool.DecreasingThreadsByStarvedState.fetch_add(1, std::memory_order_relaxed);
+            CpuConsumption.AdditionalThreads--;
+            CpuConsumption.StoppingThreads++;
+
+            // threadCount was already decremented above, so it is the new thread count;
+            // the other HarmonizeOperation probes also report the new count.
+            LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+            if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) {
+                break;
+            }
+        }
+        if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) {
+            break;
+        }
+    }
+}
+
+// Tries to satisfy needy pools out of ProcessingBudget. For each pool still flagged
+// in CpuConsumption.IsNeedyByPool it either hands over a free shared half thread
+// (costs 0.5 of the budget) or, failing that, adds one full thread (costs 1.0).
+// With the local-queues feature enabled it may also grow the pool's local queue.
+void THarmonizer::ProcessNeedyState() {
+ HARMONIZER_DEBUG_PRINT("ProcessNeedyState");
+ if (CpuConsumption.NeedyPools.empty()) {
+ HARMONIZER_DEBUG_PRINT("No needy pools");
+ return;
+ }
+ for (size_t needyPoolIdx : CpuConsumption.NeedyPools) {
+ TPoolInfo &pool = *Pools[needyPoolIdx];
+ // The flag may have been cleared earlier in this pass (see HarmonizeImpl).
+ if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) {
+ continue;
+ }
+ i64 threadCount = pool.GetFullThreadCount();
+ // Prefer a half thread: needs a shared pool, enough budget, a pool that has not
+ // borrowed one already, and a donor in FreeHalfThread.
+ if (Shared && ProcessingBudget >= 0.5 && !SharedInfo.HasBorrowedSharedThread[needyPoolIdx] && CpuConsumption.FreeHalfThread.size()) {
+ i16 poolWithHalfThread = CpuConsumption.FreeHalfThread.back();
+ Shared->GiveHalfThread(poolWithHalfThread, needyPoolIdx);
+ CpuConsumption.FreeHalfThread.pop_back();
+ CpuConsumption.IsNeedyByPool[needyPoolIdx] = false;
+ ProcessingBudget -= 0.5;
+ pool.ReceivedHalfThreadByNeedyState.fetch_add(1, std::memory_order_relaxed);
+ Pools[poolWithHalfThread]->GivenHalfThreadByOtherNeedyState.fetch_add(1, std::memory_order_relaxed);
+ } else if (ProcessingBudget >= 1.0) {
+ // Otherwise add a full thread, unless the pool is already at its maximum.
+ if (threadCount + 1 <= pool.MaxFullThreadCount) {
+ pool.IncreasingThreadsByNeedyState.fetch_add(1, std::memory_order_relaxed);
+ CpuConsumption.IsNeedyByPool[needyPoolIdx] = false;
+ CpuConsumption.AdditionalThreads++;
+ pool.SetFullThreadCount(threadCount + 1);
+ ProcessingBudget -= 1.0;
+ LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+ }
+ }
+ if constexpr (NFeatures::IsLocalQueues()) {
+ // threadCount is the value read before any increase above; ProcessingBudget is
+ // the value left after it.
+ bool needToExpandLocalQueue = ProcessingBudget < 1.0 || threadCount >= pool.MaxFullThreadCount;
+ needToExpandLocalQueue &= (bool)pool.BasicPool;
+ needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1);
+ needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE);
+ if (needToExpandLocalQueue) {
+ pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize);
+ }
+ }
+ }
+}
+
+// Moves full threads from pools that run above their default count to needy pools.
+// Phase 1 grants one extra thread to each still-needy pool for as long as the other
+// pools hold enough additional threads to pay for it. Phase 2 walks the pools in
+// PriorityOrder and takes the same number of threads back from pools' surplus
+// (threads above DefaultFullThreadCount).
+void THarmonizer::ProcessExchange() {
+    HARMONIZER_DEBUG_PRINT("ProcessExchange");
+    if (CpuConsumption.NeedyPools.empty()) {
+        HARMONIZER_DEBUG_PRINT("No needy pools");
+        return;
+    }
+    size_t takingAwayThreads = 0;
+    size_t sumOfAdditionalThreads = CpuConsumption.AdditionalThreads;
+    for (size_t needyPoolIdx : CpuConsumption.NeedyPools) {
+        TPoolInfo &pool = *Pools[needyPoolIdx];
+        i64 threadCount = pool.GetFullThreadCount();
+        // A needy pool's own surplus is not available for the exchange.
+        sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount;
+        if (sumOfAdditionalThreads < takingAwayThreads + 1) {
+            break;
+        }
+        if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) {
+            continue;
+        }
+        pool.IncreasingThreadsByExchange.fetch_add(1, std::memory_order_relaxed);
+        CpuConsumption.IsNeedyByPool[needyPoolIdx] = false;
+        takingAwayThreads++;
+        pool.SetFullThreadCount(threadCount + 1);
+
+        LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+    }
+
+    for (ui16 poolIdx : PriorityOrder) {
+        if (takingAwayThreads == 0) {
+            break;
+        }
+
+        TPoolInfo &pool = *Pools[poolIdx];
+        // Signed arithmetic on purpose: a pool may run below its default thread count,
+        // and an unsigned difference would wrap to a huge "surplus" instead of
+        // clamping to zero.
+        const i64 threadCount = pool.GetFullThreadCount();
+        const i64 additionalThreadsCount = std::max<i64>(0, threadCount - pool.DefaultFullThreadCount);
+        const i64 currentTakingAwayThreads = std::min<i64>(additionalThreadsCount, static_cast<i64>(takingAwayThreads));
+
+        if (!currentTakingAwayThreads) {
+            continue;
+        }
+        takingAwayThreads -= currentTakingAwayThreads;
+        pool.SetFullThreadCount(threadCount - currentTakingAwayThreads);
+
+        pool.DecreasingThreadsByExchange.fetch_add(currentTakingAwayThreads, std::memory_order_relaxed);
+        LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+    }
+}
+
+// Shrinks pools that CpuConsumption reports as hoggish. For each such pool:
+//  - a borrowed shared half thread is returned (reducing the pool's free CPU by 0.5),
+//  - the local queue of a basic pool is halved, never below MIN_LOCAL_QUEUE_SIZE,
+//  - one full thread is removed when the pool is above MinFullThreadCount and still
+//    has at least one whole CPU free.
+void THarmonizer::ProcessHoggishState() {
+    HARMONIZER_DEBUG_PRINT("ProcessHoggishState");
+    for (auto &[hoggishPoolIdx, freeCpu] : CpuConsumption.HoggishPools) {
+        TPoolInfo &pool = *Pools[hoggishPoolIdx];
+        i64 threadCount = pool.GetFullThreadCount();
+        if (SharedInfo.HasBorrowedSharedThread[hoggishPoolIdx]) {
+            i16 originalPool = Shared->ReturnBorrowedHalfThread(hoggishPoolIdx);
+            HARMONIZER_DEBUG_PRINT("has borrowed shared thread", hoggishPoolIdx, "will return half thread to", originalPool);
+            freeCpu -= 0.5;
+            pool.GivenHalfThreadByHoggishState.fetch_add(1, std::memory_order_relaxed);
+            // -1 would be used as an index into Pools on the next line.
+            Y_ABORT_UNLESS(originalPool != -1);
+            Pools[originalPool]->ReturnedHalfThreadByOtherHoggishState.fetch_add(1, std::memory_order_relaxed);
+        }
+        if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
+            // Halve the queue but clamp at the minimum; std::min here would let the
+            // size fall below MIN_LOCAL_QUEUE_SIZE.
+            pool.LocalQueueSize = std::max<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
+            pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
+        }
+        HARMONIZER_DEBUG_PRINT("poolIdx", hoggishPoolIdx, "threadCount", threadCount, "pool.MinFullThreadCount", pool.MinFullThreadCount, "freeCpu", freeCpu);
+        if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) {
+            pool.DecreasingThreadsByHoggishState.fetch_add(1, std::memory_order_relaxed);
+            pool.SetFullThreadCount(threadCount - 1);
+            LWPROBE_WITH_DEBUG(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+        }
+    }
+}
+
+// Runs one harmonization pass over statistics already pulled by PullStats():
+// tunes spin thresholds, then handles the starved or needy state, then the thread
+// exchange, then hoggish pools, and finally publishes PotentialMaxThreadCount per pool.
+void THarmonizer::HarmonizeImpl(ui64 ts) {
+ HARMONIZER_DEBUG_PRINT("HarmonizeImpl");
+ // NOTE(review): ts is in fact used further down, so this marker looks redundant.
+ Y_UNUSED(ts);
+
+ ProcessWaitingStats();
+
+ // Drop the needy flag of pools that have a ping counter and whose LastUpdateTs
+ // plus Us2Ts(3'000'000) is still ahead of ts.
+ for (size_t needyPoolIdx : CpuConsumption.NeedyPools) {
+ TPoolInfo &pool = *Pools[needyPoolIdx];
+ if (pool.AvgPingCounter && pool.LastUpdateTs + Us2Ts(3'000'000) > ts) {
+ CpuConsumption.IsNeedyByPool[needyPoolIdx] = false;
+ HARMONIZER_DEBUG_PRINT("pool won't be updated because time", needyPoolIdx);
+ }
+ }
+
+ HARMONIZER_DEBUG_PRINT("IsStarvedPresent", CpuConsumption.IsStarvedPresent, "Overbooked", CpuConsumption.Overbooked, "StoppingThreads", CpuConsumption.StoppingThreads);
+ // Starved pools take precedence; needy pools are served only when nothing is starved.
+ if (CpuConsumption.IsStarvedPresent && CpuConsumption.Overbooked >= CpuConsumption.StoppingThreads) {
+ ProcessStarvedState();
+ } else if (!CpuConsumption.IsStarvedPresent) {
+ ProcessNeedyState();
+ }
+
+ // Less than one whole thread of budget left: try to move threads between pools instead.
+ if (ProcessingBudget < 1.0) {
+ ProcessExchange();
+ }
+
+ if (!CpuConsumption.HoggishPools.empty()) {
+ ProcessHoggishState();
+ }
+
+ // Uses CpuConsumption.Budget as pulled, not the remaining ProcessingBudget.
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = *Pools[poolIdx];
+ pool.PotentialMaxThreadCount.store(std::min<i64>(pool.MaxThreadCount, static_cast<i64>(pool.GetThreadCount() + CpuConsumption.Budget)), std::memory_order_relaxed);
+ HARMONIZER_DEBUG_PRINT(poolIdx, pool.Pool->GetName(), "potential max thread count", pool.PotentialMaxThreadCount.load(std::memory_order_relaxed), "budget", CpuConsumption.Budget, "thread count", pool.GetThreadCount());
+ }
+}
+
+// Rebuilds PriorityOrder as the indices of Pools sorted by ascending Priority;
+// pools with equal priority are ordered by descending PoolId.
+void THarmonizer::CalculatePriorityOrder() {
+    PriorityOrder.resize(Pools.size());
+    Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
+
+    const auto byPriorityThenPoolId = [&] (i16 lhs, i16 rhs) {
+        const TPoolInfo& left = *Pools[lhs];
+        const TPoolInfo& right = *Pools[rhs];
+        return left.Priority != right.Priority
+            ? left.Priority < right.Priority
+            : left.Pool->PoolId > right.Pool->PoolId;
+    };
+    Sort(PriorityOrder.begin(), PriorityOrder.end(), byPriorityThenPoolId);
+}
+
+// Entry point of a harmonization pass. Does nothing when the harmonizer is disabled,
+// when ts is still below NextHarmonizeTs, or when another caller holds Lock.
+// Otherwise it moves NextHarmonizeTs to ts + Us2Ts(1'000'000), pulls fresh statistics
+// and runs HarmonizeImpl() while holding Lock.
+void THarmonizer::Harmonize(ui64 ts) {
+ // Cheap checks first; Lock is only tried when the other two conditions pass.
+ if (IsDisabled.load(std::memory_order_relaxed) || NextHarmonizeTs.load(std::memory_order_acquire) > ts || !Lock.TryAcquire()) {
+ LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs.load(std::memory_order_relaxed), IsDisabled.load(std::memory_order_relaxed), false);
+ return;
+ }
+
+ // Re-check under the lock: NextHarmonizeTs may have been advanced in the meantime.
+ if (NextHarmonizeTs.load(std::memory_order_acquire) > ts) {
+ Lock.Release();
+ return;
+ }
+
+ // Check again under the lock
+ if (IsDisabled.load(std::memory_order_relaxed)) {
+ LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs.load(std::memory_order_relaxed), IsDisabled.load(std::memory_order_relaxed), true);
+ Lock.Release();
+ return;
+ }
+ // Will never reach this line disabled
+ ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull), std::memory_order_acquire);
+ LWPROBE_WITH_DEBUG(TryToHarmonizeSuccess, ts, NextHarmonizeTs.load(std::memory_order_relaxed), previousNextHarmonizeTs);
+
+ {
+ TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_HARMONIZER> activityGuard;
+
+ // AddPool() clears PriorityOrder, so an empty order means the set of pools changed
+ // (or this is the first pass) and the per-pool state has to be re-initialised.
+ if (PriorityOrder.empty()) {
+ CalculatePriorityOrder();
+ SharedInfo.Init(Pools.size());
+ CpuConsumption.Init(Pools.size());
+ }
+
+ PullStats(ts);
+ HarmonizeImpl(ts);
+ }
+
+ Lock.Release();
+}
+
+// Overwrites NextHarmonizeTs with `ts`, so a Harmonize() call with a timestamp of at
+// least `ts` is no longer held back by the regular interval.
+void THarmonizer::DeclareEmergency(ui64 ts) {
+    NextHarmonizeTs.store(ts);
+}
+
+// Registers `pool` with the harmonizer under Lock.
+// Captures the pool's thread limits and priority, resets the pool to its default full
+// thread count, optionally wires in the ping counters from `pingInfo` (may be null),
+// and clears PriorityOrder so the next Harmonize() pass re-initialises its state.
+// The current value of Shared is copied into the new entry at this point.
+void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
+    TGuard<TSpinLock> guard(Lock);
+
+    TPoolInfo& info = *Pools.emplace_back(new TPoolInfo);
+    info.Pool = pool;
+    info.Shared = Shared;
+    info.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool);
+
+    // Limits counted in threads.
+    info.DefaultThreadCount = pool->GetDefaultThreadCount();
+    info.MinThreadCount = pool->GetMinThreadCount();
+    info.MaxThreadCount = pool->GetMaxThreadCount();
+    info.PotentialMaxThreadCount = info.MaxThreadCount;
+
+    // Limits counted in full threads.
+    info.DefaultFullThreadCount = pool->GetDefaultFullThreadCount();
+    info.MinFullThreadCount = pool->GetMinFullThreadCount();
+    info.MaxFullThreadCount = pool->GetMaxFullThreadCount();
+
+    info.ThreadInfo.resize(info.MaxFullThreadCount);
+    info.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0);
+    info.Priority = pool->GetPriority();
+    pool->SetFullThreadCount(info.DefaultFullThreadCount);
+
+    if (pingInfo != nullptr) {
+        info.AvgPingCounter = pingInfo->AvgPingCounter;
+        info.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
+        info.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
+    }
+
+    // Waiting statistics exist only for basic pools.
+    if (info.BasicPool) {
+        info.WaitingStats.reset(new TWaitingStats<ui64>());
+        info.MovingWaitingStats.reset(new TWaitingStats<double>());
+    }
+
+    PriorityOrder.clear();
+}
+
+// Turns the harmonizer on (`enable == true`) or off (`enable == false`).
+// The flag is stored negated because the member tracks the *disabled* state;
+// storing `enable` directly would make Enable(true) switch the harmonizer off.
+void THarmonizer::Enable(bool enable) {
+    TGuard<TSpinLock> guard(Lock);
+    IsDisabled = !enable;
+}
+
+// Factory: allocates a THarmonizer with `new` and returns it as a raw IHarmonizer pointer.
+// NOTE(review): the caller presumably takes ownership of the returned object - confirm.
+IHarmonizer* MakeHarmonizer(ui64 ts) {
+    IHarmonizer* harmonizer = new THarmonizer(ts);
+    return harmonizer;
+}
+
+// Returns a snapshot of the harmonizer counters kept for the pool at index `poolId`.
+// The index is not range-checked. Every value is read with a relaxed atomic load.
+// LastFlags is decoded as: bit 0 - needy, bit 1 - starved, bit 2 - hoggish.
+TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
+    const auto relaxed = [](const auto& value) {
+        return value.load(std::memory_order_relaxed);
+    };
+
+    const TPoolInfo& info = *Pools[poolId];
+    const ui64 lastFlags = relaxed(info.LastFlags);
+
+    return TPoolHarmonizerStats{
+        .IncreasingThreadsByNeedyState = relaxed(info.IncreasingThreadsByNeedyState),
+        .IncreasingThreadsByExchange = relaxed(info.IncreasingThreadsByExchange),
+        .DecreasingThreadsByStarvedState = relaxed(info.DecreasingThreadsByStarvedState),
+        .DecreasingThreadsByHoggishState = relaxed(info.DecreasingThreadsByHoggishState),
+        .DecreasingThreadsByExchange = relaxed(info.DecreasingThreadsByExchange),
+        .ReceivedHalfThreadByNeedyState = relaxed(info.ReceivedHalfThreadByNeedyState),
+        .GivenHalfThreadByOtherStarvedState = relaxed(info.GivenHalfThreadByOtherStarvedState),
+        .GivenHalfThreadByHoggishState = relaxed(info.GivenHalfThreadByHoggishState),
+        .GivenHalfThreadByOtherNeedyState = relaxed(info.GivenHalfThreadByOtherNeedyState),
+        .ReturnedHalfThreadByStarvedState = relaxed(info.ReturnedHalfThreadByStarvedState),
+        .ReturnedHalfThreadByOtherHoggishState = relaxed(info.ReturnedHalfThreadByOtherHoggishState),
+        .MaxUsedCpu = relaxed(info.MaxUsedCpu),
+        .MinUsedCpu = relaxed(info.MinUsedCpu),
+        .AvgUsedCpu = relaxed(info.AvgUsedCpu),
+        .MaxElapsedCpu = relaxed(info.MaxElapsedCpu),
+        .MinElapsedCpu = relaxed(info.MinElapsedCpu),
+        .AvgElapsedCpu = relaxed(info.AvgElapsedCpu),
+        .PotentialMaxThreadCount = relaxed(info.PotentialMaxThreadCount),
+        .IsNeedy = (lastFlags & 1) != 0,
+        .IsStarved = (lastFlags & 2) != 0,
+        .IsHoggish = (lastFlags & 4) != 0,
+    };
+}
+
+// Returns a snapshot of the harmonizer-wide statistics: the published extremes of the
+// used/elapsed CPU histories and the average waiting times, all read with relaxed loads.
+THarmonizerStats THarmonizer::GetStats() const {
+    constexpr auto relaxed = std::memory_order_relaxed;
+    return THarmonizerStats{
+        .MaxUsedCpu = MaxUsedCpu.load(relaxed),
+        .MinUsedCpu = MinUsedCpu.load(relaxed),
+        .MaxElapsedCpu = MaxElapsedCpu.load(relaxed),
+        .MinElapsedCpu = MinElapsedCpu.load(relaxed),
+        .AvgAwakeningTimeUs = WaitingInfo.AvgAwakeningTimeUs.load(relaxed),
+        .AvgWakingUpTimeUs = WaitingInfo.AvgWakingUpTimeUs.load(relaxed),
+    };
+}
+
+// Remembers the shared executor pool (may be nullptr) used for half-thread operations.
+// No lock is taken here. AddPool() copies the current value into each pool entry, so
+// pools registered earlier keep whatever value was set at that time.
+void THarmonizer::SetSharedPool(ISharedExecutorPool* pool) {
+    this->Shared = pool;
+}
+
+// Renders all fields as "{Name: value, Name: value, ...}" in declaration order.
+TString TPoolHarmonizerStats::ToString() const {
+    TStringBuilder out;
+    out << '{';
+    // Thread-count adjustments.
+    out << "IncreasingThreadsByNeedyState: " << IncreasingThreadsByNeedyState << ", ";
+    out << "IncreasingThreadsByExchange: " << IncreasingThreadsByExchange << ", ";
+    out << "DecreasingThreadsByStarvedState: " << DecreasingThreadsByStarvedState << ", ";
+    out << "DecreasingThreadsByHoggishState: " << DecreasingThreadsByHoggishState << ", ";
+    out << "DecreasingThreadsByExchange: " << DecreasingThreadsByExchange << ", ";
+    // Half-thread movements.
+    out << "ReceivedHalfThreadByNeedyState: " << ReceivedHalfThreadByNeedyState << ", ";
+    out << "GivenHalfThreadByOtherStarvedState: " << GivenHalfThreadByOtherStarvedState << ", ";
+    out << "GivenHalfThreadByHoggishState: " << GivenHalfThreadByHoggishState << ", ";
+    out << "GivenHalfThreadByOtherNeedyState: " << GivenHalfThreadByOtherNeedyState << ", ";
+    out << "ReturnedHalfThreadByStarvedState: " << ReturnedHalfThreadByStarvedState << ", ";
+    out << "ReturnedHalfThreadByOtherHoggishState: " << ReturnedHalfThreadByOtherHoggishState << ", ";
+    // CPU figures.
+    out << "MaxUsedCpu: " << MaxUsedCpu << ", ";
+    out << "MinUsedCpu: " << MinUsedCpu << ", ";
+    out << "AvgUsedCpu: " << AvgUsedCpu << ", ";
+    out << "MaxElapsedCpu: " << MaxElapsedCpu << ", ";
+    out << "MinElapsedCpu: " << MinElapsedCpu << ", ";
+    out << "AvgElapsedCpu: " << AvgElapsedCpu << ", ";
+    // Limits and state flags.
+    out << "PotentialMaxThreadCount: " << PotentialMaxThreadCount << ", ";
+    out << "IsNeedy: " << IsNeedy << ", ";
+    out << "IsStarved: " << IsStarved << ", ";
+    out << "IsHoggish: " << IsHoggish << '}';
+    return out;
+}
+
+// Renders all fields as "{Name: value, Name: value, ...}" in declaration order.
+TString THarmonizerStats::ToString() const {
+    TStringBuilder out;
+    out << '{';
+    out << "MaxUsedCpu: " << MaxUsedCpu << ", ";
+    out << "MinUsedCpu: " << MinUsedCpu << ", ";
+    out << "MaxElapsedCpu: " << MaxElapsedCpu << ", ";
+    out << "MinElapsedCpu: " << MinElapsedCpu << ", ";
+    out << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << ", ";
+    out << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << '}';
+    return out;
+}
+
+}
diff --git a/ydb/library/actors/core/harmonizer.h b/ydb/library/actors/core/harmonizer/harmonizer.h
index b4323ef8de1..e60bb19b919 100644
--- a/ydb/library/actors/core/harmonizer.h
+++ b/ydb/library/actors/core/harmonizer/harmonizer.h
@@ -1,11 +1,10 @@
#pragma once
#include "defs.h"
-#include "executor_pool_shared.h"
namespace NActors {
class IExecutorPool;
- class TSharedExecutorPool;
+ class ISharedExecutorPool;
struct TSelfPingInfo;
template <typename T>
@@ -17,25 +16,38 @@ namespace NActors {
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
ui64 DecreasingThreadsByExchange = 0;
- float MaxConsumedCpu = 0.0;
- float MinConsumedCpu = 0.0;
- float AvgConsumedCpu = 0.0;
- float MaxBookedCpu = 0.0;
- float MinBookedCpu = 0.0;
+
+ ui64 ReceivedHalfThreadByNeedyState = 0;
+ ui64 GivenHalfThreadByOtherStarvedState = 0;
+ ui64 GivenHalfThreadByHoggishState = 0;
+ ui64 GivenHalfThreadByOtherNeedyState = 0;
+ ui64 ReturnedHalfThreadByStarvedState = 0;
+ ui64 ReturnedHalfThreadByOtherHoggishState = 0;
+
+ float MaxUsedCpu = 0.0;
+ float MinUsedCpu = 0.0;
+ float AvgUsedCpu = 0.0;
+ float MaxElapsedCpu = 0.0;
+ float MinElapsedCpu = 0.0;
+ float AvgElapsedCpu = 0.0;
i16 PotentialMaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
+
+ TString ToString() const;
};
struct THarmonizerStats {
- i64 MaxConsumedCpu = 0.0;
- i64 MinConsumedCpu = 0.0;
- i64 MaxBookedCpu = 0.0;
- i64 MinBookedCpu = 0.0;
+ float MaxUsedCpu = 0.0;
+ float MinUsedCpu = 0.0;
+ float MaxElapsedCpu = 0.0;
+ float MinElapsedCpu = 0.0;
+
+ float AvgAwakeningTimeUs = 0;
+ float AvgWakingUpTimeUs = 0;
- double AvgAwakeningTimeUs = 0;
- double AvgWakingUpTimeUs = 0;
+ TString ToString() const;
};
// Pool cpu harmonizer
@@ -48,7 +60,7 @@ namespace NActors {
virtual void Enable(bool enable) = 0;
virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0;
virtual THarmonizerStats GetStats() const = 0;
- virtual void SetSharedPool(TSharedExecutorPool* pool) = 0;
+ virtual void SetSharedPool(ISharedExecutorPool* pool) = 0;
};
IHarmonizer* MakeHarmonizer(ui64 ts);
diff --git a/ydb/library/actors/core/harmonizer/history.h b/ydb/library/actors/core/harmonizer/history.h
new file mode 100644
index 00000000000..889033dc7d8
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/history.h
@@ -0,0 +1,183 @@
+#pragma once
+
+#include "defs.h"
+#include "debug.h"
+#include <ydb/library/actors/util/datetime.h>
+
+
+namespace NActors {
+
+// TValueHistory takes an accumulating value that only increases
+// Then it splits this value by seconds relative to the elapsed time since the last value registration
+// Allows finding per-second maximum, minimum, and average values for the last N seconds
+// WithTail=false - calculate only fully passed seconds; example: current time is 4.5, GetAvgLastSeconds(2) calculates values from 2.0 to 4.0
+// WithTail=true - calculate also partially passed seconds; example: current time is 4.5, GetAvgLastSeconds(2) calculates values from 2.5 to 4.5
+// If time passed more seconds than history buffer size, then all values are recalculated by average value
+
+// Ring buffer of per-second deltas of an accumulating value.
+// HistoryBufferSize is the number of one-second slots kept.
+template <ui8 HistoryBufferSize = 8>
+struct TValueHistory {
+
+ // True when value has at most one bit set.
+ // NOTE(review): this is also true for 0, so a zero-sized buffer passes the assert below.
+ static constexpr bool CheckBinaryPower(ui64 value) {
+ return !(value & (value - 1));
+ }
+
+ static_assert(CheckBinaryPower(HistoryBufferSize));
+
+ // Value delta of each completed second.
+ double History[HistoryBufferSize] = {0.0};
+ // Slot that the next completed second will be written to.
+ ui64 HistoryIdx = 0;
+ // Timestamp and value of the previous Register() call. LastTs starts at Max<ui64>(),
+ // so the first Register() call goes through the "ts < LastTs" reset branch.
+ ui64 LastTs = Max<ui64>();
+ double LastValue = 0.0;
+ // Value delta and elapsed timestamp units of the current, not yet completed second.
+ double AccumulatedValue = 0.0;
+ ui64 AccumulatedTs = 0;
+
+ // Folds per-second values with `op` and returns comb(acc, seconds).
+ // WithTail=false: starts from the most recently completed slot and folds seconds - 1
+ // older slots.
+ // WithTail=true: starts from AccumulatedValue (the current partial second) and folds
+ // `seconds` older slots; the oldest one is scaled by the fraction of a second that
+ // AccumulatedTs has not covered yet.
+ // NOTE(review): seconds == 0 with WithTail=false wraps leftSeconds (ui8) to 255.
+ template <bool WithTail=false>
+ double Accumulate(auto op, auto comb, ui8 seconds) const {
+ HARMONIZER_HISTORY_PRINT("Accumulate, seconds = ", static_cast<ui16>(seconds), ", WithTail = ", WithTail);
+ double acc = AccumulatedValue;
+ size_t idx = HistoryIdx;
+ ui8 leftSeconds = seconds;
+ if constexpr (!WithTail) {
+ idx--;
+ leftSeconds--;
+ // idx is unsigned: decrementing 0 wraps to a huge value, which is mapped to the last slot.
+ if (idx >= HistoryBufferSize) {
+ idx = HistoryBufferSize - 1;
+ }
+ acc = History[idx];
+ }
+ HARMONIZER_HISTORY_PRINT("Accumulate iteration, acc = ", acc, ", idx = ", static_cast<ui16>(idx), ", leftSeconds = ", static_cast<ui16>(leftSeconds));
+ while (leftSeconds) {
+ idx--;
+ leftSeconds--;
+ if (idx >= HistoryBufferSize) {
+ idx = HistoryBufferSize - 1;
+ }
+ if constexpr (!WithTail) {
+ acc = op(acc, History[idx]);
+ } else if (leftSeconds) {
+ acc = op(acc, History[idx]);
+ } else {
+ // Oldest slot of the window: take only the part not covered by the current partial second.
+ ui64 tsInSecond = Us2Ts(1'000'000.0);
+ acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond);
+ }
+ HARMONIZER_HISTORY_PRINT("Accumulate iteration, acc = ", acc, ", idx = ", static_cast<ui16>(idx), ", leftSeconds = ", static_cast<ui16>(leftSeconds));
+ }
+ auto result = comb(acc, seconds);
+ HARMONIZER_HISTORY_PRINT("Accumulate return, acc = ", acc, ", duration = ", static_cast<ui16>(seconds), ", result = ", result);
+ return result;
+ }
+
+ // Sum over the window divided by `seconds`.
+ template <bool WithTail=false>
+ double GetAvgPartForLastSeconds(ui8 seconds) const {
+ auto sum = [](double acc, double value) {
+ HARMONIZER_HISTORY_PRINT("calculate sum, acc = ", acc, ", value = ", value, ", acc + value = ", acc + value);
+ return acc + value;
+ };
+ auto avg = [](double sum, double duration) {
+ HARMONIZER_HISTORY_PRINT("calculate avg, sum = ", sum, ", duration = ", duration);
+ return sum / duration;
+ };
+ HARMONIZER_HISTORY_PRINT("GetAvgPartForLastSeconds, seconds = ", static_cast<ui16>(seconds), ", WithTail = ", WithTail);
+ return Accumulate<WithTail>(sum, avg, seconds);
+ }
+
+ // Average over the whole buffer, including the current partial second.
+ double GetAvgPart() const {
+ HARMONIZER_HISTORY_PRINT("GetAvgPart, seconds = ", static_cast<ui16>(HistoryBufferSize), ", WithTail = ", true);
+ return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
+ }
+
+ // Largest per-second value among the last `seconds` completed seconds.
+ double GetMaxForLastSeconds(ui8 seconds) const {
+ auto max = [](const double& acc, const double& value) {
+ HARMONIZER_HISTORY_PRINT("calculate max, acc = ", acc, ", value = ", value, ", Max(acc, value) = ", Max(acc, value));
+ return Max(acc, value);
+ };
+ auto fst = [](const double& value, const double&) { return value; };
+ HARMONIZER_HISTORY_PRINT("GetMaxForLastSeconds, seconds = ", static_cast<ui16>(seconds), ", WithTail = ", false);
+ return Accumulate<false>(max, fst, seconds);
+ }
+
+ // Largest per-second value in the whole buffer.
+ double GetMax() const {
+ HARMONIZER_HISTORY_PRINT("GetMax, seconds = ", static_cast<ui16>(HistoryBufferSize), ", WithTail = ", false);
+ return GetMaxForLastSeconds(HistoryBufferSize);
+ }
+
+ // Smallest per-second value among the last `seconds` completed seconds.
+ double GetMinForLastSeconds(ui8 seconds) const {
+ auto min = [](const double& acc, const double& value) {
+ HARMONIZER_HISTORY_PRINT("calculate min, acc = ", acc, ", value = ", value, ", Min(acc, value) = ", Min(acc, value));
+ return Min(acc, value);
+ };
+ auto fst = [](const double& value, const double&) { return value; };
+ HARMONIZER_HISTORY_PRINT("GetMinForLastSeconds, seconds = ", static_cast<ui16>(seconds), ", WithTail = ", false);
+ return Accumulate<false>(min, fst, seconds);
+ }
+
+ // Smallest per-second value in the whole buffer.
+ double GetMin() const {
+ HARMONIZER_HISTORY_PRINT("GetMin, seconds = ", static_cast<ui16>(HistoryBufferSize), ", WithTail = ", false);
+ return GetMinForLastSeconds(HistoryBufferSize);
+ }
+
+ // Records that the accumulating value equals `value` at timestamp `ts` and spreads
+ // the delta since the previous call over the seconds that have passed.
+ void Register(ui64 ts, double value) {
+ HARMONIZER_HISTORY_PRINT("Register, ts = ", ts, ", value = ", value);
+ // First call, or time went backwards: start over with an empty history.
+ if (ts < LastTs) {
+ LastTs = ts;
+ LastValue = value;
+ AccumulatedValue = 0.0;
+ AccumulatedTs = 0;
+ for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
+ History[idx] = 0.0;
+ }
+ HARMONIZER_HISTORY_PRINT("Register backward time, LastTs = ", LastTs, ", LastValue = ", LastValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs);
+ return;
+ }
+ ui64 lastTs = std::exchange(LastTs, ts);
+ ui64 dTs = ts - lastTs;
+ double lastValue = std::exchange(LastValue, value);
+ double dValue = value - lastValue;
+
+ // Gap longer than the whole buffer: fill every slot with the per-second average.
+ if (dTs > HistoryBufferSize * Us2Ts(1'000'000.0)) {
+ dValue = dValue * 1'000'000.0 / Ts2Us(dTs);
+ for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
+ History[idx] = dValue;
+ }
+ AccumulatedValue = 0.0;
+ AccumulatedTs = 0;
+ HARMONIZER_HISTORY_PRINT("Register big gap, dTs = ", dTs, ", dValue = ", dValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs, "lastTs = ", lastTs, ", lastValue = ", lastValue);
+ return;
+ }
+
+ // Same timestamp as before: the delta goes into the current partial second.
+ if (dTs == 0) {
+ AccumulatedValue += dValue;
+ HARMONIZER_HISTORY_PRINT("Register zero gap, dTs = ", dTs, ", dValue = ", dValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs, "lastTs = ", lastTs, ", lastValue = ", lastValue);
+ return;
+ }
+
+ HARMONIZER_HISTORY_PRINT("Register start processing seconds, dTs = ", dTs, ", dValue = ", dValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs, "lastTs = ", lastTs, ", lastValue = ", lastValue);
+
+ // Consume dTs second by second; each completed second gets a share of dValue
+ // proportional to the time it covers.
+ while (dTs > 0) {
+ if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) {
+ AccumulatedTs += dTs;
+ AccumulatedValue += dValue;
+ HARMONIZER_HISTORY_PRINT("Register small gap, dTs = ", dTs, ", dValue = ", dValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs);
+ break;
+ } else {
+ // Finish the current second and move on to the next slot.
+ ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs;
+ double addValue = dValue * addTs / dTs;
+ dTs -= addTs;
+ dValue -= addValue;
+ History[HistoryIdx] = AccumulatedValue + addValue;
+ HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize;
+ AccumulatedValue = 0.0;
+ AccumulatedTs = 0;
+ HARMONIZER_HISTORY_PRINT("Register process one second, dTs = ", dTs,
+ ", dValue = ", dValue,
+ ", addTs = ", addTs,
+ ", addValue = ", addValue,
+ ", AccumulatedValue = ", AccumulatedValue,
+ ", AccumulatedTs = ", AccumulatedTs,
+ ", HistoryIdx = ", HistoryIdx,
+ ", History[HistoryIdx - 1] = ", History[(HistoryIdx + HistoryBufferSize - 1) % HistoryBufferSize]);
+ }
+ }
+ }
+}; // struct TValueHistory
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/pool.cpp b/ydb/library/actors/core/harmonizer/pool.cpp
new file mode 100644
index 00000000000..2f96b891c8a
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/pool.cpp
@@ -0,0 +1,150 @@
+#include "pool.h"
+
+#include <ydb/library/actors/core/executor_pool.h>
+#include <ydb/library/actors/core/executor_pool_shared.h>
+#include <ydb/library/actors/core/executor_pool_basic.h>
+#include <ydb/library/actors/core/executor_pool_basic_feature_flags.h>
+
+#include "debug.h"
+
+namespace NActors {
+
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
+TPoolInfo::TPoolInfo() // Every other member keeps the in-class initializer declared in pool.h.
+ : LocalQueueSize(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) // the only member without an in-class default
+{}
+
+// Returns UsedCpu.GetAvgPart() of the pool's own thread `threadIdx`.
+// An index outside ThreadInfo yields 0.0; a negative index converts to a huge
+// unsigned value and is rejected by the same range check.
+double TPoolInfo::GetCpu(i16 threadIdx) const {
+    const auto slot = static_cast<size_t>(threadIdx);
+    if (slot >= ThreadInfo.size()) {
+        return 0.0;
+    }
+    return ThreadInfo[slot].UsedCpu.GetAvgPart();
+}
+
+// Returns UsedCpu.GetAvgPart() of the shared thread `sharedThreadIdx`,
+// or 0.0 when the index does not address an entry of SharedInfo.
+double TPoolInfo::GetSharedCpu(i16 sharedThreadIdx) const {
+    const auto slot = static_cast<size_t>(sharedThreadIdx);
+    return slot < SharedInfo.size()
+        ? SharedInfo[slot].UsedCpu.GetAvgPart()
+        : 0.0;
+}
+
+// Returns UsedCpu.GetAvgPartForLastSeconds<true>(1) of the pool's own thread
+// `threadIdx`, or 0.0 when the index is outside ThreadInfo.
+double TPoolInfo::GetLastSecondCpu(i16 threadIdx) const {
+    const auto slot = static_cast<size_t>(threadIdx);
+    if (slot >= ThreadInfo.size()) {
+        return 0.0;
+    }
+    return ThreadInfo[slot].UsedCpu.GetAvgPartForLastSeconds<true>(1);
+}
+
+// Returns UsedCpu.GetAvgPartForLastSeconds<true>(1) of the shared thread
+// `sharedThreadIdx`, or 0.0 when the index is outside SharedInfo.
+double TPoolInfo::GetLastSecondSharedCpu(i16 sharedThreadIdx) const {
+    const auto slot = static_cast<size_t>(sharedThreadIdx);
+    if (slot >= SharedInfo.size()) {
+        return 0.0;
+    }
+    return SharedInfo[slot].UsedCpu.GetAvgPartForLastSeconds<true>(1);
+}
+
+// Returns ElapsedCpu.GetAvgPart() of the pool's own thread `threadIdx`,
+// or 0.0 when the index is outside ThreadInfo.
+double TPoolInfo::GetElapsed(i16 threadIdx) const {
+    const auto slot = static_cast<size_t>(threadIdx);
+    return slot < ThreadInfo.size()
+        ? ThreadInfo[slot].ElapsedCpu.GetAvgPart()
+        : 0.0;
+}
+
+// Returns ElapsedCpu.GetAvgPart() of the shared thread `sharedThreadIdx`,
+// or 0.0 when the index is outside SharedInfo.
+double TPoolInfo::GetSharedElapsed(i16 sharedThreadIdx) const {
+    const auto slot = static_cast<size_t>(sharedThreadIdx);
+    if (slot >= SharedInfo.size()) {
+        return 0.0;
+    }
+    return SharedInfo[slot].ElapsedCpu.GetAvgPart();
+}
+
+// Returns ElapsedCpu.GetAvgPartForLastSeconds<true>(1) of the pool's own thread
+// `threadIdx`, or 0.0 when the index is outside ThreadInfo.
+double TPoolInfo::GetLastSecondElapsed(i16 threadIdx) const {
+    const auto slot = static_cast<size_t>(threadIdx);
+    if (slot >= ThreadInfo.size()) {
+        return 0.0;
+    }
+    return ThreadInfo[slot].ElapsedCpu.GetAvgPartForLastSeconds<true>(1);
+}
+
+// Returns ElapsedCpu.GetAvgPartForLastSeconds<true>(1) of the shared thread
+// `sharedThreadIdx`, or 0.0 when the index is outside SharedInfo.
+double TPoolInfo::GetLastSecondSharedElapsed(i16 sharedThreadIdx) const {
+    const auto slot = static_cast<size_t>(sharedThreadIdx);
+    return slot < SharedInfo.size()
+        ? SharedInfo[slot].ElapsedCpu.GetAvgPartForLastSeconds<true>(1)
+        : 0.0;
+}
+
+// Expands the first eight samples of a history buffer into separate probe arguments.
+#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
+// Reads the current CPU counters of every own thread [0, MaxFullThreadCount) and of
+// every shared thread, registers them (converted from microseconds to seconds) in the
+// per-thread and pool-wide histories at timestamp `ts`, republishes the min/max/avg
+// atomics, updates the NotEnoughCpuExecutions delta and refreshes the waiting stats.
+// Returns the consumption summed over all polled threads.
+TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
+    TCpuConsumption acc;
+    for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) {
+        TThreadInfo &threadInfo = ThreadInfo[threadIdx];
+        TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
+        acc.Add(cpuConsumption);
+        threadInfo.ElapsedCpu.Register(ts, cpuConsumption.ElapsedUs / 1'000'000.0);
+        LWPROBE_WITH_TRACE(SavedValues, Pool->PoolId, Pool->GetName(), "elapsed", UNROLL_HISTORY(threadInfo.ElapsedCpu.History));
+        threadInfo.UsedCpu.Register(ts, cpuConsumption.CpuUs / 1'000'000.0);
+        LWPROBE_WITH_TRACE(SavedValues, Pool->PoolId, Pool->GetName(), "cpu", UNROLL_HISTORY(threadInfo.UsedCpu.History));
+    }
+    TVector<TExecutorThreadStats> sharedStats;
+    if (Shared) {
+        Shared->GetSharedStatsForHarmonizer(Pool->PoolId, sharedStats);
+    }
+
+    // sharedStats stays empty when there is no shared pool, so the loop is bounded by
+    // both containers: it must never index past the entries that were actually received.
+    for (ui32 sharedIdx = 0; sharedIdx < SharedInfo.size() && sharedIdx < sharedStats.size(); ++sharedIdx) {
+        // Bind by reference: the per-thread stats object does not need to be copied.
+        const auto& stat = sharedStats[sharedIdx];
+        TCpuConsumption sharedConsumption{
+            static_cast<float>(stat.CpuUs),
+            Ts2Us(stat.SafeElapsedTicks),
+            stat.NotEnoughCpuExecutions
+        };
+        acc.Add(sharedConsumption);
+        SharedInfo[sharedIdx].ElapsedCpu.Register(ts, sharedConsumption.ElapsedUs / 1'000'000.0);
+        LWPROBE_WITH_TRACE(SavedValues, Pool->PoolId, Pool->GetName(), "elapsed", UNROLL_HISTORY(SharedInfo[sharedIdx].ElapsedCpu.History));
+        SharedInfo[sharedIdx].UsedCpu.Register(ts, sharedConsumption.CpuUs / 1'000'000.0);
+        LWPROBE_WITH_TRACE(SavedValues, Pool->PoolId, Pool->GetName(), "cpu", UNROLL_HISTORY(SharedInfo[sharedIdx].UsedCpu.History));
+    }
+
+    UsedCpu.Register(ts, acc.CpuUs / 1'000'000.0);
+    MaxUsedCpu.store(UsedCpu.GetMax(), std::memory_order_relaxed);
+    MinUsedCpu.store(UsedCpu.GetMin(), std::memory_order_relaxed);
+    AvgUsedCpu.store(UsedCpu.GetAvgPart(), std::memory_order_relaxed);
+    ElapsedCpu.Register(ts, acc.ElapsedUs / 1'000'000.0);
+    MaxElapsedCpu.store(ElapsedCpu.GetMax(), std::memory_order_relaxed);
+    MinElapsedCpu.store(ElapsedCpu.GetMin(), std::memory_order_relaxed);
+    AvgElapsedCpu.store(ElapsedCpu.GetAvgPart(), std::memory_order_relaxed);
+    // Keep both the cumulative counter and its increase since the previous pull.
+    NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
+    NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
+    if (WaitingStats && BasicPool) {
+        WaitingStats->Clear();
+        BasicPool->GetWaitingStats(*WaitingStats);
+        if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) {
+            // NOTE(review): MovingWaitingStats is dereferenced without its own null check;
+            // presumably it is allocated together with WaitingStats - confirm.
+            MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2);
+        }
+    }
+    return acc;
+}
+#undef UNROLL_HISTORY
+
+float TPoolInfo::GetThreadCount() { // Forwards to Pool->GetThreadCount(); Pool is dereferenced unconditionally.
+ return Pool->GetThreadCount();
+}
+
+i16 TPoolInfo::GetFullThreadCount() { // Forwards to Pool->GetFullThreadCount(); Pool is dereferenced unconditionally.
+ return Pool->GetFullThreadCount();
+}
+
+void TPoolInfo::SetFullThreadCount(i16 threadCount) { // Emits a debug trace of the request, then forwards it to the pool.
+ HARMONIZER_DEBUG_PRINT(Pool->PoolId, Pool->GetName(), "set full thread count", threadCount); // traced before the pool is touched
+ Pool->SetFullThreadCount(threadCount); // any clamping is up to the pool implementation
+}
+
+// Returns true when every configured ping counter compares greater than MaxAvgPingUs;
+// a counter that is not set does not affect the result, so with no counters the
+// answer is true.
+// NOTE(review): a "good" average ping would normally be one that does not exceed
+// MaxAvgPingUs, so the '>' comparisons look inverted relative to the name - confirm
+// against the callers before changing the direction.
+bool TPoolInfo::IsAvgPingGood() {
+    const bool wideWindowOk = !AvgPingCounter || *AvgPingCounter > MaxAvgPingUs;
+    const bool smallWindowOk = !AvgPingCounterWithSmallWindow || *AvgPingCounterWithSmallWindow > MaxAvgPingUs;
+    return wideWindowOk && smallWindowOk;
+}
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/pool.h b/ydb/library/actors/core/harmonizer/pool.h
new file mode 100644
index 00000000000..9b083dcb8e4
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/pool.h
@@ -0,0 +1,93 @@
+#pragma once
+
+#include "defs.h"
+
+#include "history.h"
+#include <ydb/library/actors/core/executor_pool.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NActors {
+
+class ISharedExecutorPool;
+class TBasicExecutorPool;
+class IExecutorPool;
+template <typename T>
+struct TWaitingStats;
+
+struct TThreadInfo { // Pair of 8-slot CPU histories kept per thread inside TPoolInfo.
+ TValueHistory<8> UsedCpu; // TPoolInfo::PullStats registers CpuUs / 1'000'000.0 here
+ TValueHistory<8> ElapsedCpu; // TPoolInfo::PullStats registers ElapsedUs / 1'000'000.0 here
+}; // struct TThreadInfo
+
+struct TPoolInfo { // Per-pool bookkeeping: thread histories, thread-count limits and published counters.
+ std::vector<TThreadInfo> ThreadInfo; // histories of the pool's own threads, indexed by thread index
+ std::vector<TThreadInfo> SharedInfo; // histories of shared threads, indexed by shared thread index
+ ISharedExecutorPool* Shared = nullptr; // optional; PullStats queries shared stats only when set
+ IExecutorPool* Pool = nullptr; // dereferenced unconditionally by PullStats and the thread-count accessors
+ TBasicExecutorPool* BasicPool = nullptr; // optional; source of waiting stats in PullStats
+
+ i16 DefaultFullThreadCount = 0;
+ i16 MinFullThreadCount = 0;
+ i16 MaxFullThreadCount = 0; // PullStats polls own threads [0, MaxFullThreadCount)
+
+ float DefaultThreadCount = 0;
+ float MinThreadCount = 0;
+ float MaxThreadCount = 0;
+
+ i16 Priority = 0;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; // optional; compared with MaxAvgPingUs in IsAvgPingGood
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; // optional; compared with MaxAvgPingUs in IsAvgPingGood
+ ui32 MaxAvgPingUs = 0; // threshold used by IsAvgPingGood
+ ui64 LastUpdateTs = 0;
+ ui64 NotEnoughCpuExecutions = 0; // cumulative value seen by the latest PullStats
+ ui64 NewNotEnoughCpuExecutions = 0; // increase between the two latest PullStats calls
+ ui16 LocalQueueSize; // no in-class default; initialized by the constructor
+
+ std::atomic<i64> LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
+ std::atomic<ui64> IncreasingThreadsByNeedyState = 0;
+ std::atomic<ui64> IncreasingThreadsByExchange = 0;
+ std::atomic<ui64> DecreasingThreadsByStarvedState = 0;
+ std::atomic<ui64> DecreasingThreadsByHoggishState = 0;
+ std::atomic<ui64> DecreasingThreadsByExchange = 0;
+ std::atomic<i16> PotentialMaxThreadCount = 0;
+ std::atomic<ui64> ReceivedHalfThreadByNeedyState = 0;
+ std::atomic<ui64> GivenHalfThreadByOtherStarvedState = 0;
+ std::atomic<ui64> GivenHalfThreadByHoggishState = 0;
+ std::atomic<ui64> GivenHalfThreadByOtherNeedyState = 0;
+ std::atomic<ui64> ReturnedHalfThreadByStarvedState = 0;
+ std::atomic<ui64> ReturnedHalfThreadByOtherHoggishState = 0;
+
+ TValueHistory<16> UsedCpu; // pool-wide used CPU summed over polled threads, registered by PullStats
+ TValueHistory<16> ElapsedCpu; // pool-wide elapsed CPU summed over polled threads, registered by PullStats
+
+ std::atomic<float> MaxUsedCpu = 0; // UsedCpu.GetMax(), stored with relaxed ordering by PullStats
+ std::atomic<float> MinUsedCpu = 0; // UsedCpu.GetMin(), stored with relaxed ordering by PullStats
+ std::atomic<float> AvgUsedCpu = 0; // UsedCpu.GetAvgPart(), stored with relaxed ordering by PullStats
+ std::atomic<float> MaxElapsedCpu = 0; // ElapsedCpu.GetMax(), stored with relaxed ordering by PullStats
+ std::atomic<float> MinElapsedCpu = 0; // ElapsedCpu.GetMin(), stored with relaxed ordering by PullStats
+ std::atomic<float> AvgElapsedCpu = 0; // ElapsedCpu.GetAvgPart(), stored with relaxed ordering by PullStats
+
+ std::unique_ptr<TWaitingStats<ui64>> WaitingStats; // optional; cleared and refilled from BasicPool in PullStats
+ std::unique_ptr<TWaitingStats<double>> MovingWaitingStats; // blended with WaitingStats (weights 0.8 / 0.2) in PullStats
+
+ TPoolInfo(); // sets LocalQueueSize to the feature-flag minimum
+
+ double GetCpu(i16 threadIdx) const; // own-thread getters return 0.0 for an index outside ThreadInfo
+ double GetElapsed(i16 threadIdx) const;
+ double GetLastSecondCpu(i16 threadIdx) const;
+ double GetLastSecondElapsed(i16 threadIdx) const;
+
+ double GetSharedCpu(i16 sharedThreadIdx) const; // shared-thread getters return 0.0 for an index outside SharedInfo
+ double GetSharedElapsed(i16 sharedThreadIdx) const;
+ double GetLastSecondSharedCpu(i16 sharedThreadIdx) const;
+ double GetLastSecondSharedElapsed(i16 sharedThreadIdx) const;
+
+ TCpuConsumption PullStats(ui64 ts); // polls all threads, updates histories/atomics, returns the summed consumption
+ i16 GetFullThreadCount(); // forwards to Pool
+ float GetThreadCount(); // forwards to Pool
+ void SetFullThreadCount(i16 threadCount); // traces the request and forwards it to Pool
+ bool IsAvgPingGood(); // true when every configured ping counter is greater than MaxAvgPingUs
+}; // struct TPoolInfo
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/shared_info.cpp b/ydb/library/actors/core/harmonizer/shared_info.cpp
new file mode 100644
index 00000000000..6b8b94f0bd8
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/shared_info.cpp
@@ -0,0 +1,55 @@
+#include "shared_info.h"
+#include <util/string/builder.h>
+
+#include <ydb/library/actors/core/executor_pool_shared.h>
+
+namespace NActors {
+
+// Resizes the three per-pool flag vectors to `poolCount` entries; slots created by
+// the resize start as false, already existing slots keep their value.
+void TSharedInfo::Init(i16 poolCount) {
+    for (std::vector<bool>* flags : {&HasSharedThread, &HasSharedThreadWhichWasNotBorrowed, &HasBorrowedSharedThread}) {
+        flags->resize(poolCount, false);
+    }
+}
+
+// Refreshes the per-pool flags from a snapshot of the shared pool state.
+// For each pool slot:
+//  - a pool whose ThreadByPool entry is not -1 is marked as having a shared thread,
+//    and is marked "not borrowed" exactly when that thread has no borrower (-1);
+//  - the borrowed flag mirrors whether BorrowedThreadByPool holds a thread (not -1).
+// NOTE(review): when ThreadByPool[poolIdx] is -1 the first two flags keep whatever
+// value they had before; presumably thread ownership never changes after Init - confirm.
+void TSharedInfo::Pull(const ISharedExecutorPool& shared) {
+    auto sharedState = shared.GetState();
+    for (ui32 poolIdx = 0; poolIdx < HasSharedThread.size(); ++poolIdx) {
+        const i16 ownThreadIdx = sharedState.ThreadByPool[poolIdx];
+        if (ownThreadIdx != -1) {
+            HasSharedThread[poolIdx] = true;
+            HasSharedThreadWhichWasNotBorrowed[poolIdx] = (sharedState.PoolByBorrowedThread[ownThreadIdx] == -1);
+        }
+        HasBorrowedSharedThread[poolIdx] = (sharedState.BorrowedThreadByPool[poolIdx] != -1);
+    }
+}
+
+// Renders the three flag vectors as quoted strings of '0'/'1' characters, one
+// character per pool, inside a brace-delimited record.
+TString TSharedInfo::ToString() const {
+    TStringBuilder builder;
+    // Appends one '1' or '0' per flag, in index order.
+    const auto appendBits = [&builder](const std::vector<bool>& flags) {
+        for (ui32 i = 0; i < flags.size(); ++i) {
+            builder << (flags[i] ? "1" : "0");
+        }
+    };
+
+    builder << "{" << "HasSharedThread: \"";
+    appendBits(HasSharedThread);
+    builder << "\", " << "HasSharedThreadWhichWasNotBorrowed: \"";
+    appendBits(HasSharedThreadWhichWasNotBorrowed);
+    builder << "\", " << "HasBorrowedSharedThread: \"";
+    appendBits(HasBorrowedSharedThread);
+    builder << "\"}";
+    return builder;
+}
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/shared_info.h b/ydb/library/actors/core/harmonizer/shared_info.h
new file mode 100644
index 00000000000..41277db1bbb
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/shared_info.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NActors {
+
+class ISharedExecutorPool;
+
+struct TSharedInfo { // Per-pool flags describing shared-thread ownership; every vector is indexed by pool index.
+ std::vector<bool> HasSharedThread; // set by Pull when the pool's ThreadByPool entry is not -1
+ std::vector<bool> HasSharedThreadWhichWasNotBorrowed; // set by Pull: the pool's own shared thread has no borrower
+ std::vector<bool> HasBorrowedSharedThread; // set by Pull: the pool's BorrowedThreadByPool entry is not -1
+
+ void Init(i16 poolCount); // resizes all three vectors to poolCount, new slots are false
+ void Pull(const ISharedExecutorPool& shared); // refreshes the flags from shared.GetState()
+
+ TString ToString() const; // renders each vector as a quoted string of '0'/'1' characters
+}; // struct TSharedInfo
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp b/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp
new file mode 100644
index 00000000000..571550c7b1b
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp
@@ -0,0 +1,673 @@
+#include "harmonizer.h"
+#include "debug.h"
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/library/actors/core/executor_pool_shared.h>
+#include <ydb/library/actors/core/executor_thread_ctx.h>
+#include <ydb/library/actors/helpers/pool_stats_collector.h>
+
+using namespace NActors;
+
+
+#define CHECK_CHANGING_THREADS(stats, inc_needy, inc_exchange, dec_hoggish, dec_starved, dec_exchange) \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByNeedyState, inc_needy, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByExchange, inc_exchange, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByHoggishState, dec_hoggish, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByStarvedState, dec_starved, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByExchange, dec_exchange, (stats).ToString());
+// end CHECK_CHANGING_THREADS
+
+#define CHECK_CHANGING_HALF_THREADS(stats, received_needy, given_starved, given_hoggish, given_other_needy, returned_starved, returned_other_hoggish) \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).ReceivedHalfThreadByNeedyState, received_needy, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByOtherStarvedState, given_starved, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByHoggishState, given_hoggish, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByOtherNeedyState, given_other_needy, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).ReturnedHalfThreadByStarvedState, returned_starved, (stats).ToString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C((stats).ReturnedHalfThreadByOtherHoggishState, returned_other_hoggish, (stats).ToString());
+// end CHECK_CHANGING_HALF_THREADS
+
+// Single-flag assertions on a pool stats object. Each macro expands to exactly one
+// assertion expression: the caller supplies the trailing ';', and no line continuation
+// is left dangling after the last line (it used to splice the following source line
+// into the macro body).
+#define CHECK_IS_NEEDY(stats) \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, true, (stats).ToString())
+// end CHECK_IS_NEEDY
+
+#define CHECK_IS_NOT_NEEDY(stats) \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, false, (stats).ToString())
+// end CHECK_IS_NOT_NEEDY
+
+#define CHECK_IS_HOGGISH(stats) \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, true, (stats).ToString())
+// end CHECK_IS_HOGGISH
+
+#define CHECK_IS_NOT_HOGGISH(stats) \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, false, (stats).ToString())
+// end CHECK_IS_NOT_HOGGISH
+
+#define CHECK_IS_STARVED(stats) \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, true, (stats).ToString())
+// end CHECK_IS_STARVED
+
+#define CHECK_IS_NOT_STARVED(stats) \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, false, (stats).ToString())
+// end CHECK_IS_NOT_STARVED
+
+
+Y_UNIT_TEST_SUITE(HarmonizerTests) {
+
+ // Construction parameters of TMockExecutorPool. Member order is part of the
+ // interface: the tests fill the struct with designated initializers.
+ struct TMockExecutorPoolParams {
+     i16 DefaultFullThreadCount = 4;
+     i16 MinFullThreadCount = 4;
+     i16 MaxFullThreadCount = 8;
+     float DefaultThreadCount = 4.0f;
+     float MinThreadCount = 4.0f;
+     float MaxThreadCount = 8.0f;
+     i16 Priority = 0;
+     TString Name = "MockPool";
+     ui32 PoolId = 0;
+
+     // One-line "Key: value" dump of all parameters, used in debug traces.
+     TString ToString() const {
+         TStringBuilder out;
+         out << "PoolId: " << PoolId << ", Name: " << Name;
+         out << ", DefaultFullThreadCount: " << DefaultFullThreadCount;
+         out << ", MinFullThreadCount: " << MinFullThreadCount;
+         out << ", MaxFullThreadCount: " << MaxFullThreadCount;
+         out << ", DefaultThreadCount: " << DefaultThreadCount;
+         out << ", MinThreadCount: " << MinThreadCount;
+         out << ", MaxThreadCount: " << MaxThreadCount;
+         out << ", Priority: " << Priority;
+         return out;
+     }
+ };
+
+ // Mutable wrapper around TCpuConsumption used by the mocks to emulate cumulative
+ // per-thread counters. Converts implicitly from and to TCpuConsumption.
+ struct TCpuConsumptionModel {
+     TCpuConsumption value;
+
+     TCpuConsumptionModel()
+         : value()
+     {}
+
+     TCpuConsumptionModel(const TCpuConsumption& initial)
+         : value(initial)
+     {}
+
+     operator TCpuConsumption() const {
+         return value;
+     }
+
+     // Adds every field of `delta` to the stored counters.
+     void Increase(const TCpuConsumption& delta) {
+         value.CpuUs += delta.CpuUs;
+         value.ElapsedUs += delta.ElapsedUs;
+         value.NotEnoughCpuExecutions += delta.NotEnoughCpuExecutions;
+     }
+ };
+
+ // IExecutorPool stub for harmonizer tests: thread-count limits come from
+ // TMockExecutorPoolParams, per-thread CPU counters are set directly by the test,
+ // and every scheduling/registration entry point is a no-op.
+ class TMockExecutorPool : public IExecutorPool {
+ public:
+     TMockExecutorPool(const TMockExecutorPoolParams& params = TMockExecutorPoolParams())
+         : IExecutorPool(params.PoolId)
+         , Params(params)
+         , ThreadCount(params.DefaultFullThreadCount)
+         , ThreadCpuConsumptions(params.MaxFullThreadCount, TCpuConsumption{0.0, 0.0})
+     {
+
+     }
+
+     TMockExecutorPoolParams Params;
+     i16 ThreadCount = 0;  // current full thread count, always kept within [Min, Max]
+     std::vector<TCpuConsumptionModel> ThreadCpuConsumptions;  // one slot per possible thread (MaxFullThreadCount)
+     std::vector<TSharedExecutorThreadCtx*> SharedThreads;  // non-owning; at most two entries (see AddSharedThread)
+
+     i16 GetDefaultFullThreadCount() const override { return Params.DefaultFullThreadCount; }
+     i16 GetMinFullThreadCount() const override { return Params.MinFullThreadCount; }
+     i16 GetMaxFullThreadCount() const override { return Params.MaxFullThreadCount; }
+     // Stores the requested count clamped to [MinFullThreadCount, MaxFullThreadCount].
+     void SetFullThreadCount(i16 count) override {
+         HARMONIZER_DEBUG_PRINT(Params.PoolId, Params.Name, "set full thread count", count);
+         ThreadCount = Max(Params.MinFullThreadCount, Min(Params.MaxFullThreadCount, count));
+     }
+     i16 GetFullThreadCount() const override { return ThreadCount; }
+     float GetDefaultThreadCount() const override { return Params.DefaultThreadCount; }
+     float GetMinThreadCount() const override { return Params.MinThreadCount; }
+     float GetMaxThreadCount() const override { return Params.MaxThreadCount; }
+     i16 GetPriority() const override { return Params.Priority; }
+     TString GetName() const override { return Params.Name; }
+
+     // Remaining IExecutorPool methods (no-op stubs)
+     void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {}
+     void Start() override {}
+     void PrepareStop() override {}
+     void Shutdown() override {}
+     bool Cleanup() override { return true; }
+
+     TMailbox* GetReadyActivation(TWorkerContext& /*wctx*/, ui64 /*revolvingCounter*/) override { return nullptr; }
+     TMailbox* ResolveMailbox(ui32 /*hint*/) override { return nullptr; }
+
+     void Schedule(TInstant /*deadline*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
+     void Schedule(TMonotonic /*deadline*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
+     void Schedule(TDuration /*delta*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
+
+     bool Send(TAutoPtr<IEventHandle>& /*ev*/) override { return true; }
+     bool SpecificSend(TAutoPtr<IEventHandle>& /*ev*/) override { return true; }
+     void ScheduleActivation(TMailbox* /*activation*/) override {}
+     void SpecificScheduleActivation(TMailbox* /*activation*/) override {}
+     void ScheduleActivationEx(TMailbox* /*activation*/, ui64 /*revolvingCounter*/) override {}
+     TActorId Register(IActor* /*actor*/, TMailboxType::EType /*mailboxType*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); }
+     TActorId Register(IActor* /*actor*/, TMailboxCache& /*cache*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); }
+     TActorId Register(IActor* /*actor*/, TMailbox* /*mailbox*/, const TActorId& /*parentId*/) override { return TActorId(); }
+
+     TAffinity* Affinity() const override { return nullptr; }
+
+     ui32 GetThreads() const override { return static_cast<ui32>(ThreadCount); }
+     float GetThreadCount() const override { return static_cast<float>(ThreadCount); }
+
+     // Hands back the most recently added shared thread; asserts that one exists.
+     TSharedExecutorThreadCtx* ReleaseSharedThread() override {
+         UNIT_ASSERT(!SharedThreads.empty());
+         TSharedExecutorThreadCtx* thread = SharedThreads.back();
+         SharedThreads.pop_back();
+         return thread;
+     }
+     // Remembers a shared thread; the mock accepts at most two at a time.
+     void AddSharedThread(TSharedExecutorThreadCtx* thread) override {
+         UNIT_ASSERT(SharedThreads.size() < 2);
+         SharedThreads.push_back(thread);
+     }
+
+     // Adds `consumption` to the counters of threads [start, start + count);
+     // count == -1 means "up to MaxFullThreadCount".
+     void IncreaseThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) {
+         if (count == -1) {
+             count = Params.MaxFullThreadCount - start;
+         }
+         for (i16 i = start; i < start + count; ++i) {
+             ThreadCpuConsumptions[i].Increase(consumption);
+         }
+     }
+
+     // Overwrites the counters of threads [start, start + count) with `consumption`;
+     // count == -1 means "up to MaxFullThreadCount".
+     void SetThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) {
+         if (count == -1) {
+             count = Params.MaxFullThreadCount - start;
+         }
+         for (i16 i = start; i < start + count; ++i) {
+             ThreadCpuConsumptions[i] = consumption;
+         }
+     }
+
+     // Returns the counters of thread `threadIdx`. The index must be strictly below the
+     // vector size: an index equal to size() would read one element past the end.
+     TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override {
+         UNIT_ASSERT_GE(threadIdx, 0);
+         UNIT_ASSERT_LT(static_cast<ui16>(threadIdx), ThreadCpuConsumptions.size());
+         return ThreadCpuConsumptions[threadIdx];
+     }
+ };
+
+ // ISharedExecutorPool wrapper for tests: thread borrowing and state queries are
+ // delegated to a real shared pool created in the constructor, lifecycle methods are
+ // no-ops, and per-(pool, thread) CPU counters are stored locally so tests can set them.
+ class TMockSharedExecutorPool : public ISharedExecutorPool {
+     std::unique_ptr<ISharedExecutorPool> OriginalPool;
+     std::vector<std::vector<TCpuConsumptionModel>> ThreadCpuConsumptions;  // indexed [poolId][threadIdx]
+
+     // Produces the ids 0..pools.size()-1; only the number of pools is used.
+     static std::vector<i16> GetPoolIds(const std::vector<IExecutorPool*>& pools) {
+         std::vector<i16> ids;
+         for (size_t idx = 0; idx != pools.size(); ++idx) {
+             ids.push_back(static_cast<i16>(idx));
+         }
+         return ids;
+     }
+
+ public:
+     TMockSharedExecutorPool(const TSharedExecutorPoolConfig& config, i16 poolCount, std::vector<IExecutorPool*> pools)
+         : OriginalPool(CreateSharedExecutorPool(config, poolCount, GetPoolIds(pools)))
+         , ThreadCpuConsumptions(poolCount, std::vector<TCpuConsumptionModel>(config.Threads, TCpuConsumption{0.0, 0.0}))
+     {
+         OriginalPool->Init(pools, false);
+     }
+
+     // Lifecycle hooks are intentionally empty in the mock.
+     void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {}
+     void Start() override {}
+     void PrepareStop() override {}
+     void Shutdown() override {}
+     bool Cleanup() override { return true; }
+
+     // Everything below up to GetState() forwards to the wrapped pool.
+     void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override {
+         OriginalPool->GetSharedStatsForHarmonizer(pool, statsCopy);
+     }
+
+     void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override {
+         OriginalPool->GetSharedStats(pool, statsCopy);
+     }
+
+     // The isShared argument is ignored; the wrapped pool is always initialized with false.
+     void Init(const std::vector<IExecutorPool*>& pools, bool /*isShared*/) override {
+         OriginalPool->Init(pools, false);
+     }
+
+     TSharedExecutorThreadCtx* GetSharedThread(i16 poolId) override {
+         return OriginalPool->GetSharedThread(poolId);
+     }
+
+     i16 ReturnOwnHalfThread(i16 pool) override {
+         return OriginalPool->ReturnOwnHalfThread(pool);
+     }
+
+     i16 ReturnBorrowedHalfThread(i16 pool) override {
+         return OriginalPool->ReturnBorrowedHalfThread(pool);
+     }
+
+     void GiveHalfThread(i16 from, i16 to) override {
+         OriginalPool->GiveHalfThread(from, to);
+     }
+
+     i16 GetSharedThreadCount() const override {
+         return OriginalPool->GetSharedThreadCount();
+     }
+
+     TSharedPoolState GetState() const override {
+         return OriginalPool->GetState();
+     }
+
+     // CPU counters come from the locally stored table, not from the wrapped pool.
+     TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override {
+         return ThreadCpuConsumptions[poolId][threadIdx];
+     }
+
+     // Snapshot of all stored counters of one pool, converted to plain TCpuConsumption.
+     std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override {
+         const size_t threadCount = ThreadCpuConsumptions[poolId].size();
+         std::vector<TCpuConsumption> snapshot(threadCount);
+         for (size_t threadIdx = 0; threadIdx != threadCount; ++threadIdx) {
+             snapshot[threadIdx] = ThreadCpuConsumptions[poolId][threadIdx];
+         }
+         return snapshot;
+     }
+
+     // Note the argument order: thread index first, pool id second.
+     void IncreaseThreadCpuConsumption(ui32 threadIdx, ui32 poolId, TCpuConsumption consumption) {
+         ThreadCpuConsumptions[poolId][threadIdx].Increase(consumption);
+     }
+
+     // Note the argument order: thread index first, pool id second.
+     void SetThreadCpuConsumption(ui32 threadIdx, ui32 poolId, TCpuConsumption consumption) {
+         ThreadCpuConsumptions[poolId][threadIdx] = consumption;
+     }
+ };
+
+ Y_UNIT_TEST(TestHarmonizerCreation) { // Smoke test: MakeHarmonizer returns a non-null object.
+ ui64 currentTs = 1000000;
+ std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs)); // unique_ptr takes ownership of the raw pointer
+ UNIT_ASSERT(harmonizer != nullptr);
+ }
+
+ Y_UNIT_TEST(TestAddPool) { // A freshly added pool exposes its max thread count and zeroed change counters.
+ ui64 currentTs = 1000000;
+ std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+ auto mockPool = std::make_unique<TMockExecutorPool>(); // default params: MaxFullThreadCount = 8
+ harmonizer->AddPool(mockPool.get()); // the harmonizer receives a non-owning pointer
+
+ auto stats = harmonizer->GetPoolStats(0);
+ UNIT_ASSERT_VALUES_EQUAL(stats.PotentialMaxThreadCount, 8); // matches the default MaxFullThreadCount
+ UNIT_ASSERT_VALUES_EQUAL(stats.IncreasingThreadsByNeedyState, 0);
+ UNIT_ASSERT_VALUES_EQUAL(stats.DecreasingThreadsByStarvedState, 0);
+ }
+
+ // A single idle pool keeps its default thread count after one harmonization step.
+ // Both objects are held by unique_ptr (same layout as TestAddPool) so they are
+ // released even when an assertion aborts the test body early.
+ Y_UNIT_TEST(TestHarmonize) {
+     ui64 currentTs = 1000000;
+     std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+     auto mockPool = std::make_unique<TMockExecutorPool>();
+     harmonizer->AddPool(mockPool.get());
+
+     harmonizer->Harmonize(currentTs + 1000000); // 1 second later
+
+     auto stats = harmonizer->GetPoolStats(0);
+     Y_UNUSED(stats);
+     UNIT_ASSERT_VALUES_EQUAL(mockPool->ThreadCount, 4); // Should start with default
+ }
+
+ // Pool 0 first saturates its default threads (becomes needy and gains a thread), then
+ // reports no further consumption (becomes hoggish and gives the thread back).
+ // The harmonizer is held by unique_ptr so it is released on every exit path.
+ Y_UNIT_TEST(TestToNeedyNextToHoggish) {
+     ui64 currentTs = 1000000;
+     std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+     TMockExecutorPoolParams params;
+     std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
+     mockPools.emplace_back(new TMockExecutorPool(params));
+     params.PoolId = 1;
+     mockPools.emplace_back(new TMockExecutorPool(params));
+     for (auto& pool : mockPools) {
+         harmonizer->AddPool(pool.get());
+         pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount);
+     }
+
+     currentTs += Us2Ts(1'000'000);
+     harmonizer->Harmonize(currentTs);
+     mockPools[0]->SetThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount);
+
+     currentTs += Us2Ts(59'000'000);
+     harmonizer->Harmonize(currentTs);
+
+     auto stats = harmonizer->GetPoolStats(0);
+
+     CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0);
+     CHECK_IS_NEEDY(stats);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4);
+
+     currentTs += Us2Ts(60'000'000);
+     mockPools[0]->SetThreadCpuConsumption({0.0, 0.0}, 0, params.DefaultFullThreadCount);
+     harmonizer->Harmonize(currentTs);
+
+     stats = harmonizer->GetPoolStats(0);
+
+     CHECK_CHANGING_THREADS(stats, 1, 0, 1, 0, 0);
+     CHECK_IS_HOGGISH(stats);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4);
+ }
+
+ // Pool 0 first saturates its default threads (becomes needy and gains a thread); in the
+ // next interval its threads report less CPU than elapsed time while pool 1 is fully
+ // busy, so pool 0 is classified as starved and loses the extra thread.
+ // The harmonizer is held by unique_ptr so it is released on every exit path.
+ Y_UNIT_TEST(TestToNeedyNextToStarved) {
+     ui64 currentTs = 1000000;
+     std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+     TMockExecutorPoolParams params;
+     std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
+     mockPools.emplace_back(new TMockExecutorPool(params));
+     params.PoolId = 1;
+     mockPools.emplace_back(new TMockExecutorPool(params));
+     for (auto& pool : mockPools) {
+         harmonizer->AddPool(pool.get());
+         pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount);
+     }
+
+     currentTs += Us2Ts(1'000'000);
+     harmonizer->Harmonize(currentTs);
+     mockPools[0]->IncreaseThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount);
+
+     currentTs += Us2Ts(59'000'000);
+     harmonizer->Harmonize(currentTs);
+
+     auto stats = harmonizer->GetPoolStats(0);
+
+     CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0);
+     CHECK_IS_NEEDY(stats);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4);
+
+     currentTs += Us2Ts(60'000'000);
+     mockPools[0]->IncreaseThreadCpuConsumption({40'000'000.0, 60'000'000.0}, 0, 5);
+     mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 4);
+     harmonizer->Harmonize(currentTs);
+
+     stats = harmonizer->GetPoolStats(0);
+
+     CHECK_CHANGING_THREADS(stats, 1, 0, 0, 1, 0);
+     CHECK_IS_STARVED(stats);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4);
+ }
+
+ // Three pools with one default thread each (max two). First pools 1 and 2 are busy and
+ // pool 1 receives the spare thread; then pool 0 becomes busy while pool 2 goes idle,
+ // and the extra thread is exchanged from pool 1 to pool 0.
+ // The harmonizer is held by unique_ptr so it is released on every exit path.
+ Y_UNIT_TEST(TestExchangeThreads) {
+     ui64 currentTs = 1000000;
+     std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+     TMockExecutorPoolParams params {
+         .DefaultFullThreadCount = 1,
+         .MinFullThreadCount = 1,
+         .MaxFullThreadCount = 2,
+         .DefaultThreadCount = 1.0f,
+         .MinThreadCount = 1.0f,
+         .MaxThreadCount = 2.0f,
+     };
+     std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
+     mockPools.emplace_back(new TMockExecutorPool(params));
+     params.PoolId = 1;
+     mockPools.emplace_back(new TMockExecutorPool(params));
+     params.PoolId = 2;
+     mockPools.emplace_back(new TMockExecutorPool(params));
+     for (auto& pool : mockPools) {
+         harmonizer->AddPool(pool.get());
+         pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount);
+     }
+
+     currentTs += Us2Ts(1'000'000);
+     harmonizer->Harmonize(currentTs);
+
+     currentTs += Us2Ts(60'000'000);
+     mockPools[0]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1);
+     mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
+     mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
+     harmonizer->Harmonize(currentTs);
+
+     auto stats0 = harmonizer->GetPoolStats(0);
+     auto stats1 = harmonizer->GetPoolStats(1);
+     auto stats2 = harmonizer->GetPoolStats(2);
+
+     CHECK_CHANGING_THREADS(stats0, 0, 0, 0, 0, 0);
+     CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 0);
+     CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0);
+     CHECK_IS_HOGGISH(stats0);
+     CHECK_IS_NEEDY(stats1);
+     CHECK_IS_NEEDY(stats2);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 1);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 2);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1);
+
+     currentTs += Us2Ts(60'000'000);
+     mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
+     mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
+     mockPools[2]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1);
+     harmonizer->Harmonize(currentTs);
+
+     stats0 = harmonizer->GetPoolStats(0);
+     stats1 = harmonizer->GetPoolStats(1);
+     stats2 = harmonizer->GetPoolStats(2);
+
+     CHECK_CHANGING_THREADS(stats0, 0, 1, 0, 0, 0);
+     CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 1);
+     CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0);
+     CHECK_IS_NEEDY(stats0);
+     CHECK_IS_NEEDY(stats1);
+     CHECK_IS_HOGGISH(stats2);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 2);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 1);
+     UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1);
+ }
+
+ // Sweeps requested thread counts over three identical pools and checks that, after a
+ // harmonization step, each pool's PotentialMaxThreadCount equals its current thread
+ // count plus the unspent budget, capped at MaxFullThreadCount. The budget is the sum
+ // of the pools' default thread counts.
+ // The harmonizer is held by unique_ptr so it is released on every exit path.
+ Y_UNIT_TEST(TestThreadCounts) {
+     ui64 currentTs = 1000000;
+     std::vector<TMockExecutorPoolParams> params {
+         {
+             .DefaultFullThreadCount = 5,
+             .MinFullThreadCount = 5,
+             .MaxFullThreadCount = 15,
+             .DefaultThreadCount = 5.0f,
+             .MinThreadCount = 5.0f,
+             .MaxThreadCount = 15.0f,
+             .PoolId = 0,
+         },
+         {
+             .DefaultFullThreadCount = 5,
+             .MinFullThreadCount = 5,
+             .MaxFullThreadCount = 15,
+             .DefaultThreadCount = 5.0f,
+             .MinThreadCount = 5.0f,
+             .MaxThreadCount = 15.0f,
+             .PoolId = 1,
+         },
+         {
+             .DefaultFullThreadCount = 5,
+             .MinFullThreadCount = 5,
+             .MaxFullThreadCount = 15,
+             .DefaultThreadCount = 5.0f,
+             .MinThreadCount = 5.0f,
+             .MaxThreadCount = 15.0f,
+             .PoolId = 2,
+         },
+     };
+     std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+     std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
+     i16 budget = 0;
+     for (auto& param : params) {
+         mockPools.emplace_back(new TMockExecutorPool(param));
+         HARMONIZER_DEBUG_PRINT("created pool", mockPools.back()->Params.ToString());
+         budget += param.DefaultFullThreadCount;
+     }
+     for (ui32 poolIdx = 0; poolIdx < params.size(); ++poolIdx) {
+         auto &pool = mockPools[poolIdx];
+         harmonizer->AddPool(pool.get());
+         pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params[poolIdx].MaxFullThreadCount);
+     }
+     currentTs += Us2Ts(1'000'000);
+     harmonizer->Harmonize(currentTs);
+
+     for (i16 i = 0; i < params[0].MaxFullThreadCount; ++i) {
+         for (i16 ii = 0; ii < params[1].MaxFullThreadCount; ++ii) {
+             for (i16 iii = 0; iii < params[2].MaxFullThreadCount; ++iii) {
+                 // Skip combinations that request more threads than the total budget.
+                 if (i + ii + iii > budget) {
+                     continue;
+                 }
+                 ui32 localBudget = budget - (i + ii + iii);
+                 HARMONIZER_DEBUG_PRINT("first pool", i, "second pool", ii, "third pool", iii, "budget", budget, "local budget", localBudget);
+                 currentTs += Us2Ts(60'000'000);
+                 // The mock clamps each request to [MinFullThreadCount, MaxFullThreadCount].
+                 mockPools[0]->SetFullThreadCount(i);
+                 mockPools[1]->SetFullThreadCount(ii);
+                 mockPools[2]->SetFullThreadCount(iii);
+                 mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min<i16>(i, mockPools[0]->ThreadCount));
+                 mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min<i16>(ii, mockPools[1]->ThreadCount));
+                 mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min<i16>(iii, mockPools[2]->ThreadCount));
+                 harmonizer->Harmonize(currentTs);
+                 std::vector<TPoolHarmonizerStats> stats;
+                 for (auto& param : params) {
+                     stats.emplace_back(harmonizer->GetPoolStats(param.PoolId));
+                 }
+                 for (ui32 poolIdx = 0; poolIdx < params.size(); ++poolIdx) {
+                     UNIT_ASSERT_VALUES_EQUAL(stats[poolIdx].PotentialMaxThreadCount, std::min<i16>(mockPools[poolIdx]->ThreadCount + localBudget, params[poolIdx].MaxFullThreadCount));
+                 }
+             }
+         }
+     }
+ }
+
+ Y_UNIT_TEST(TestSharedHalfThreads) {
+     // Scenario: three identically configured mock pools and a shared pool with
+     // three threads. Pools 0 and 2 report CPU consumption, pool 1 reports none.
+     // After the first harmonization pool 0 is expected to be recorded as the
+     // borrower of shared thread 1; after one more interval without any new
+     // consumption both borrow entries are expected to be -1.
+     ui64 currentTs = 1000000;
+     std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+     TMockExecutorPoolParams params {
+         .DefaultFullThreadCount = 2,
+         .MinFullThreadCount = 1,
+         .MaxFullThreadCount = 3,
+         .DefaultThreadCount = 2.0f,
+         .MinThreadCount = 1.0f,
+         .MaxThreadCount = 3.0f,
+     };
+     std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
+     std::vector<IExecutorPool*> pools;
+
+     // Creates a mock pool from the current `params` and records it in both lists.
+     const auto addMockPool = [&mockPools, &pools, &params] {
+         mockPools.push_back(std::make_unique<TMockExecutorPool>(params));
+         pools.push_back(mockPools.back().get());
+     };
+     addMockPool();
+     params.PoolId = 1;
+     addMockPool();
+     params.PoolId = 2;
+     addMockPool();
+
+     TSharedExecutorPoolConfig sharedConfig;
+     sharedConfig.Threads = 3;
+     std::unique_ptr<ISharedExecutorPool> sharedPool(new TMockSharedExecutorPool(sharedConfig, 3, pools));
+
+     for (size_t poolIdx = 0; poolIdx != mockPools.size(); ++poolIdx) {
+         harmonizer->AddPool(mockPools[poolIdx].get());
+     }
+     harmonizer->SetSharedPool(sharedPool.get());
+
+     // Pool N receives shared thread N.
+     ui32 sharedThreadIdx = 0;
+     for (auto& mockPool : mockPools) {
+         mockPool->AddSharedThread(sharedPool->GetSharedThread(sharedThreadIdx));
+         ++sharedThreadIdx;
+     }
+
+     // Advances the clock by `microseconds` and runs one harmonizer iteration.
+     const auto harmonizeAfter = [&currentTs, &harmonizer](int microseconds) {
+         currentTs += Us2Ts(microseconds);
+         harmonizer->Harmonize(currentTs);
+     };
+
+     harmonizeAfter(1'000'000);
+
+     mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
+     mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
+     mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
+     harmonizeAfter(60'000'000);
+
+     auto stats0 = harmonizer->GetPoolStats(0);
+     auto stats1 = harmonizer->GetPoolStats(1);
+     auto stats2 = harmonizer->GetPoolStats(2);
+     auto sharedState = sharedPool->GetState();
+
+     CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 0, 0, 0, 0);
+     CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 0);
+     CHECK_IS_NEEDY(stats0);
+     CHECK_IS_HOGGISH(stats1);
+     CHECK_IS_NEEDY(stats2);
+     UNIT_ASSERT_VALUES_EQUAL(sharedState.BorrowedThreadByPool[0], 1);
+     UNIT_ASSERT_VALUES_EQUAL(sharedState.PoolByBorrowedThread[1], 0);
+
+     // Second interval: no pool reports additional consumption.
+     harmonizeAfter(60'000'000);
+
+     stats0 = harmonizer->GetPoolStats(0);
+     stats1 = harmonizer->GetPoolStats(1);
+     stats2 = harmonizer->GetPoolStats(2);
+     sharedState = sharedPool->GetState();
+
+     CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 1, 0, 0, 0);
+     CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 1);
+     CHECK_IS_HOGGISH(stats0);
+     CHECK_IS_HOGGISH(stats1);
+     CHECK_IS_HOGGISH(stats2);
+     UNIT_ASSERT_VALUES_EQUAL(sharedState.BorrowedThreadByPool[0], -1);
+     UNIT_ASSERT_VALUES_EQUAL(sharedState.PoolByBorrowedThread[1], -1);
+ }
+
+ Y_UNIT_TEST(TestSharedHalfThreadsStarved) {
+     // Scenario: two identically configured mock pools and a shared pool with two
+     // threads; only pool 0 ever reports CPU consumption.
+     //   1. Equal consumption values: pool 0 is expected to be needy and to be
+     //      recorded as the borrower of shared thread 1.
+     //   2. Unequal consumption values {30'000'000.0, 60'000'000.0}: pool 0 is
+     //      expected to be starved and both borrow entries to be -1.
+     //   3. Equal values again: pool 0 is expected to be needy again.
+     ui64 currentTs = 1000000;
+     std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+     TMockExecutorPoolParams params {
+         .DefaultFullThreadCount = 2,
+         .MinFullThreadCount = 1,
+         .MaxFullThreadCount = 3,
+         .DefaultThreadCount = 2.0f,
+         .MinThreadCount = 1.0f,
+         .MaxThreadCount = 3.0f,
+     };
+     std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
+     std::vector<IExecutorPool*> pools;
+
+     // Creates a mock pool from the current `params` and records it in both lists.
+     const auto addMockPool = [&mockPools, &pools, &params] {
+         mockPools.push_back(std::make_unique<TMockExecutorPool>(params));
+         pools.push_back(mockPools.back().get());
+     };
+     addMockPool();
+     params.PoolId = 1;
+     addMockPool();
+
+     TSharedExecutorPoolConfig sharedConfig;
+     sharedConfig.Threads = 2;
+     std::unique_ptr<ISharedExecutorPool> sharedPool(new TMockSharedExecutorPool(sharedConfig, 2, pools));
+
+     for (size_t poolIdx = 0; poolIdx != mockPools.size(); ++poolIdx) {
+         harmonizer->AddPool(mockPools[poolIdx].get());
+     }
+     harmonizer->SetSharedPool(sharedPool.get());
+
+     // Advances the clock by `microseconds` and runs one harmonizer iteration.
+     const auto harmonizeAfter = [&currentTs, &harmonizer](int microseconds) {
+         currentTs += Us2Ts(microseconds);
+         harmonizer->Harmonize(currentTs);
+     };
+
+     harmonizeAfter(1'000'000);
+
+     // Phase 1.
+     mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
+     mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
+     harmonizeAfter(60'000'000);
+
+     auto stats0 = harmonizer->GetPoolStats(0);
+     auto stats1 = harmonizer->GetPoolStats(1);
+     auto sharedState = sharedPool->GetState();
+     CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 0, 0, 0, 0);
+     CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 0);
+     CHECK_IS_NEEDY(stats0);
+     CHECK_IS_HOGGISH(stats1);
+     UNIT_ASSERT_VALUES_EQUAL_C(sharedState.BorrowedThreadByPool[0], 1, sharedState.ToString());
+     UNIT_ASSERT_VALUES_EQUAL_C(sharedState.PoolByBorrowedThread[1], 0, sharedState.ToString());
+
+     // Phase 2.
+     mockPools[0]->IncreaseThreadCpuConsumption({30'000'000.0, 60'000'000.0}, 0, 2);
+     mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
+     harmonizeAfter(60'000'000);
+
+     stats0 = harmonizer->GetPoolStats(0);
+     stats1 = harmonizer->GetPoolStats(1);
+     sharedState = sharedPool->GetState();
+     UNIT_ASSERT_VALUES_EQUAL_C(sharedState.BorrowedThreadByPool[0], -1, sharedState.ToString());
+     UNIT_ASSERT_VALUES_EQUAL_C(sharedState.PoolByBorrowedThread[1], -1, sharedState.ToString());
+     CHECK_CHANGING_HALF_THREADS(stats0, 1, 1, 0, 0, 0, 0);
+     CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 1, 0);
+     CHECK_IS_STARVED(stats0);
+     CHECK_IS_HOGGISH(stats1);
+
+     // Phase 3.
+     mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
+     mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
+     harmonizeAfter(60'000'000);
+
+     stats0 = harmonizer->GetPoolStats(0);
+     stats1 = harmonizer->GetPoolStats(1);
+
+     CHECK_CHANGING_HALF_THREADS(stats0, 2, 1, 0, 0, 0, 0);
+     CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 2, 1, 0);
+     CHECK_IS_NEEDY(stats0);
+     CHECK_IS_HOGGISH(stats1);
+ }
+
+}
diff --git a/ydb/library/actors/core/harmonizer/ut/history_ut.cpp b/ydb/library/actors/core/harmonizer/ut/history_ut.cpp
new file mode 100644
index 00000000000..7c9965adf32
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/ut/history_ut.cpp
@@ -0,0 +1,345 @@
+#define HARMONIZER_HISTORY_DEBUG
+#include "debug.h"
+#include "history.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NActors;
+
+// One second expressed in microseconds.
+constexpr double secondInUs = 1'000'000.0;
+// The same interval converted with Us2Ts. Declared const because the tests
+// only ever read it.
+const ui64 secondInTs = Us2Ts(secondInUs);
+
+Y_UNIT_TEST_SUITE(ValueHistoryTests) {
+
+ // Verifies the state of a default-constructed TValueHistory<8>: HistoryIdx is 0,
+ // LastTs equals Max<ui64>(), LastValue and both accumulators are zero, and all
+ // eight History slots are 0.0.
+ Y_UNIT_TEST(TestConstructorAndInitialization) {
+ TValueHistory<8> history;
+
+ UNIT_ASSERT_VALUES_EQUAL(history.HistoryIdx, 0);
+ UNIT_ASSERT_VALUES_EQUAL(history.LastTs, Max<ui64>());
+ UNIT_ASSERT_VALUES_EQUAL(history.LastValue, 0.0);
+ UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedValue, 0.0);
+ UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedTs, 0);
+
+ for (size_t i = 0; i < 8; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(history.History[i], 0.0);
+ }
+ }
+
+ // Verifies CheckBinaryPower: it must return true for 1, 2, 4, 8 and 16 and
+ // false for 3, 5, 6 and 7.
+ Y_UNIT_TEST(TestBufferSizePowerOfTwo) {
+ // Powers of two are accepted.
+ UNIT_ASSERT(TValueHistory<1>::CheckBinaryPower(1));
+ UNIT_ASSERT(TValueHistory<2>::CheckBinaryPower(2));
+ UNIT_ASSERT(TValueHistory<4>::CheckBinaryPower(4));
+ UNIT_ASSERT(TValueHistory<8>::CheckBinaryPower(8));
+ UNIT_ASSERT(TValueHistory<16>::CheckBinaryPower(16));
+
+ // Values that are not powers of two are rejected.
+ UNIT_ASSERT(!TValueHistory<1>::CheckBinaryPower(3));
+ UNIT_ASSERT(!TValueHistory<1>::CheckBinaryPower(5));
+ UNIT_ASSERT(!TValueHistory<1>::CheckBinaryPower(6));
+ UNIT_ASSERT(!TValueHistory<1>::CheckBinaryPower(7));
+ }
+
+ Y_UNIT_TEST(TestCircularBufferFilling) {
+     // Registers eight samples, one secondInTs apart, into a four-slot history
+     // and expects every slot to hold 1.0 afterwards.
+     TValueHistory<4> history;
+     const ui64 baseTs = secondInUs;
+
+     // The registered counter grows by 1 with every sample.
+     size_t sample = 0;
+     while (sample < 8) {
+         history.Register(baseTs + sample * secondInTs, sample * 1.0);
+         ++sample;
+     }
+
+     // Expected per-slot value.
+     double expectedValues[] = {1.0, 1.0, 1.0, 1.0};
+     for (size_t i = 0; i < 4; ++i) {
+         UNIT_ASSERT_DOUBLES_EQUAL(history.History[i], expectedValues[i], 1e-6);
+     }
+ }
+
+ Y_UNIT_TEST(TestBasicRegistration) {
+     // After the first Register call LastTs/LastValue must match the sample; a
+     // second sample one secondInTs later is expected to fill History[0] with
+     // 1.0 and move HistoryIdx to 1.
+     TValueHistory<4> history;
+     const ui64 baseTs = secondInUs;
+     const ui64 nextTs = baseTs + secondInTs;
+
+     history.Register(baseTs, 1.0);
+     UNIT_ASSERT_VALUES_EQUAL(history.LastTs, baseTs);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.LastValue, 1.0, 1e-6);
+
+     history.Register(nextTs, 2.0);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.History[0], 1.0, 1e-6);
+     UNIT_ASSERT_VALUES_EQUAL(history.HistoryIdx, 1);
+ }
+
+ Y_UNIT_TEST(TestRegistrationWithBackwardTime) {
+     // A sample whose timestamp is earlier than the previous one is expected to
+     // replace LastTs/LastValue and to leave both accumulators at zero.
+     TValueHistory<4> history;
+     const ui64 baseTs = secondInUs;
+     const ui64 earlierTs = baseTs - 1000;
+
+     history.Register(baseTs, 1.0);
+     history.Register(earlierTs, 2.0);
+
+     UNIT_ASSERT_VALUES_EQUAL(history.LastTs, baseTs - 1000);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.LastValue, 2.0, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, 0.0, 1e-6);
+     UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedTs, 0);
+ }
+
+ Y_UNIT_TEST(TestRegistrationWithLargeTimeGap) {
+     // Two samples ten secondInTs apart with a value increase of 3.0: every one
+     // of the four slots is expected to hold 3.0 / 10.
+     TValueHistory<4> history;
+     const ui64 baseTs = secondInUs;
+     const ui64 gapTs = 10 * secondInTs;
+
+     history.Register(baseTs, 1.0);
+     history.Register(baseTs + gapTs, 4.0);
+
+     double expectedValue = (4.0 - 1.0) / 10;
+     for (size_t i = 0; i < 4; ++i) {
+         UNIT_ASSERT_DOUBLES_EQUAL(history.History[i], expectedValue, 1e-6);
+     }
+ }
+
+ Y_UNIT_TEST(TestRegistrationOnSecondBoundary) {
+     // Three samples: Us2Ts(100'000.0) before baseTs, exactly at baseTs and
+     // Us2Ts(100'000.0) after it. HistoryIdx and History[0] are expected to stay
+     // at zero while AccumulatedValue is expected to be 2.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = Us2Ts(1'000'000.0);
+     const ui64 beforeTs = baseTs - Us2Ts(100'000.0);
+     const ui64 afterTs = baseTs + Us2Ts(100'000.0);
+
+     history.Register(beforeTs, 1.0);
+     history.Register(baseTs, 2.0);
+     history.Register(afterTs, 3.0);
+
+     UNIT_ASSERT_VALUES_EQUAL(history.HistoryIdx, 0);
+     UNIT_ASSERT_VALUES_EQUAL(history.LastTs, baseTs + Us2Ts(100'000.0));
+     UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedValue, 2.0);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.History[0], 0.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestAccumulationWithinSecond) {
+     // Two samples Us2Ts(500'000.0) apart: the elapsed time and the value
+     // increase of 1.0 are expected to end up in the accumulators.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+     const ui64 secondSampleTs = baseTs + Us2Ts(500'000.0);
+
+     history.Register(baseTs, 1.0);
+     history.Register(secondSampleTs, 2.0);
+
+     UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedTs, Us2Ts(500'000.0));
+     UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, 1.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestTransitionBetweenSeconds) {
+     // Two samples Us2Ts(1'500'000.0) apart with a value increase of 1.0:
+     // History[0] is expected to receive 2/3 of the increase, the remaining 1/3
+     // stays accumulated together with roughly 500'000 us of time.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+     const ui64 secondSampleTs = baseTs + Us2Ts(1'500'000.0);
+
+     history.Register(baseTs, 1.0);
+     history.Register(secondSampleTs, 2.0);
+
+     UNIT_ASSERT_VALUES_EQUAL(history.HistoryIdx, 1);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.History[0], 2.0/3, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, 1.0/3, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(Ts2Us(history.AccumulatedTs), 500'000.0, 1e-3);
+ }
+
+ Y_UNIT_TEST(TestGetAvgPartForLastSeconds) {
+     // Registers the running totals 0, 1, 3, 6 one Us2Ts(1'000'000.0) apart, so
+     // the per-interval increases are 1, 2 and 3. The averages over the last
+     // 1, 2 and 3 seconds are expected to be 3.0, 2.5 and 2.0.
+     TValueHistory<4> history;
+     ui64 baseTs = 1000000;
+
+     // double, like the accumulators in the sibling tests; the totals are small
+     // integers, so the registered values are the same as with float.
+     double acc = 0;
+     for (size_t i = 0; i < 4; ++i) {
+         acc += i * 1.0;
+         history.Register(baseTs + i * Us2Ts(1'000'000.0), acc);
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(1), 3.0, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(2), 2.5, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(3), 2.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetAvgPartWithTail) {
+     // Only Us2Ts(500'000.0) of data is registered. GetAvgPartForLastSeconds<true>(1)
+     // is expected to return 1.0 and GetAvgPartForLastSeconds<false>(1) 0.0.
+     // NOTE(review): the template flag presumably selects whether the still
+     // accumulating tail is included - confirm against history.h.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+     const ui64 tailTs = baseTs + Us2Ts(500'000.0);
+
+     history.Register(baseTs, 1.0);
+     history.Register(tailTs, 2.0);
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<true>(1), 1.0, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<false>(1), 0.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetAvgPartWithIncompleteData) {
+     // Exactly Us2Ts(1'000'000.0) of data with an increase of 1.0 is registered,
+     // but the average is requested over 3 seconds: both template variants are
+     // expected to return 1/3.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+     const ui64 secondSampleTs = baseTs + Us2Ts(1'000'000.0);
+
+     history.Register(baseTs, 1.0);
+     history.Register(secondSampleTs, 2.0);
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<true>(3), 1.0/3, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<false>(3), 1.0/3, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetAvgPartOnBufferBoundary) {
+     // Five samples growing by 1.0 per Us2Ts(1'000'000.0) give four intervals
+     // for the four-slot history; the average over the last 4 seconds is
+     // expected to be 1.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     size_t sample = 0;
+     while (sample < 5) {
+         history.Register(baseTs + sample * Us2Ts(1'000'000.0), sample * 1.0);
+         ++sample;
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(4), 1.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetMaxForLastSeconds) {
+     // Running totals 0, 1, 3, 6 give per-interval increases 1, 2, 3; the
+     // maximum over the last 1, 2 and 3 seconds is expected to be 3.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     double runningTotal = 0;
+     for (size_t step = 0; step != 4; ++step) {
+         runningTotal += step * 1.0;
+         const ui64 sampleTs = baseTs + step * Us2Ts(1'000'000.0);
+         history.Register(sampleTs, runningTotal);
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMaxForLastSeconds(1), 3.0, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMaxForLastSeconds(2), 3.0, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMaxForLastSeconds(3), 3.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetMax) {
+     // Running totals 0, 1, 3, 6 give per-interval increases 1, 2, 3; GetMax()
+     // is expected to return 3.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     double runningTotal = 0;
+     for (size_t step = 0; step != 4; ++step) {
+         runningTotal += step * 1.0;
+         const ui64 sampleTs = baseTs + step * Us2Ts(1'000'000.0);
+         history.Register(sampleTs, runningTotal);
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMax(), 3.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetMaxWithNegativeValues) {
+     // The registered value drops by 1.0 per Us2Ts(1'000'000.0) step (from -1.0
+     // down to -5.0); GetMax() is expected to return -1.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     history.Register(baseTs, -1.0);
+
+     // Remaining samples, registered at offsets of 1, 2, 3 and 4 million us.
+     const double laterValues[] = {-2.0, -3.0, -4.0, -5.0};
+     double offsetUs = 1'000'000.0;
+     for (const double value : laterValues) {
+         history.Register(baseTs + Us2Ts(offsetUs), value);
+         offsetUs += 1'000'000.0;
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMax(), -1.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetMinForLastSeconds) {
+     // Running totals 4, 7, 9, 10 give per-interval increases 3, 2, 1; the
+     // minimum over the last 1, 2 and 3 seconds is expected to be 1.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     double runningTotal = 0;
+     for (size_t step = 0; step != 4; ++step) {
+         runningTotal += (4 - step) * 1.0;
+         const ui64 sampleTs = baseTs + step * Us2Ts(1'000'000.0);
+         history.Register(sampleTs, runningTotal);
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMinForLastSeconds(1), 1.0, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMinForLastSeconds(2), 1.0, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMinForLastSeconds(3), 1.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetMin) {
+     // Running totals 5, 9, 12, 14, 15 give per-interval increases 4, 3, 2, 1;
+     // GetMin() is expected to return 1.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     double runningTotal = 0;
+     for (size_t step = 0; step != 5; ++step) {
+         runningTotal += (5 - step) * 1.0;
+         const ui64 sampleTs = baseTs + step * Us2Ts(1'000'000.0);
+         history.Register(sampleTs, runningTotal);
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMin(), 1.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestGetMinWithNegativeValues) {
+     // Registered values -1, -2, -4, -7 change by -1, -2 and -3 per
+     // Us2Ts(1'000'000.0) step; GetMin() is expected to return -3.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     history.Register(baseTs, -1.0);
+
+     // Remaining samples, registered at offsets of 1, 2 and 3 million us.
+     const double laterValues[] = {-2.0, -4.0, -7.0};
+     double offsetUs = 1'000'000.0;
+     for (const double value : laterValues) {
+         history.Register(baseTs + Us2Ts(offsetUs), value);
+         offsetUs += 1'000'000.0;
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMin(), -3.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestBufferOverflow) {
+     // Running totals whose per-interval increases are 1, 2, 3, 4 and then 5.
+     // After the fifth increase GetMax() is expected to be 5.0 and GetMin() 2.0,
+     // i.e. the oldest increase (1) is expected to have been overwritten.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     double acc = 0;
+     // Adds `step` to the running total and registers it at second `step`.
+     const auto registerStep = [&history, &acc, baseTs](size_t step) {
+         acc += step * 1.0;
+         history.Register(baseTs + step * Us2Ts(1'000'000.0), acc);
+     };
+
+     // Fill buffer
+     for (size_t step = 0; step < 5; ++step) {
+         registerStep(step);
+     }
+
+     // Add another value, which should overwrite the oldest
+     registerStep(5);
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMax(), 5.0, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetMin(), 2.0, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestSmallTimeIntervals) {
+     // Two samples a single timestamp unit apart: the increase of 0.1 and the
+     // elapsed 1 unit are expected to be kept in the accumulators.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+     const ui64 nextTs = baseTs + 1; // Very small interval
+
+     history.Register(baseTs, 1.0);
+     history.Register(nextTs, 1.1);
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, 0.1, 1e-6);
+     UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedTs, 1);
+ }
+
+ Y_UNIT_TEST(TestLargeTimeIntervals) {
+     // Two samples 100 secondInTs apart with a value increase of 100.0: each of
+     // the four slots is expected to hold 1.0.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+     const ui64 farTs = baseTs + 100 * secondInTs; // Very large interval
+
+     history.Register(baseTs, 1.0);
+     history.Register(farTs, 101.0);
+
+     double expectedValue = 1.0;
+     for (size_t i = 0; i < 4; ++i) {
+         UNIT_ASSERT_DOUBLES_EQUAL(history.History[i], expectedValue, 1e-6);
+     }
+ }
+
+ Y_UNIT_TEST(TestPrecisionOverLongDuration) {
+     // Registers 100'000 samples whose per-step increases cycle through 1, 2, 3
+     // and 4; GetAvgPart() is expected to be 2.5 within 1e-6.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     const int increments[4] = {1, 2, 3, 4};
+     double runningTotal = 0;
+     for (size_t step = 0; step != 100'000; ++step) {
+         runningTotal += increments[step % 4] * 1.0;
+         const ui64 sampleTs = baseTs + step * Us2Ts(1'000'000.0);
+         history.Register(sampleTs, runningTotal);
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPart(), 2.5, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestRoundingDuringTimestampConversion) {
+     // Two samples Us2Ts(1'000'000.5) apart with a value increase of 1.0. The
+     // one-second average is expected to be 1.0 scaled by 1'000'000 / 1'000'000.5
+     // and the remainder of the increase is expected in AccumulatedValue.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+     const ui64 secondSampleTs = baseTs + Us2Ts(1'000'000.5);
+
+     history.Register(baseTs, 1.0);
+     history.Register(secondSampleTs, 2.0);
+
+     double expectedValue = 1.0 / (1'000'000.5 / 1'000'000.0);
+     double expectedValue2 = 1.0 - expectedValue;
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<true>(1), expectedValue, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<false>(1), expectedValue, 1e-6);
+     UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, expectedValue2, 1e-6);
+ }
+
+ Y_UNIT_TEST(TestAverageCalculationPrecision) {
+     // Registered values 1, 3, 6 increase by 2 and then 3 per
+     // Us2Ts(1'000'000.0) step; the average over the last 2 seconds is expected
+     // to be 2.5.
+     TValueHistory<4> history;
+     const ui64 baseTs = 1000000;
+
+     history.Register(baseTs, 1.0);
+
+     const double laterValues[] = {3.0, 6.0};
+     double offsetUs = 1'000'000.0;
+     for (const double value : laterValues) {
+         history.Register(baseTs + Us2Ts(offsetUs), value);
+         offsetUs += 1'000'000.0;
+     }
+
+     UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(2), 2.5, 1e-6);
+ }
+}
diff --git a/ydb/library/actors/core/harmonizer/ut/ya.make b/ydb/library/actors/core/harmonizer/ut/ya.make
new file mode 100644
index 00000000000..c9aedbc5e18
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/ut/ya.make
@@ -0,0 +1,22 @@
+# Unit tests for ydb/library/actors/core/harmonizer.
+UNITTEST_FOR(ydb/library/actors/core/harmonizer)
+
+# Builds with SANITIZER_TYPE set get a larger size class and a longer timeout.
+IF (SANITIZER_TYPE)
+ SIZE(MEDIUM)
+ TIMEOUT(600)
+ELSE()
+ SIZE(SMALL)
+ TIMEOUT(60)
+ENDIF()
+
+
+PEERDIR(
+ ydb/library/actors/interconnect
+ ydb/library/actors/testlib
+)
+
+# Test sources compiled into this target.
+SRCS(
+ harmonizer_ut.cpp
+ history_ut.cpp
+)
+
+END()
diff --git a/ydb/library/actors/core/harmonizer/waiting_stats.cpp b/ydb/library/actors/core/harmonizer/waiting_stats.cpp
new file mode 100644
index 00000000000..f855055034a
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/waiting_stats.cpp
@@ -0,0 +1,48 @@
+#include "waiting_stats.h"
+
+#include "pool.h"
+#include <ydb/library/actors/core/executor_pool_basic.h>
+#include <ydb/library/actors/core/probes.h>
+
+namespace NActors {
+
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
+// Re-aggregates waiting statistics over `pools` and publishes the averages.
+//
+// The totals are reset and then summed over every pool that has WaitingStats.
+// Each average (total / count) is replaced by the corresponding
+// TWaitingStatsConstants default when there are no events, when the measured
+// average is zero, or when it is more than twice that default. The resulting
+// averages are stored (converted with Ts2Us) into AvgWakingUpTimeUs and
+// AvgAwakeningTimeUs and reported through the WakingUpConsumption probe
+// together with the unclamped ("real") averages.
+void TWaitingInfo::Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools) {
+    WakingUpTotalTime = 0;
+    WakingUpCount = 0;
+    AwakingTotalTime = 0;
+    AwakingCount = 0;
+
+    for (const auto& pool : pools) {
+        // Pools without waiting statistics contribute nothing.
+        if (pool->WaitingStats) {
+            WakingUpTotalTime += pool->WaitingStats->WakingUpTotalTime;
+            WakingUpCount += pool->WaitingStats->WakingUpCount;
+            AwakingTotalTime += pool->WaitingStats->AwakingTotalTime;
+            AwakingCount += pool->WaitingStats->AwakingCount;
+        }
+    }
+
+    constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime;
+    constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime;
+
+    ui64 realAvgWakingUpTime = (WakingUpCount ? WakingUpTotalTime / WakingUpCount : knownAvgWakingUpTime);
+    ui64 avgWakingUpTime = realAvgWakingUpTime;
+    // Fall back to the known default for zero or implausibly large averages.
+    if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) {
+        avgWakingUpTime = knownAvgWakingUpTime;
+    }
+    AvgWakingUpTimeUs.store(Ts2Us(avgWakingUpTime), std::memory_order_relaxed);
+
+    ui64 realAvgAwakeningTime = (AwakingCount ? AwakingTotalTime / AwakingCount : knownAvgAwakeningUpTime);
+    ui64 avgAwakeningTime = realAvgAwakeningTime;
+    // Same fallback rule as for the waking-up average.
+    if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) {
+        avgAwakeningTime = knownAvgAwakeningUpTime;
+    }
+    AvgAwakeningTimeUs.store(Ts2Us(avgAwakeningTime), std::memory_order_relaxed);
+
+    ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime;
+    // Probe fields: avgWakingUpUs, realAvgWakingUpUs, avgAwakeningUs,
+    // realAvgAwakeningUs, total. The second field must carry the unclamped
+    // waking-up average, not a second copy of the clamped one.
+    LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(realAvgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption));
+}
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/waiting_stats.h b/ydb/library/actors/core/harmonizer/waiting_stats.h
new file mode 100644
index 00000000000..76f661bb37b
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/waiting_stats.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NActors {
+
+struct TPoolInfo;
+
+// Waking-up / awakening statistics aggregated over executor pools.
+struct TWaitingInfo {
+    // Totals and event counts; recomputed from scratch by every Pull() call.
+    ui64 WakingUpTotalTime{0};
+    ui64 WakingUpCount{0};
+    ui64 AwakingTotalTime{0};
+    ui64 AwakingCount{0};
+    // Averages published by Pull() with relaxed atomic stores.
+    std::atomic<float> AvgWakingUpTimeUs{0.0f};
+    std::atomic<float> AvgAwakeningTimeUs{0.0f};
+
+    // Re-aggregates the statistics from `pools` and refreshes the averages.
+    void Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools);
+
+}; // struct TWaitingInfo
+
+} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/ya.make b/ydb/library/actors/core/harmonizer/ya.make
new file mode 100644
index 00000000000..70993baf64a
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer/ya.make
@@ -0,0 +1,44 @@
+# Harmonizer library; its unit tests live in the ut subdirectory (see
+# RECURSE_FOR_TESTS at the end of this file).
+LIBRARY()
+
+NO_WSHADOW()
+
+IF (PROFILE_MEMORY_ALLOCATIONS)
+ CFLAGS(-DPROFILE_MEMORY_ALLOCATIONS)
+ENDIF()
+
+# The "B", "BS" and "C" allocators additionally define BALLOC and pull in
+# library/cpp/balloc/optional.
+IF (ALLOCATOR == "B" OR ALLOCATOR == "BS" OR ALLOCATOR == "C")
+ CXXFLAGS(-DBALLOC)
+ PEERDIR(
+ library/cpp/balloc/optional
+ )
+ENDIF()
+
+SRCS(
+ cpu_consumption.cpp
+ pool.cpp
+ shared_info.cpp
+ waiting_stats.cpp
+ harmonizer.cpp
+)
+
+PEERDIR(
+ ydb/library/actors/util
+ ydb/library/actors/protos
+ ydb/library/services
+ library/cpp/logger
+ library/cpp/lwtrace
+ library/cpp/monlib/dynamic_counters
+ library/cpp/time_provider
+)
+
+# Thread-sanitizer builds use the suppressions file from the parent directory.
+IF (SANITIZER_TYPE == "thread")
+ SUPPRESSIONS(
+ ../tsan.supp
+ )
+ENDIF()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h
index 4bceffa8f37..5862dffa7d7 100644
--- a/ydb/library/actors/core/mon_stats.h
+++ b/ydb/library/actors/core/mon_stats.h
@@ -41,14 +41,14 @@ namespace NActors {
};
struct TExecutorPoolState {
- float UsedCpu = 0;
+ float ElapsedCpu = 0;
float CurrentLimit = 0;
float PossibleMaxLimit = 0;
float MaxLimit = 0;
float MinLimit = 0;
void Aggregate(const TExecutorPoolState& other) {
- UsedCpu += other.UsedCpu;
+ ElapsedCpu += other.ElapsedCpu;
CurrentLimit += other.CurrentLimit;
PossibleMaxLimit += other.PossibleMaxLimit;
MaxLimit += other.MaxLimit;
@@ -63,10 +63,6 @@ namespace NActors {
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
ui64 DecreasingThreadsByExchange = 0;
- i64 MaxConsumedCpuUs = 0;
- i64 MinConsumedCpuUs = 0;
- i64 MaxBookedCpuUs = 0;
- i64 MinBookedCpuUs = 0;
double SpinningTimeUs = 0;
double SpinThresholdUs = 0;
i16 WrongWakenedThreadCount = 0;
diff --git a/ydb/library/actors/core/probes.h b/ydb/library/actors/core/probes.h
index f9394f3273f..ebf34299e27 100644
--- a/ydb/library/actors/core/probes.h
+++ b/ydb/library/actors/core/probes.h
@@ -174,11 +174,11 @@
NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \
PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \
- NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", \
+ NAMES("poolId", "pool", "elapsed", "cpu", "lastSecondElapsed", "lastSecondCpu", "threadCount", "maxThreadCount", \
"isStarved", "isNeedy", "isHoggish")) \
PROBE(HarmonizeCheckPoolByThread, GROUPS("Harmonizer"), \
TYPES(ui32, TString, i16, double, double, double, double), \
- NAMES("poolId", "pool", "threadIdx", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed")) \
+ NAMES("poolId", "pool", "threadIdx", "elapsed", "cpu", "lastSecondElapsed", "lastSecondCpu")) \
PROBE(WakingUpConsumption, GROUPS("Harmonizer"), \
TYPES(double, double, double, double, double), \
NAMES("avgWakingUpUs", "realAvgWakingUpUs", "avgAwakeningUs", "realAvgAwakeningUs", "total")) \
diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make
index fbeb0e2b0d9..ca44ab7707b 100644
--- a/ydb/library/actors/core/ya.make
+++ b/ydb/library/actors/core/ya.make
@@ -56,8 +56,6 @@ SRCS(
executor_pool_shared.h
executor_thread.cpp
executor_thread.h
- harmonizer.cpp
- harmonizer.h
hfunc.h
interconnect.cpp
interconnect.h
@@ -106,6 +104,7 @@ GENERATE_ENUM_SERIALIZATION(log_iface.h)
PEERDIR(
ydb/library/actors/actor_type
+ ydb/library/actors/core/harmonizer
ydb/library/actors/memory_log
ydb/library/actors/prof
ydb/library/actors/protos
@@ -129,6 +128,10 @@ ENDIF()
END()
+RECURSE(
+ harmonizer
+)
+
RECURSE_FOR_TESTS(
ut
ut_fat
diff --git a/ydb/library/actors/helpers/pool_stats_collector.h b/ydb/library/actors/helpers/pool_stats_collector.h
index a0790a55c8b..244cc27cefc 100644
--- a/ydb/library/actors/helpers/pool_stats_collector.h
+++ b/ydb/library/actors/helpers/pool_stats_collector.h
@@ -175,6 +175,7 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount;
NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCountPercent;
NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCountPercent;
+ NMonitoring::TDynamicCounters::TCounterPtr PossibleMaxThreadCountPercent;
NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCountPercent;
NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCountPercent;
NMonitoring::TDynamicCounters::TCounterPtr IsNeedy;
@@ -189,10 +190,6 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange;
NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
- NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
- NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
- NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
- NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs;
NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs;
@@ -251,6 +248,7 @@ private:
CurrentThreadCountPercent = PoolGroup->GetCounter("CurrentThreadCountPercent", false);
PotentialMaxThreadCountPercent = PoolGroup->GetCounter("PotentialMaxThreadCountPercent", false);
+ PossibleMaxThreadCountPercent = PoolGroup->GetCounter("PossibleMaxThreadCountPercent", false);
DefaultThreadCountPercent = PoolGroup->GetCounter("DefaultThreadCountPercent", false);
MaxThreadCountPercent = PoolGroup->GetCounter("MaxThreadCountPercent", false);
@@ -266,10 +264,6 @@ private:
DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true);
NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
- MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false);
- MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false);
- MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false);
- MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false);
SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true);
SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false);
@@ -316,9 +310,9 @@ private:
*DefaultThreadCount = poolStats.DefaultThreadCount;
*MaxThreadCount = poolStats.MaxThreadCount;
-
*CurrentThreadCountPercent = poolStats.CurrentThreadCount * 100;
*PotentialMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100;
+ *PossibleMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100;
*DefaultThreadCountPercent = poolStats.DefaultThreadCount * 100;
*MaxThreadCountPercent = poolStats.MaxThreadCount * 100;
@@ -384,35 +378,35 @@ private:
struct TActorSystemCounters {
TIntrusivePtr<NMonitoring::TDynamicCounters> Group;
- NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
- NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
- NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
- NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxUsedCpuPercent;
+ NMonitoring::TDynamicCounters::TCounterPtr MinUsedCpuPercent;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpuPercent;
+ NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpuPercent;
- NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeUs;
- NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeUs;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeNs;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeNs;
void Init(NMonitoring::TDynamicCounters* group) {
Group = group;
- MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false);
- MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false);
- MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false);
- MinBookedCpu = Group->GetCounter("MinBookedCpu", false);
- AvgAwakeningTimeUs = Group->GetCounter("AvgAwakeningTimeUs", false);
- AvgWakingUpTimeUs = Group->GetCounter("AvgWakingUpTimeUs", false);
+ MaxUsedCpuPercent = Group->GetCounter("MaxUsedCpuPercent", false);
+ MinUsedCpuPercent = Group->GetCounter("MinUsedCpuPercent", false);
+ MaxElapsedCpuPercent = Group->GetCounter("MaxElapsedCpuPercent", false);
+ MinElapsedCpuPercent = Group->GetCounter("MinElapsedCpuPercent", false);
+ AvgAwakeningTimeNs = Group->GetCounter("AvgAwakeningTimeNs", false);
+ AvgWakingUpTimeNs = Group->GetCounter("AvgWakingUpTimeNs", false);
}
void Set(const THarmonizerStats& harmonizerStats) {
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu;
- *MinConsumedCpu = harmonizerStats.MinConsumedCpu;
- *MaxBookedCpu = harmonizerStats.MaxBookedCpu;
- *MinBookedCpu = harmonizerStats.MinBookedCpu;
+ *MaxUsedCpuPercent = harmonizerStats.MaxUsedCpu;
+ *MinUsedCpuPercent = harmonizerStats.MinUsedCpu;
+ *MaxElapsedCpuPercent = harmonizerStats.MaxElapsedCpu;
+ *MinElapsedCpuPercent = harmonizerStats.MinElapsedCpu;
- *AvgAwakeningTimeUs = harmonizerStats.AvgAwakeningTimeUs;
- *AvgWakingUpTimeUs = harmonizerStats.AvgWakingUpTimeUs;
+ *AvgAwakeningTimeNs = harmonizerStats.AvgAwakeningTimeUs * 1000;
+ *AvgWakingUpTimeNs = harmonizerStats.AvgWakingUpTimeUs * 1000;
#else
Y_UNUSED(harmonizerStats);
#endif