author      kruall <kruall@ydb.tech>  2024-12-03 20:41:00 +0300
committer   GitHub <noreply@github.com>  2024-12-03 17:41:00 +0000
commit      88daf44f9da55b4b67c13eba5a1afec31dc7f4f2 (patch)
tree        eb943d6f41027a4e97898e597f47a34c49a6d163
parent      5c8e3e634e7436f1f9958439b48283cddf6e17e2 (diff)
download    ydb-88daf44f9da55b4b67c13eba5a1afec31dc7f4f2.tar.gz
Revert "Refactor harmonizer in actorsystem" (#12254)
-rw-r--r--  ydb/library/actors/core/cpu_manager.cpp | 10
-rw-r--r--  ydb/library/actors/core/cpu_manager.h | 4
-rw-r--r--  ydb/library/actors/core/executor_pool.h | 19
-rw-r--r--  ydb/library/actors/core/executor_pool_base.cpp | 1
-rw-r--r--  ydb/library/actors/core/executor_pool_basic.cpp | 12
-rw-r--r--  ydb/library/actors/core/executor_pool_basic.h | 9
-rw-r--r--  ydb/library/actors/core/executor_pool_io.cpp | 2
-rw-r--r--  ydb/library/actors/core/executor_pool_io.h | 2
-rw-r--r--  ydb/library/actors/core/executor_pool_shared.cpp | 158
-rw-r--r--  ydb/library/actors/core/executor_pool_shared.h | 56
-rw-r--r--  ydb/library/actors/core/executor_thread_ctx.h | 7
-rw-r--r--  ydb/library/actors/core/harmonizer.cpp | 861
-rw-r--r--  ydb/library/actors/core/harmonizer.h (renamed from ydb/library/actors/core/harmonizer/harmonizer.h) | 36
-rw-r--r--  ydb/library/actors/core/harmonizer/cpu_consumption.cpp | 171
-rw-r--r--  ydb/library/actors/core/harmonizer/cpu_consumption.h | 45
-rw-r--r--  ydb/library/actors/core/harmonizer/debug.h | 23
-rw-r--r--  ydb/library/actors/core/harmonizer/defs.h | 3
-rw-r--r--  ydb/library/actors/core/harmonizer/harmonizer.cpp | 485
-rw-r--r--  ydb/library/actors/core/harmonizer/history.h | 149
-rw-r--r--  ydb/library/actors/core/harmonizer/pool.cpp | 149
-rw-r--r--  ydb/library/actors/core/harmonizer/pool.h | 93
-rw-r--r--  ydb/library/actors/core/harmonizer/shared_info.cpp | 55
-rw-r--r--  ydb/library/actors/core/harmonizer/shared_info.h | 20
-rw-r--r--  ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp | 673
-rw-r--r--  ydb/library/actors/core/harmonizer/ut/ya.make | 22
-rw-r--r--  ydb/library/actors/core/harmonizer/waiting_stats.cpp | 48
-rw-r--r--  ydb/library/actors/core/harmonizer/waiting_stats.h | 21
-rw-r--r--  ydb/library/actors/core/harmonizer/ya.make | 44
-rw-r--r--  ydb/library/actors/core/mon_stats.h | 8
-rw-r--r--  ydb/library/actors/core/probes.h | 4
-rw-r--r--  ydb/library/actors/core/ya.make | 7
-rw-r--r--  ydb/library/actors/helpers/pool_stats_collector.h | 40
32 files changed, 1003 insertions, 2234 deletions
diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp
index 047388ce90..1c286d9bd7 100644
--- a/ydb/library/actors/core/cpu_manager.cpp
+++ b/ydb/library/actors/core/cpu_manager.cpp
@@ -37,8 +37,8 @@ namespace NActors {
poolsWithSharedThreads.push_back(cfg.PoolId);
}
}
- Shared.reset(CreateSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
- auto sharedPool = static_cast<ISharedExecutorPool*>(Shared.get());
+ Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
+ auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
ui64 ts = GetCycleCountFast();
Harmonizer.reset(MakeHarmonizer(ts));
@@ -135,9 +135,11 @@ namespace NActors {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
if (cfg.HasSharedThread) {
- auto *sharedPool = Shared.get();
+ auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
- pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
+ if (pool) {
+ pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
+ }
return pool;
} else {
return new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h
index 0bd0994f4b..1fee946798 100644
--- a/ydb/library/actors/core/cpu_manager.h
+++ b/ydb/library/actors/core/cpu_manager.h
@@ -2,10 +2,10 @@
#include "config.h"
#include "executor_pool_jail.h"
+#include "harmonizer.h"
#include "executor_pool.h"
#include "executor_pool_shared.h"
#include "mon_stats.h"
-#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <memory>
namespace NActors {
@@ -16,7 +16,7 @@ namespace NActors {
const ui32 ExecutorPoolCount;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
std::unique_ptr<IHarmonizer> Harmonizer;
- std::unique_ptr<ISharedExecutorPool> Shared;
+ std::unique_ptr<TSharedExecutorPool> Shared;
std::unique_ptr<TExecutorPoolJail> Jail;
TCpuManagerConfig Config;
diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h
index 8fd7911339..67297b428b 100644
--- a/ydb/library/actors/core/executor_pool.h
+++ b/ydb/library/actors/core/executor_pool.h
@@ -14,16 +14,15 @@ namespace NActors {
struct TExecutorThreadStats;
class TExecutorPoolJail;
class ISchedulerCookie;
- struct TSharedExecutorThreadCtx;
struct TCpuConsumption {
- double CpuUs = 0;
- double ElapsedUs = 0;
+ double ConsumedUs = 0;
+ double BookedUs = 0;
ui64 NotEnoughCpuExecutions = 0;
void Add(const TCpuConsumption& other) {
- CpuUs += other.CpuUs;
- ElapsedUs += other.ElapsedUs;
+ ConsumedUs += other.ConsumedUs;
+ BookedUs += other.BookedUs;
NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
}
};
@@ -177,16 +176,6 @@ namespace NActors {
return 1;
}
- virtual TSharedExecutorThreadCtx* ReleaseSharedThread() {
- return nullptr;
- }
- virtual void AddSharedThread(TSharedExecutorThreadCtx*) {
- }
-
- virtual bool IsSharedThreadEnabled() const {
- return false;
- }
-
virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
Y_UNUSED(threadIdx);
return TCpuConsumption{0.0, 0.0};
diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp
index e8e504abe2..fe41e94623 100644
--- a/ydb/library/actors/core/executor_pool_base.cpp
+++ b/ydb/library/actors/core/executor_pool_base.cpp
@@ -1,5 +1,4 @@
#include "actorsystem.h"
-#include "activity_guard.h"
#include "actor.h"
#include "executor_pool_base.h"
#include "executor_pool_basic_feature_flags.h"
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index dbe3e89877..bfb72f02b3 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -407,10 +407,10 @@ namespace NActors {
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange;
poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
- poolStats.MaxCpuUs = stats.MaxCpuUs;
- poolStats.MinCpuUs = stats.MinCpuUs;
- poolStats.MaxElapsedUs = stats.MaxElapsedUs;
- poolStats.MinElapsedUs = stats.MinElapsedUs;
+ poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
+ poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
+ poolStats.MaxBookedCpuUs = stats.MaxBookedCpu;
+ poolStats.MinBookedCpuUs = stats.MinBookedCpu;
}
statsCopy.resize(MaxFullThreadCount + 1);
@@ -429,7 +429,7 @@ namespace NActors {
void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
- poolState.UsedCpu = stats.AvgElapsedUs;
+ poolState.UsedCpu = stats.AvgConsumedCpu;
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
} else {
poolState.PossibleMaxLimit = poolState.MaxLimit;
@@ -625,7 +625,7 @@ namespace NActors {
TExecutorThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStatsForHarmonizer(stats);
- return {static_cast<double>(stats.CpuUs), Ts2Us(stats.SafeElapsedTicks), stats.NotEnoughCpuExecutions};
+ return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
}
i16 TBasicExecutorPool::GetBlockingThreadCount() const {
diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h
index 7690999751..c60e2ab833 100644
--- a/ydb/library/actors/core/executor_pool_basic.h
+++ b/ydb/library/actors/core/executor_pool_basic.h
@@ -7,8 +7,8 @@
#include "executor_pool_basic_feature_flags.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
+#include "harmonizer.h"
#include <memory>
-#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <ydb/library/actors/actor_type/indexes.h>
#include <ydb/library/actors/util/unordered_cache.h>
#include <ydb/library/actors/util/threadparkpad.h>
@@ -278,11 +278,8 @@ namespace NActors {
void CalcSpinPerThread(ui64 wakingUpConsumption);
void ClearWaitingStats() const;
- TSharedExecutorThreadCtx* ReleaseSharedThread() override;
- void AddSharedThread(TSharedExecutorThreadCtx* thread) override;
- bool IsSharedThreadEnabled() const override {
- return true;
- }
+ TSharedExecutorThreadCtx* ReleaseSharedThread();
+ void AddSharedThread(TSharedExecutorThreadCtx* thread);
private:
void AskToGoToSleep(bool *needToWait, bool *needToBlock);
diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp
index 089f7057f9..310a6f83b3 100644
--- a/ydb/library/actors/core/executor_pool_io.cpp
+++ b/ydb/library/actors/core/executor_pool_io.cpp
@@ -156,7 +156,7 @@ namespace NActors {
void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
- poolState.UsedCpu = stats.AvgElapsedUs;
+ poolState.UsedCpu = stats.AvgConsumedCpu;
}
poolState.CurrentLimit = PoolThreads;
poolState.MaxLimit = PoolThreads;
diff --git a/ydb/library/actors/core/executor_pool_io.h b/ydb/library/actors/core/executor_pool_io.h
index 16b17127d4..316b21d4d2 100644
--- a/ydb/library/actors/core/executor_pool_io.h
+++ b/ydb/library/actors/core/executor_pool_io.h
@@ -3,9 +3,9 @@
#include "actorsystem.h"
#include "executor_thread.h"
#include "executor_thread_ctx.h"
+#include "harmonizer.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
-#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <ydb/library/actors/actor_type/indexes.h>
#include <ydb/library/actors/util/ticket_lock.h>
#include <ydb/library/actors/util/unordered_cache.h>
diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp
index 7d1327f845..104e02812c 100644
--- a/ydb/library/actors/core/executor_pool_shared.cpp
+++ b/ydb/library/actors/core/executor_pool_shared.cpp
@@ -12,50 +12,6 @@
namespace NActors {
-class TSharedExecutorPool: public ISharedExecutorPool {
-public:
- TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
-
- // IThreadPool
- void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
- void Start() override;
- void PrepareStop() override;
- void Shutdown() override;
- bool Cleanup() override;
-
- TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) override;
- void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
- void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
- TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override;
- std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override;
-
- i16 ReturnOwnHalfThread(i16 pool) override;
- i16 ReturnBorrowedHalfThread(i16 pool) override;
- void GiveHalfThread(i16 from, i16 to) override;
-
- i16 GetSharedThreadCount() const override;
-
- TSharedPoolState GetState() const override;
-
- void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) override;
-
-private:
- TSharedPoolState State;
-
- std::vector<IExecutorPool*> Pools;
-
- i16 PoolCount;
- i16 SharedThreadCount;
- std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
-
- std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
- std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
-
- TDuration TimePerMailbox;
- ui32 EventsPerMailbox;
- ui64 SoftProcessingDurationTs;
-}; // class TSharedExecutorPool
-
TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads)
: State(poolCount, poolsWithThreads.size())
, Pools(poolCount)
@@ -73,47 +29,40 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
}
}
-void TSharedExecutorPool::Init(const std::vector<IExecutorPool*>& pools, bool withThreads) {
- std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
- for (IExecutorPool* pool : pools) {
- Pools[pool->PoolId] = pool;
- i16 threadIdx = State.ThreadByPool[pool->PoolId];
- if (threadIdx >= 0) {
- poolByThread[threadIdx] = pool;
+void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
+ // ActorSystem = actorSystem;
+
+ ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
+ ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
+
+ std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
+ std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
+ for (IExecutorPool* pool : poolsBasic) {
+ Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool);
+ i16 threadIdx = State.ThreadByPool[pool->PoolId];
+ if (threadIdx >= 0) {
+ poolByThread[threadIdx] = pool;
+ }
}
- }
- for (i16 i = 0; i != SharedThreadCount; ++i) {
- // !TODO
- Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
- if (withThreads) {
+ for (i16 i = 0; i != SharedThreadCount; ++i) {
+ // !TODO
+ Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
Threads[i].Thread.reset(
new TSharedExecutorThread(
-1,
- nullptr,
- &Threads[i],
- PoolCount,
- "SharedThread",
- SoftProcessingDurationTs,
- TimePerMailbox,
+ actorSystem,
+ &Threads[i],
+ PoolCount,
+ "SharedThread",
+ SoftProcessingDurationTs,
+ TimePerMailbox,
EventsPerMailbox));
+ ScheduleWriters[i].Init(ScheduleReaders[i]);
}
- }
-}
-
-void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
- ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
- ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
-
- std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
- Init(poolsBasic, true);
- for (i16 i = 0; i != SharedThreadCount; ++i) {
- ScheduleWriters[i].Init(ScheduleReaders[i]);
- }
-
- *scheduleReaders = ScheduleReaders.get();
- *scheduleSz = SharedThreadCount;
+ *scheduleReaders = ScheduleReaders.get();
+ *scheduleSz = SharedThreadCount;
}
void TSharedExecutorPool::Start() {
@@ -150,27 +99,24 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
return &Threads[threadIdx];
}
-i16 TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
+void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
i16 threadIdx = State.ThreadByPool[pool];
- IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
+ TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
Y_ABORT_UNLESS(borrowingPool);
- i16 borrowedPool = State.PoolByBorrowedThread[threadIdx];
- State.BorrowedThreadByPool[borrowedPool] = -1;
+ State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
State.PoolByBorrowedThread[threadIdx] = -1;
// TODO(kruall): Check on race
borrowingPool->ReleaseSharedThread();
- return borrowedPool;
}
-i16 TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
+void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
i16 threadIdx = State.BorrowedThreadByPool[pool];
- IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
+ TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
Y_ABORT_UNLESS(borrowingPool);
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
State.PoolByBorrowedThread[threadIdx] = -1;
// TODO(kruall): Check on race
borrowingPool->ReleaseSharedThread();
- return State.PoolByThread[threadIdx];
}
void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
@@ -181,14 +127,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
if (borrowedThreadIdx != -1) {
i16 originalPool = State.PoolByThread[borrowedThreadIdx];
if (originalPool == to) {
- ReturnOwnHalfThread(to);
+ return ReturnOwnHalfThread(to);
} else {
ReturnOwnHalfThread(originalPool);
}
from = originalPool;
}
i16 threadIdx = State.ThreadByPool[from];
- IExecutorPool* borrowingPool = Pools[to];
+ TBasicExecutorPool* borrowingPool = Pools[to];
Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
State.BorrowedThreadByPool[to] = threadIdx;
State.PoolByBorrowedThread[threadIdx] = to;
@@ -197,16 +143,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
}
void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
- statsCopy.resize(SharedThreadCount);
+ statsCopy.resize(SharedThreadCount + 1);
for (i16 i = 0; i < SharedThreadCount; ++i) {
- Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]);
+ Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]);
}
}
void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
- statsCopy.resize(SharedThreadCount);
+ statsCopy.resize(SharedThreadCount + 1);
for (i16 i = 0; i < SharedThreadCount; ++i) {
- Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]);
+ Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]);
}
}
@@ -235,34 +181,4 @@ TSharedPoolState TSharedExecutorPool::GetState() const {
return State;
}
-ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) {
- return new TSharedExecutorPool(config, poolCount, poolsWithThreads);
-}
-
-TString TSharedPoolState::ToString() const {
- TStringBuilder builder;
- builder << '{';
- builder << "ThreadByPool: [";
- for (ui32 i = 0; i < ThreadByPool.size(); ++i) {
- builder << ThreadByPool[i] << (i == ThreadByPool.size() - 1 ? "" : ", ");
- }
- builder << "], ";
- builder << "PoolByThread: [";
- for (ui32 i = 0; i < PoolByThread.size(); ++i) {
- builder << PoolByThread[i] << (i == PoolByThread.size() - 1 ? "" : ", ");
- }
- builder << "], ";
- builder << "BorrowedThreadByPool: [";
- for (ui32 i = 0; i < BorrowedThreadByPool.size(); ++i) {
- builder << BorrowedThreadByPool[i] << (i == BorrowedThreadByPool.size() - 1 ? "" : ", ");
- }
- builder << "], ";
- builder << "PoolByBorrowedThread: [";
- for (ui32 i = 0; i < PoolByBorrowedThread.size(); ++i) {
- builder << PoolByBorrowedThread[i] << (i == PoolByBorrowedThread.size() - 1 ? "" : ", ");
- }
- builder << ']';
- return builder << '}';
-}
-
}
diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h
index f6c79c07a5..c083c21654 100644
--- a/ydb/library/actors/core/executor_pool_shared.h
+++ b/ydb/library/actors/core/executor_pool_shared.h
@@ -1,12 +1,14 @@
#pragma once
#include "executor_pool.h"
+#include "executor_thread_ctx.h"
namespace NActors {
+ struct TExecutorThreadCtx;
struct TSharedExecutorPoolConfig;
- struct TSharedExecutorThreadCtx;
+ class TBasicExecutorPool;
struct TSharedPoolState {
std::vector<i16> ThreadByPool;
@@ -20,30 +22,48 @@ namespace NActors {
, BorrowedThreadByPool(poolCount, -1)
, PoolByBorrowedThread(threadCount, -1)
{}
-
- TString ToString() const;
};
- class ISharedExecutorPool : public IActorThreadPool {
+ class TSharedExecutorPool: public IActorThreadPool {
public:
- virtual ~ISharedExecutorPool() = default;
+ TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
- virtual TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) = 0;
- virtual void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) = 0;
- virtual void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) = 0;
- virtual TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) = 0;
- virtual std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) = 0;
- virtual void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) = 0;
+ // IThreadPool
+ void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
+ void Start() override;
+ void PrepareStop() override;
+ void Shutdown() override;
+ bool Cleanup() override;
- virtual i16 ReturnOwnHalfThread(i16 pool) = 0;
- virtual i16 ReturnBorrowedHalfThread(i16 pool) = 0;
- virtual void GiveHalfThread(i16 from, i16 to) = 0;
+ TSharedExecutorThreadCtx *GetSharedThread(i16 poolId);
+ void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy);
+ void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy);
+ TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx);
+ std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId);
- virtual i16 GetSharedThreadCount() const = 0;
+ void ReturnOwnHalfThread(i16 pool);
+ void ReturnBorrowedHalfThread(i16 pool);
+ void GiveHalfThread(i16 from, i16 to);
- virtual TSharedPoolState GetState() const = 0;
- };
+ i16 GetSharedThreadCount() const;
+
+ TSharedPoolState GetState() const;
+
+ private:
+ TSharedPoolState State;
+
+ std::vector<TBasicExecutorPool*> Pools;
- ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
+ i16 PoolCount;
+ i16 SharedThreadCount;
+ std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
+
+ std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
+ std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
+
+ TDuration TimePerMailbox;
+ ui32 EventsPerMailbox;
+ ui64 SoftProcessingDurationTs;
+ };
}
\ No newline at end of file
diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h
index c7d1caa5ed..6a5bfaa34b 100644
--- a/ydb/library/actors/core/executor_thread_ctx.h
+++ b/ydb/library/actors/core/executor_thread_ctx.h
@@ -11,7 +11,8 @@
namespace NActors {
class TGenericExecutorThread;
- class IExecutorPool;
+ class TBasicExecutorPool;
+ class TIOExecutorPool;
enum class EThreadState : ui64 {
None,
@@ -122,7 +123,7 @@ namespace NActors {
struct TExecutorThreadCtx : public TGenericExecutorThreadCtx {
using TBase = TGenericExecutorThreadCtx;
- IExecutorPool *OwnerExecutorPool = nullptr;
+ TBasicExecutorPool *OwnerExecutorPool = nullptr;
void SetWork() {
ExchangeState(EThreadState::Work);
@@ -185,7 +186,7 @@ namespace NActors {
}
};
- std::atomic<IExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
+ std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads];
std::atomic<i64> RequestsForWakeUp = 0;
ui32 NextPool = 0;
diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp
new file mode 100644
index 0000000000..df65164216
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer.cpp
@@ -0,0 +1,861 @@
+#include "harmonizer.h"
+
+#include "executor_thread_ctx.h"
+#include "executor_thread.h"
+#include "probes.h"
+
+#include "activity_guard.h"
+#include "actorsystem.h"
+#include "executor_pool_basic.h"
+#include "executor_pool_basic_feature_flags.h"
+#include "executor_pool_shared.h"
+
+#include <atomic>
+#include <ydb/library/actors/util/cpu_load_log.h>
+#include <ydb/library/actors/util/datetime.h>
+#include <ydb/library/actors/util/intrinsics.h>
+
+#include <util/system/spinlock.h>
+
+#include <algorithm>
+
+namespace NActors {
+
+LWTRACE_USING(ACTORLIB_PROVIDER);
+
+constexpr bool CheckBinaryPower(ui64 value) {
+ return !(value & (value - 1));
+}
+
+template <ui8 HistoryBufferSize = 8>
+struct TValueHistory {
+ static_assert(CheckBinaryPower(HistoryBufferSize));
+
+ double History[HistoryBufferSize] = {0.0};
+ ui64 HistoryIdx = 0;
+ ui64 LastTs = Max<ui64>();
+ double LastUs = 0.0;
+ double AccumulatedUs = 0.0;
+ ui64 AccumulatedTs = 0;
+
+ template <bool WithTail=false>
+ double Accumulate(auto op, auto comb, ui8 seconds) {
+ double acc = AccumulatedUs;
+ size_t idx = HistoryIdx;
+ ui8 leftSeconds = seconds;
+ if constexpr (!WithTail) {
+ idx--;
+ leftSeconds--;
+ if (idx >= HistoryBufferSize) {
+ idx = HistoryBufferSize - 1;
+ }
+ acc = History[idx];
+ }
+ do {
+ idx--;
+ leftSeconds--;
+ if (idx >= HistoryBufferSize) {
+ idx = HistoryBufferSize - 1;
+ }
+ if constexpr (WithTail) {
+ acc = op(acc, History[idx]);
+ } else if (leftSeconds) {
+ acc = op(acc, History[idx]);
+ } else {
+ ui64 tsInSecond = Us2Ts(1'000'000.0);
+ acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond);
+ }
+ } while (leftSeconds);
+ double duration = 1'000'000.0 * seconds;
+ if constexpr (WithTail) {
+ duration += Ts2Us(AccumulatedTs);
+ }
+ return comb(acc, duration);
+ }
+
+ template <bool WithTail=false>
+ double GetAvgPartForLastSeconds(ui8 seconds) {
+ auto sum = [](double acc, double value) {
+ return acc + value;
+ };
+ auto avg = [](double sum, double duration) {
+ return sum / duration;
+ };
+ return Accumulate<WithTail>(sum, avg, seconds);
+ }
+
+ double GetAvgPart() {
+ return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
+ }
+
+ double GetMaxForLastSeconds(ui8 seconds) {
+ auto max = [](const double& acc, const double& value) {
+ return Max(acc, value);
+ };
+ auto fst = [](const double& value, const double&) { return value; };
+ return Accumulate<false>(max, fst, seconds);
+ }
+
+ double GetMax() {
+ return GetMaxForLastSeconds(HistoryBufferSize);
+ }
+
+ i64 GetMaxInt() {
+ return static_cast<i64>(GetMax());
+ }
+
+ double GetMinForLastSeconds(ui8 seconds) {
+ auto min = [](const double& acc, const double& value) {
+ return Min(acc, value);
+ };
+ auto fst = [](const double& value, const double&) { return value; };
+ return Accumulate<false>(min, fst, seconds);
+ }
+
+ double GetMin() {
+ return GetMinForLastSeconds(HistoryBufferSize);
+ }
+
+ i64 GetMinInt() {
+ return static_cast<i64>(GetMin());
+ }
+
+ void Register(ui64 ts, double valueUs) {
+ if (ts < LastTs) {
+ LastTs = ts;
+ LastUs = valueUs;
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ return;
+ }
+ ui64 lastTs = std::exchange(LastTs, ts);
+ ui64 dTs = ts - lastTs;
+ double lastUs = std::exchange(LastUs, valueUs);
+ double dUs = valueUs - lastUs;
+ LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs);
+
+ if (dTs > Us2Ts(8'000'000.0)) {
+ dUs = dUs * 1'000'000.0 / Ts2Us(dTs);
+ for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
+ History[idx] = dUs;
+ }
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ return;
+ }
+
+ while (dTs > 0) {
+ if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) {
+ AccumulatedTs += dTs;
+ AccumulatedUs += dUs;
+ break;
+ } else {
+ ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs;
+ double addUs = dUs * addTs / dTs;
+ dTs -= addTs;
+ dUs -= addUs;
+ History[HistoryIdx] = AccumulatedUs + addUs;
+ HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize;
+ AccumulatedUs = 0.0;
+ AccumulatedTs = 0;
+ }
+ }
+ }
+};
+
+struct TThreadInfo {
+ TValueHistory<8> Consumed;
+ TValueHistory<8> Booked;
+};
+
+struct TPoolInfo {
+ std::vector<TThreadInfo> ThreadInfo;
+ std::vector<TThreadInfo> SharedInfo;
+ TSharedExecutorPool* Shared = nullptr;
+ IExecutorPool* Pool = nullptr;
+ TBasicExecutorPool* BasicPool = nullptr;
+
+ i16 DefaultFullThreadCount = 0;
+ i16 MinFullThreadCount = 0;
+ i16 MaxFullThreadCount = 0;
+
+ float DefaultThreadCount = 0;
+ float MinThreadCount = 0;
+ float MaxThreadCount = 0;
+
+ i16 Priority = 0;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
+ ui32 MaxAvgPingUs = 0;
+ ui64 LastUpdateTs = 0;
+ ui64 NotEnoughCpuExecutions = 0;
+ ui64 NewNotEnoughCpuExecutions = 0;
+ ui16 LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
+
+ TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
+ TAtomic IncreasingThreadsByNeedyState = 0;
+ TAtomic IncreasingThreadsByExchange = 0;
+ TAtomic DecreasingThreadsByStarvedState = 0;
+ TAtomic DecreasingThreadsByHoggishState = 0;
+ TAtomic DecreasingThreadsByExchange = 0;
+ TAtomic PotentialMaxThreadCount = 0;
+
+ TValueHistory<16> Consumed;
+ TValueHistory<16> Booked;
+
+ std::atomic<float> MaxConsumedCpu = 0;
+ std::atomic<float> MinConsumedCpu = 0;
+ std::atomic<float> AvgConsumedCpu = 0;
+ std::atomic<float> MaxBookedCpu = 0;
+ std::atomic<float> MinBookedCpu = 0;
+
+ std::unique_ptr<TWaitingStats<ui64>> WaitingStats;
+ std::unique_ptr<TWaitingStats<double>> MovingWaitingStats;
+
+ double GetBooked(i16 threadIdx);
+ double GetSharedBooked(i16 threadIdx);
+ double GetLastSecondBooked(i16 threadIdx);
+ double GetLastSecondSharedBooked(i16 threadIdx);
+ double GetConsumed(i16 threadIdx);
+ double GetSharedConsumed(i16 threadIdx);
+ double GetLastSecondConsumed(i16 threadIdx);
+ double GetLastSecondSharedConsumed(i16 threadIdx);
+ TCpuConsumption PullStats(ui64 ts);
+ i16 GetFullThreadCount();
+ float GetThreadCount();
+ void SetFullThreadCount(i16 threadCount);
+ bool IsAvgPingGood();
+};
+
+double TPoolInfo::GetBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Booked.GetAvgPart();
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetSharedBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < SharedInfo.size()) {
+ return SharedInfo[threadIdx].Booked.GetAvgPart();
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetLastSecondBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetLastSecondSharedBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < SharedInfo.size()) {
+ return SharedInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Consumed.GetAvgPart();
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetSharedConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < SharedInfo.size()) {
+ return SharedInfo[threadIdx].Consumed.GetAvgPart();
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetLastSecondConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
+}
+
+double TPoolInfo::GetLastSecondSharedConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < SharedInfo.size()) {
+ return SharedInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
+}
+
+#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
+TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
+ TCpuConsumption acc;
+ for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) {
+ TThreadInfo &threadInfo = ThreadInfo[threadIdx];
+ TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
+ acc.Add(cpuConsumption);
+ threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs);
+ LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
+ threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
+ LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
+ }
+ TVector<TExecutorThreadStats> sharedStats;
+ if (Shared) {
+ Shared->GetSharedStatsForHarmonizer(Pool->PoolId, sharedStats);
+ }
+
+ for (ui32 sharedIdx = 0; sharedIdx < SharedInfo.size(); ++sharedIdx) {
+ auto stat = sharedStats[sharedIdx];
+ TCpuConsumption sharedConsumption{
+ Ts2Us(stat.SafeElapsedTicks),
+ static_cast<double>(stat.CpuUs),
+ stat.NotEnoughCpuExecutions
+ };
+ acc.Add(sharedConsumption);
+ SharedInfo[sharedIdx].Consumed.Register(ts, sharedConsumption.ConsumedUs);
+ LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_consumed", UNROLL_HISTORY(SharedInfo[sharedIdx].Consumed.History));
+ SharedInfo[sharedIdx].Booked.Register(ts, sharedConsumption.BookedUs);
+ LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_booked", UNROLL_HISTORY(SharedInfo[sharedIdx].Booked.History));
+ }
+
+ Consumed.Register(ts, acc.ConsumedUs);
+ MaxConsumedCpu.store(Consumed.GetMax() / 1'000'000, std::memory_order_relaxed);
+ MinConsumedCpu.store(Consumed.GetMin() / 1'000'000, std::memory_order_relaxed);
+ AvgConsumedCpu.store(Consumed.GetAvgPart() / 1'000'000, std::memory_order_relaxed);
+ Booked.Register(ts, acc.BookedUs);
+ MaxBookedCpu.store(Booked.GetMax() / 1'000'000, std::memory_order_relaxed);
+ MinBookedCpu.store(Booked.GetMin() / 1'000'000, std::memory_order_relaxed);
+ NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
+ NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
+ if (WaitingStats && BasicPool) {
+ WaitingStats->Clear();
+ BasicPool->GetWaitingStats(*WaitingStats);
+ if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) {
+ MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2);
+ }
+ }
+ return acc;
+}
+#undef UNROLL_HISTORY
+
+float TPoolInfo::GetThreadCount() {
+ return Pool->GetThreadCount();
+}
+
+i16 TPoolInfo::GetFullThreadCount() {
+ return Pool->GetFullThreadCount();
+}
+
+void TPoolInfo::SetFullThreadCount(i16 threadCount) {
+ Pool->SetFullThreadCount(threadCount);
+}
+
+bool TPoolInfo::IsAvgPingGood() {
+ bool res = true;
+ if (AvgPingCounter) {
+ res &= *AvgPingCounter > MaxAvgPingUs;
+ }
+ if (AvgPingCounterWithSmallWindow) {
+ res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs;
+ }
+ return res;
+}
+
+class THarmonizer: public IHarmonizer {
+private:
+ std::atomic<bool> IsDisabled = false;
+ TSpinLock Lock;
+ std::atomic<ui64> NextHarmonizeTs = 0;
+ std::vector<std::unique_ptr<TPoolInfo>> Pools;
+ std::vector<ui16> PriorityOrder;
+
+ TValueHistory<16> Consumed;
+ TValueHistory<16> Booked;
+
+ TAtomic MaxConsumedCpu = 0;
+ TAtomic MinConsumedCpu = 0;
+ TAtomic MaxBookedCpu = 0;
+ TAtomic MinBookedCpu = 0;
+
+ TSharedExecutorPool* Shared = nullptr;
+
+ std::atomic<double> AvgAwakeningTimeUs = 0;
+ std::atomic<double> AvgWakingUpTimeUs = 0;
+
+ void PullStats(ui64 ts);
+ void HarmonizeImpl(ui64 ts);
+ void CalculatePriorityOrder();
+public:
+ THarmonizer(ui64 ts);
+ virtual ~THarmonizer();
+ double Rescale(double value) const;
+ void Harmonize(ui64 ts) override;
+ void DeclareEmergency(ui64 ts) override;
+ void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
+ void Enable(bool enable) override;
+ TPoolHarmonizerStats GetPoolStats(i16 poolId) const override;
+ THarmonizerStats GetStats() const override;
+ void SetSharedPool(TSharedExecutorPool* pool) override;
+};
+
+THarmonizer::THarmonizer(ui64 ts) {
+ NextHarmonizeTs = ts;
+}
+
+THarmonizer::~THarmonizer() {
+}
+
+double THarmonizer::Rescale(double value) const {
+ return Max(0.0, Min(1.0, value * (1.0/0.9)));
+}
+
+void THarmonizer::PullStats(ui64 ts) {
+ TCpuConsumption acc;
+ for (auto &pool : Pools) {
+ TCpuConsumption consumption = pool->PullStats(ts);
+ acc.Add(consumption);
+ }
+ Consumed.Register(ts, acc.ConsumedUs);
+ RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt());
+ RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt());
+ Booked.Register(ts, acc.BookedUs);
+ RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt());
+ RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
+}
+
+Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
+ return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
+}
+
+Y_FORCE_INLINE bool IsHoggish(double booked, double currentThreadCount) {
+ return booked < currentThreadCount - 0.5;
+}
+
+void THarmonizer::HarmonizeImpl(ui64 ts) {
+ bool isStarvedPresent = false;
+ double booked = 0.0;
+ double consumed = 0.0;
+ double lastSecondBooked = 0.0;
+ i64 beingStopped = 0;
+ double total = 0;
+ TStackVec<size_t, 8> needyPools;
+ TStackVec<std::pair<size_t, double>, 8> hoggishPools;
+ TStackVec<bool, 8> isNeedyByPool;
+
+ size_t sumOfAdditionalThreads = 0;
+
+ ui64 TotalWakingUpTime = 0;
+ ui64 TotalWakingUps = 0;
+ ui64 TotalAwakeningTime = 0;
+ ui64 TotalAwakenings = 0;
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = *Pools[poolIdx];
+ if (pool.WaitingStats) {
+ TotalWakingUpTime += pool.WaitingStats->WakingUpTotalTime;
+ TotalWakingUps += pool.WaitingStats->WakingUpCount;
+ TotalAwakeningTime += pool.WaitingStats->AwakingTotalTime;
+ TotalAwakenings += pool.WaitingStats->AwakingCount;
+ }
+ }
+
+ constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime;
+ constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime;
+
+ ui64 realAvgWakingUpTime = (TotalWakingUps ? TotalWakingUpTime / TotalWakingUps : knownAvgWakingUpTime);
+ ui64 avgWakingUpTime = realAvgWakingUpTime;
+ if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) {
+ avgWakingUpTime = knownAvgWakingUpTime;
+ }
+ AvgWakingUpTimeUs = Ts2Us(avgWakingUpTime);
+
+ ui64 realAvgAwakeningTime = (TotalAwakenings ? TotalAwakeningTime / TotalAwakenings : knownAvgAwakeningUpTime);
+ ui64 avgAwakeningTime = realAvgAwakeningTime;
+ if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) {
+ avgAwakeningTime = knownAvgAwakeningUpTime;
+ }
+ AvgAwakeningTimeUs = Ts2Us(avgAwakeningTime);
+
+ ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime;
+ LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(avgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption));
+
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = *Pools[poolIdx];
+ if (!pool.BasicPool) {
+ continue;
+ }
+ if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) {
+ if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
+ pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption);
+ } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
+ ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
+ pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
+ } else {
+ ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
+ pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
+ }
+ pool.BasicPool->ClearWaitingStats();
+ }
+ }
+
+ std::vector<bool> hasSharedThread(Pools.size());
+ std::vector<bool> hasSharedThreadWhichWasNotBorrowed(Pools.size());
+ std::vector<bool> hasBorrowedSharedThread(Pools.size());
+ std::vector<i16> freeHalfThread;
+ if (Shared) {
+ auto sharedState = Shared->GetState();
+ for (ui32 poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ i16 threadIdx = sharedState.ThreadByPool[poolIdx];
+ if (threadIdx != -1) {
+ hasSharedThread[poolIdx] = true;
+ if (sharedState.PoolByBorrowedThread[threadIdx] == -1) {
+ hasSharedThreadWhichWasNotBorrowed[poolIdx] = true;
+ }
+
+ }
+ if (sharedState.BorrowedThreadByPool[poolIdx] != -1) {
+ hasBorrowedSharedThread[poolIdx] = true;
+ }
+ }
+ }
+
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = *Pools[poolIdx];
+ total += pool.DefaultThreadCount;
+
+ i16 currentFullThreadCount = pool.GetFullThreadCount();
+ sumOfAdditionalThreads += Max(0, currentFullThreadCount - pool.DefaultFullThreadCount);
+ float currentThreadCount = pool.GetThreadCount();
+
+ double poolBooked = 0.0;
+ double poolConsumed = 0.0;
+ double lastSecondPoolBooked = 0.0;
+ double lastSecondPoolConsumed = 0.0;
+ beingStopped += pool.Pool->GetBlockingThreadCount();
+
+ for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
+ double threadBooked = Rescale(pool.GetBooked(threadIdx));
+ double threadLastSecondBooked = Rescale(pool.GetLastSecondBooked(threadIdx));
+ double threadConsumed = Rescale(pool.GetConsumed(threadIdx));
+ double threadLastSecondConsumed = Rescale(pool.GetLastSecondConsumed(threadIdx));
+ poolBooked += threadBooked;
+ lastSecondPoolBooked += threadLastSecondBooked;
+ poolConsumed += threadConsumed;
+ lastSecondPoolConsumed += threadLastSecondConsumed;
+ LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), threadIdx, threadBooked, threadConsumed, threadLastSecondBooked, threadLastSecondConsumed);
+ }
+
+ for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) {
+ double sharedBooked = Rescale(pool.GetSharedBooked(sharedIdx));
+ double sharedLastSecondBooked = Rescale(pool.GetLastSecondSharedBooked(sharedIdx));
+ double sharedConsumed = Rescale(pool.GetSharedConsumed(sharedIdx));
+ double sharedLastSecondConsumed = Rescale(pool.GetLastSecondSharedConsumed(sharedIdx));
+ poolBooked += sharedBooked;
+ lastSecondPoolBooked += sharedLastSecondBooked;
+ poolConsumed += sharedConsumed;
+ lastSecondPoolConsumed += sharedLastSecondConsumed;
+ LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), -1 - sharedIdx, sharedBooked, sharedConsumed, sharedLastSecondBooked, sharedLastSecondConsumed);
+ }
+
+ bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked);
+ if (isStarved) {
+ isStarvedPresent = true;
+ }
+
+ bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (poolBooked >= currentThreadCount);
+ if (pool.AvgPingCounter) {
+ if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
+ isNeedy = false;
+ } else {
+ pool.LastUpdateTs = ts;
+ }
+ }
+ if (currentThreadCount - poolBooked > 0.5) {
+ if (hasBorrowedSharedThread[poolIdx] || hasSharedThreadWhichWasNotBorrowed[poolIdx]) {
+ freeHalfThread.push_back(poolIdx);
+ }
+ }
+ isNeedyByPool.push_back(isNeedy);
+ if (isNeedy) {
+ needyPools.push_back(poolIdx);
+ }
+ bool isHoggish = IsHoggish(poolBooked, currentThreadCount)
+ || IsHoggish(lastSecondPoolBooked, currentThreadCount);
+ if (isHoggish) {
+ hoggishPools.push_back({poolIdx, std::max(poolBooked - currentThreadCount, lastSecondPoolBooked - currentThreadCount)});
+ }
+ booked += poolBooked;
+ consumed += poolConsumed;
+ AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2));
+ LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, currentThreadCount, pool.MaxFullThreadCount, isStarved, isNeedy, isHoggish);
+ }
+
+ double budget = total - Max(booked, lastSecondBooked);
+ i16 budgetInt = static_cast<i16>(Max(budget, 0.0));
+ if (budget < -0.1) {
+ isStarvedPresent = true;
+ }
+ double overbooked = consumed - booked;
+ if (overbooked < 0) {
+ isStarvedPresent = false;
+ }
+
+ if (needyPools.size()) {
+ Sort(needyPools.begin(), needyPools.end(), [&] (i16 lhs, i16 rhs) {
+ if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
+ return Pools[lhs]->Priority > Pools[rhs]->Priority;
+ }
+ return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId;
+ });
+ }
+
+ if (freeHalfThread.size()) {
+ Sort(freeHalfThread.begin(), freeHalfThread.end(), [&] (i16 lhs, i16 rhs) {
+ if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
+ return Pools[lhs]->Priority > Pools[rhs]->Priority;
+ }
+ return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId;
+ });
+ }
+
+ if (isStarvedPresent) {
+        // last_starved_at_consumed_value = sum of consumed over all pools;
+        // TODO(cthulhu): use this as a limit, smoothly drive the limit toward total,
+        //                and use it instead of total
+ if (beingStopped && beingStopped >= overbooked) {
+ // do nothing
+ } else {
+ for (ui16 poolIdx : PriorityOrder) {
+ TPoolInfo &pool = *Pools[poolIdx];
+ i64 threadCount = pool.GetFullThreadCount();
+ if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) {
+ Shared->ReturnOwnHalfThread(poolIdx);
+ }
+ while (threadCount > pool.DefaultFullThreadCount) {
+ pool.SetFullThreadCount(--threadCount);
+ AtomicIncrement(pool.DecreasingThreadsByStarvedState);
+ overbooked--;
+ sumOfAdditionalThreads--;
+
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ }
+ } else {
+ for (size_t needyPoolIdx : needyPools) {
+ TPoolInfo &pool = *Pools[needyPoolIdx];
+ i64 threadCount = pool.GetFullThreadCount();
+ if (budget >= 1.0) {
+ if (threadCount + 1 <= pool.MaxFullThreadCount) {
+ AtomicIncrement(pool.IncreasingThreadsByNeedyState);
+ isNeedyByPool[needyPoolIdx] = false;
+ sumOfAdditionalThreads++;
+ pool.SetFullThreadCount(threadCount + 1);
+ budget -= 1.0;
+ LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+ }
+ } else if (Shared && budget >= 0.5 && !hasBorrowedSharedThread[needyPoolIdx] && freeHalfThread.size()) {
+ Shared->GiveHalfThread(freeHalfThread.back(), needyPoolIdx);
+ freeHalfThread.pop_back();
+ isNeedyByPool[needyPoolIdx] = false;
+ budget -= 0.5;
+ }
+ if constexpr (NFeatures::IsLocalQueues()) {
+ bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxFullThreadCount;
+ needToExpandLocalQueue &= (bool)pool.BasicPool;
+ needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1);
+ needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE);
+ if (needToExpandLocalQueue) {
+ pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize);
+ }
+ }
+ }
+ }
+
+ if (budget < 1.0) {
+ size_t takingAwayThreads = 0;
+ for (size_t needyPoolIdx : needyPools) {
+ TPoolInfo &pool = *Pools[needyPoolIdx];
+ i64 threadCount = pool.GetFullThreadCount();
+ sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount;
+ if (sumOfAdditionalThreads < takingAwayThreads + 1) {
+ break;
+ }
+ if (!isNeedyByPool[needyPoolIdx]) {
+ continue;
+ }
+ AtomicIncrement(pool.IncreasingThreadsByExchange);
+ isNeedyByPool[needyPoolIdx] = false;
+ takingAwayThreads++;
+ pool.SetFullThreadCount(threadCount + 1);
+
+ LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+ }
+
+ for (ui16 poolIdx : PriorityOrder) {
+ if (takingAwayThreads <= 0) {
+ break;
+ }
+
+ TPoolInfo &pool = *Pools[poolIdx];
+ size_t threadCount = pool.GetFullThreadCount();
+ size_t additionalThreadsCount = Max<size_t>(0L, threadCount - pool.DefaultFullThreadCount);
+ size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads);
+
+ if (!currentTakingAwayThreads) {
+ continue;
+ }
+ takingAwayThreads -= currentTakingAwayThreads;
+ pool.SetFullThreadCount(threadCount - currentTakingAwayThreads);
+
+ AtomicAdd(pool.DecreasingThreadsByExchange, takingAwayThreads);
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+ }
+ }
+
+ for (auto &[hoggishPoolIdx, freeCpu] : hoggishPools) {
+ TPoolInfo &pool = *Pools[hoggishPoolIdx];
+ i64 threadCount = pool.GetFullThreadCount();
+ if (hasBorrowedSharedThread[hoggishPoolIdx]) {
+ Shared->ReturnBorrowedHalfThread(hoggishPoolIdx);
+ freeCpu -= 0.5;
+ continue;
+ }
+ if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
+ pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
+ pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
+ }
+ if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) {
+ AtomicIncrement(pool.DecreasingThreadsByHoggishState);
+ LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
+ pool.SetFullThreadCount(threadCount - 1);
+ }
+ }
+
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = *Pools[poolIdx];
+ AtomicSet(pool.PotentialMaxThreadCount, std::min<i64>(pool.MaxThreadCount, pool.GetThreadCount() + budgetInt));
+ }
+}
+
+void THarmonizer::CalculatePriorityOrder() {
+ PriorityOrder.resize(Pools.size());
+ Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
+ Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) {
+ if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
+ return Pools[lhs]->Priority < Pools[rhs]->Priority;
+ }
+ return Pools[lhs]->Pool->PoolId > Pools[rhs]->Pool->PoolId;
+ });
+}
+
+void THarmonizer::Harmonize(ui64 ts) {
+ if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) {
+ LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false);
+ return;
+ }
+ // Check again under the lock
+ if (IsDisabled) {
+ LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true);
+ Lock.Release();
+ return;
+ }
+ // Will never reach this line disabled
+ ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
+ LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);
+
+ {
+ TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_HARMONIZER> activityGuard;
+
+ if (PriorityOrder.empty()) {
+ CalculatePriorityOrder();
+ }
+
+ PullStats(ts);
+ HarmonizeImpl(ts);
+ }
+
+ Lock.Release();
+}
+
+void THarmonizer::DeclareEmergency(ui64 ts) {
+ NextHarmonizeTs = ts;
+}
+
+void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
+ TGuard<TSpinLock> guard(Lock);
+ Pools.emplace_back(new TPoolInfo);
+ TPoolInfo &poolInfo = *Pools.back();
+ poolInfo.Pool = pool;
+ poolInfo.Shared = Shared;
+ poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool);
+ poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount();
+ poolInfo.MinThreadCount = pool->GetMinThreadCount();
+ poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
+
+ poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount();
+ poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount();
+ poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount();
+ poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount);
+ poolInfo.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0);
+ poolInfo.Priority = pool->GetPriority();
+ pool->SetFullThreadCount(poolInfo.DefaultFullThreadCount);
+ if (pingInfo) {
+ poolInfo.AvgPingCounter = pingInfo->AvgPingCounter;
+ poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
+ poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
+ }
+ if (poolInfo.BasicPool) {
+ poolInfo.WaitingStats.reset(new TWaitingStats<ui64>());
+ poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>());
+ }
+ PriorityOrder.clear();
+}
+
+void THarmonizer::Enable(bool enable) {
+ TGuard<TSpinLock> guard(Lock);
+ IsDisabled = enable;
+}
+
+IHarmonizer* MakeHarmonizer(ui64 ts) {
+ return new THarmonizer(ts);
+}
+
+TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
+ const TPoolInfo &pool = *Pools[poolId];
+ ui64 flags = RelaxedLoad(&pool.LastFlags);
+ return TPoolHarmonizerStats{
+ .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
+ .IncreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByExchange)),
+ .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
+ .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
+ .DecreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByExchange)),
+ .MaxConsumedCpu = pool.MaxConsumedCpu.load(std::memory_order_relaxed),
+ .MinConsumedCpu = pool.MinConsumedCpu.load(std::memory_order_relaxed),
+ .AvgConsumedCpu = pool.AvgConsumedCpu.load(std::memory_order_relaxed),
+ .MaxBookedCpu = pool.MaxBookedCpu.load(std::memory_order_relaxed),
+ .MinBookedCpu = pool.MinBookedCpu.load(std::memory_order_relaxed),
+ .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)),
+ .IsNeedy = static_cast<bool>(flags & 1),
+ .IsStarved = static_cast<bool>(flags & 2),
+ .IsHoggish = static_cast<bool>(flags & 4),
+ };
+}
+
+THarmonizerStats THarmonizer::GetStats() const {
+ return THarmonizerStats{
+ .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&MaxConsumedCpu)),
+ .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)),
+ .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)),
+ .MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)),
+ .AvgAwakeningTimeUs = AvgAwakeningTimeUs,
+ .AvgWakingUpTimeUs = AvgWakingUpTimeUs,
+ };
+}
+
+void THarmonizer::SetSharedPool(TSharedExecutorPool* pool) {
+ Shared = pool;
+}
+
+}
diff --git a/ydb/library/actors/core/harmonizer/harmonizer.h b/ydb/library/actors/core/harmonizer.h
index b5140c963b..b4323ef8de 100644
--- a/ydb/library/actors/core/harmonizer/harmonizer.h
+++ b/ydb/library/actors/core/harmonizer.h
@@ -1,10 +1,11 @@
#pragma once
#include "defs.h"
+#include "executor_pool_shared.h"
namespace NActors {
class IExecutorPool;
- class ISharedExecutorPool;
+ class TSharedExecutorPool;
struct TSelfPingInfo;
template <typename T>
@@ -16,38 +17,25 @@ namespace NActors {
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
ui64 DecreasingThreadsByExchange = 0;
-
- ui64 ReceivedHalfThreadByNeedyState = 0;
- ui64 GivenHalfThreadByOtherStarvedState = 0;
- ui64 GivenHalfThreadByHoggishState = 0;
- ui64 GivenHalfThreadByOtherNeedyState = 0;
- ui64 ReturnedHalfThreadByStarvedState = 0;
- ui64 ReturnedHalfThreadByOtherHoggishState = 0;
-
- float MaxCpuUs = 0.0;
- float MinCpuUs = 0.0;
- float AvgCpuUs = 0.0;
- float MaxElapsedUs = 0.0;
- float MinElapsedUs = 0.0;
- float AvgElapsedUs = 0.0;
+ float MaxConsumedCpu = 0.0;
+ float MinConsumedCpu = 0.0;
+ float AvgConsumedCpu = 0.0;
+ float MaxBookedCpu = 0.0;
+ float MinBookedCpu = 0.0;
i16 PotentialMaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
-
- TString ToString() const;
};
struct THarmonizerStats {
- i64 MaxCpuUs = 0.0;
- i64 MinCpuUs = 0.0;
- i64 MaxElapsedUs = 0.0;
- i64 MinElapsedUs = 0.0;
+ i64 MaxConsumedCpu = 0.0;
+ i64 MinConsumedCpu = 0.0;
+ i64 MaxBookedCpu = 0.0;
+ i64 MinBookedCpu = 0.0;
double AvgAwakeningTimeUs = 0;
double AvgWakingUpTimeUs = 0;
-
- TString ToString() const;
};
// Pool cpu harmonizer
@@ -60,7 +48,7 @@ namespace NActors {
virtual void Enable(bool enable) = 0;
virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0;
virtual THarmonizerStats GetStats() const = 0;
- virtual void SetSharedPool(ISharedExecutorPool* pool) = 0;
+ virtual void SetSharedPool(TSharedExecutorPool* pool) = 0;
};
IHarmonizer* MakeHarmonizer(ui64 ts);
diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp
deleted file mode 100644
index a2bd5c20e3..0000000000
--- a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-#include "cpu_consumption.h"
-#include "debug.h"
-
-namespace NActors {
-
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
-void TCpuConsumptionInfo::Clear() {
- Elapsed = 0.0;
- Cpu = 0.0;
- LastSecondElapsed = 0.0;
- LastSecondCpu = 0.0;
-}
-
-void THarmonizerCpuConsumption::Init(i16 poolCount) {
- PoolConsumption.resize(poolCount);
- IsNeedyByPool.reserve(poolCount);
- NeedyPools.reserve(poolCount);
- HoggishPools.reserve(poolCount);
-}
-
-namespace {
-
- float Rescale(float value) {
- return Max(0.0, Min(1.0, value * (1.0/0.9)));
- }
-
- void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption) {
- poolConsumption->Clear();
- for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
- float threadElapsed = Rescale(pool.GetElapsed(threadIdx));
- float threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx));
- float threadCpu = Rescale(pool.GetCpu(threadIdx));
- float threadLastSecondCpu = Rescale(pool.GetLastSecondCpu(threadIdx));
- poolConsumption->Elapsed += threadElapsed;
- poolConsumption->LastSecondElapsed += threadLastSecondElapsed;
- poolConsumption->Cpu += threadCpu;
- poolConsumption->LastSecondCpu += threadLastSecondCpu;
- LWPROBE(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), threadIdx, threadElapsed, threadCpu, threadLastSecondElapsed, threadLastSecondCpu);
- }
- for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) {
- float sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx));
- float sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx));
- float sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx));
- float sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx));
- poolConsumption->Elapsed += sharedElapsed;
- poolConsumption->LastSecondElapsed += sharedLastSecondElapsed;
- poolConsumption->Cpu += sharedCpu;
- poolConsumption->LastSecondCpu += sharedLastSecondCpu;
- LWPROBE(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), -1 - sharedIdx, sharedElapsed, sharedCpu, sharedLastSecondElapsed, sharedLastSecondCpu);
- }
- }
-
- bool IsStarved(double elapsed, double cpu) {
- return Max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 0.5);
- }
-
- bool IsHoggish(double elapsed, double currentThreadCount) {
- return elapsed < currentThreadCount - 0.5;
- }
-
-} // namespace
-
-
-void THarmonizerCpuConsumption::Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools, const TSharedInfo& sharedInfo) {
- FreeHalfThread.clear();
- NeedyPools.clear();
- HoggishPools.clear();
- IsNeedyByPool.clear();
-
-
- TotalCores = 0;
- AdditionalThreads = 0;
- StoppingThreads = 0;
- IsStarvedPresent = false;
- Elapsed = 0.0;
- Cpu = 0.0;
- LastSecondElapsed = 0.0;
- LastSecondCpu = 0.0;
- for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx) {
- TPoolInfo& pool = *pools[poolIdx];
- TotalCores += pool.DefaultThreadCount;
-
- AdditionalThreads += Max(0, pool.GetFullThreadCount() - pool.DefaultFullThreadCount);
- float currentThreadCount = pool.GetThreadCount();
- StoppingThreads += pool.Pool->GetBlockingThreadCount();
- HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "current thread count", currentThreadCount, "stopping threads", StoppingThreads, "default thread count", pool.DefaultThreadCount);
-
- UpdatePoolConsumption(pool, &PoolConsumption[poolIdx]);
-
- HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "elapsed", PoolConsumption[poolIdx].Elapsed, "cpu", PoolConsumption[poolIdx].Cpu, "last second elapsed", PoolConsumption[poolIdx].LastSecondElapsed, "last second cpu", PoolConsumption[poolIdx].LastSecondCpu);
-
- bool isStarved = IsStarved(PoolConsumption[poolIdx].Elapsed, PoolConsumption[poolIdx].Cpu)
- || IsStarved(PoolConsumption[poolIdx].LastSecondElapsed, PoolConsumption[poolIdx].LastSecondCpu);
- if (isStarved) {
- IsStarvedPresent = true;
- }
-
- bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].Cpu >= currentThreadCount);
- IsNeedyByPool.push_back(isNeedy);
- if (isNeedy) {
- NeedyPools.push_back(poolIdx);
- }
-
- if (currentThreadCount - PoolConsumption[poolIdx].Elapsed > 0.5) {
- if (sharedInfo.HasBorrowedSharedThread[poolIdx] || sharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]) {
- FreeHalfThread.push_back(poolIdx);
- }
- }
-
- bool isHoggish = IsHoggish(PoolConsumption[poolIdx].Elapsed, currentThreadCount)
- || IsHoggish(PoolConsumption[poolIdx].LastSecondElapsed, currentThreadCount);
- if (isHoggish) {
- float freeCpu = std::max(currentThreadCount - PoolConsumption[poolIdx].Elapsed, currentThreadCount - PoolConsumption[poolIdx].LastSecondElapsed);
- HoggishPools.push_back({poolIdx, freeCpu});
- }
-
- Elapsed += PoolConsumption[poolIdx].Elapsed;
- Cpu += PoolConsumption[poolIdx].Cpu;
- LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed;
- LastSecondCpu += PoolConsumption[poolIdx].LastSecondCpu;
- pool.LastFlags.store((i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2), std::memory_order_relaxed);
- LWPROBE_WITH_DEBUG(
- HarmonizeCheckPool,
- poolIdx,
- pool.Pool->GetName(),
- PoolConsumption[poolIdx].Elapsed,
- PoolConsumption[poolIdx].Cpu,
- PoolConsumption[poolIdx].LastSecondElapsed,
- PoolConsumption[poolIdx].LastSecondCpu,
- currentThreadCount,
- pool.MaxFullThreadCount,
- isStarved,
- isNeedy,
- isHoggish
- );
- }
-
- if (NeedyPools.size()) {
- Sort(NeedyPools.begin(), NeedyPools.end(), [&] (i16 lhs, i16 rhs) {
- if (pools[lhs]->Priority != pools[rhs]->Priority) {
- return pools[lhs]->Priority > pools[rhs]->Priority;
- }
- return pools[lhs]->Pool->PoolId < pools[rhs]->Pool->PoolId;
- });
- }
-
- if (FreeHalfThread.size()) {
- Sort(FreeHalfThread.begin(), FreeHalfThread.end(), [&] (i16 lhs, i16 rhs) {
- if (pools[lhs]->Priority != pools[rhs]->Priority) {
- return pools[lhs]->Priority > pools[rhs]->Priority;
- }
- return pools[lhs]->Pool->PoolId < pools[rhs]->Pool->PoolId;
- });
- }
-
- HARMONIZER_DEBUG_PRINT("NeedyPools", NeedyPools.size(), "FreeHalfThread", FreeHalfThread.size(), "HoggishPools", HoggishPools.size());
-
- Budget = TotalCores - Max(Elapsed, LastSecondElapsed);
- BudgetInt = static_cast<i16>(Max(Budget, 0.0f));
- if (Budget < -0.1) {
- IsStarvedPresent = true;
- }
- Overbooked = Elapsed - Cpu;
- if (Overbooked < 0) {
- IsStarvedPresent = false;
- }
- HARMONIZER_DEBUG_PRINT("IsStarvedPresent", IsStarvedPresent, "Budget", Budget, "BudgetInt", BudgetInt, "Overbooked", Overbooked, "TotalCores", TotalCores, "Elapsed", Elapsed, "Cpu", Cpu);
-}
-
-} // namespace NActors
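For reference, a minimal standalone sketch of the starvation/hoggishness thresholds used by the deleted cpu_consumption.cpp above; the input values are hypothetical per-second CPU shares and the snippet is an illustration, not part of this revert.

// Illustration only: the same heuristics as the deleted IsStarved/IsHoggish helpers.
#include <algorithm>
#include <cstdio>

namespace {
    bool IsStarved(double elapsed, double cpu) {
        // Noticeable load whose consumed CPU time lags far behind elapsed (wall-clock) time.
        return std::max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 0.5);
    }
    bool IsHoggish(double elapsed, double currentThreadCount) {
        // The pool keeps more than half a thread idle on average.
        return elapsed < currentThreadCount - 0.5;
    }
}

int main() {
    double elapsed = 2.4, cpu = 1.1, threads = 4.0; // hypothetical per-second shares
    std::printf("starved=%d hoggish=%d\n", IsStarved(elapsed, cpu), IsHoggish(elapsed, threads));
}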
diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.h b/ydb/library/actors/core/harmonizer/cpu_consumption.h
deleted file mode 100644
index 0fbfd85b6e..0000000000
--- a/ydb/library/actors/core/harmonizer/cpu_consumption.h
+++ /dev/null
@@ -1,45 +0,0 @@
-#pragma once
-
-#include "defs.h"
-#include "pool.h"
-#include "shared_info.h"
-namespace NActors {
-
-struct TPoolInfo;
-
-struct TCpuConsumptionInfo {
- float Elapsed;
- float Cpu;
- float LastSecondElapsed;
- float LastSecondCpu;
-
- void Clear();
-}; // struct TCpuConsumptionInfo
-
-struct THarmonizerCpuConsumption {
- std::vector<TCpuConsumptionInfo> PoolConsumption;
-
- float TotalCores = 0;
- i16 AdditionalThreads = 0;
- i16 StoppingThreads = 0;
- bool IsStarvedPresent = false;
- float Budget = 0.0;
- i16 BudgetInt = 0;
- float Overbooked = 0.0;
-
- float Elapsed = 0.0;
- float Cpu = 0.0;
- float LastSecondElapsed = 0.0;
- float LastSecondCpu = 0.0;
- TStackVec<i16, 8> NeedyPools;
- TStackVec<std::pair<i16, float>, 8> HoggishPools;
- TStackVec<bool, 8> IsNeedyByPool;
- std::vector<i16> FreeHalfThread;
-
- void Init(i16 poolCount);
-
- void Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools, const TSharedInfo& sharedInfo);
-
-}; // struct THarmonizerCpuConsumption
-
-} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/debug.h b/ydb/library/actors/core/harmonizer/debug.h
deleted file mode 100644
index 007d7d8012..0000000000
--- a/ydb/library/actors/core/harmonizer/debug.h
+++ /dev/null
@@ -1,23 +0,0 @@
-#pragma once
-
-#include <ydb/library/actors/core/probes.h>
-
-namespace NActors {
-
-constexpr bool DebugHarmonizer = false;
-
-template <typename ... TArgs>
-void Print(TArgs&& ... args) {
- ((Cerr << std::forward<TArgs>(args) << " "), ...) << Endl;
-}
-
-#define HARMONIZER_DEBUG_PRINT(...) \
- if constexpr (DebugHarmonizer) { Print(__VA_ARGS__); }
-// HARMONIZER_DEBUG_PRINT(...)
-
-#define LWPROBE_WITH_DEBUG(probe, ...) \
- LWPROBE(probe, __VA_ARGS__); \
- HARMONIZER_DEBUG_PRINT(#probe, __VA_ARGS__); \
-// LWPROBE_WITH_DEBUG(probe, ...)
-
-} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/defs.h b/ydb/library/actors/core/harmonizer/defs.h
deleted file mode 100644
index 4e18506740..0000000000
--- a/ydb/library/actors/core/harmonizer/defs.h
+++ /dev/null
@@ -1,3 +0,0 @@
-#pragma once
-
-#include <ydb/library/actors/core/defs.h>
diff --git a/ydb/library/actors/core/harmonizer/harmonizer.cpp b/ydb/library/actors/core/harmonizer/harmonizer.cpp
deleted file mode 100644
index e6b077b240..0000000000
--- a/ydb/library/actors/core/harmonizer/harmonizer.cpp
+++ /dev/null
@@ -1,485 +0,0 @@
-#include "harmonizer.h"
-#include "history.h"
-#include "pool.h"
-#include "waiting_stats.h"
-#include "cpu_consumption.h"
-#include "shared_info.h"
-#include "debug.h"
-#include <ydb/library/actors/core/executor_pool.h>
-
-#include <ydb/library/actors/core/executor_thread_ctx.h>
-#include <ydb/library/actors/core/executor_thread.h>
-#include <ydb/library/actors/core/probes.h>
-
-#include <ydb/library/actors/core/activity_guard.h>
-#include <ydb/library/actors/core/actorsystem.h>
-#include <ydb/library/actors/core/executor_pool_basic.h>
-#include <ydb/library/actors/core/executor_pool_basic_feature_flags.h>
-#include <ydb/library/actors/core/executor_pool_shared.h>
-
-#include <atomic>
-#include <ydb/library/actors/util/cpu_load_log.h>
-#include <ydb/library/actors/util/datetime.h>
-#include <ydb/library/actors/util/intrinsics.h>
-
-#include <util/system/spinlock.h>
-
-#include <algorithm>
-
-namespace NActors {
-
-
-
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
-
-class THarmonizer: public IHarmonizer {
-private:
- std::atomic<bool> IsDisabled = false;
- TSpinLock Lock;
- std::atomic<ui64> NextHarmonizeTs = 0;
- std::vector<std::unique_ptr<TPoolInfo>> Pools;
- std::vector<ui16> PriorityOrder;
-
- TValueHistory<16> CpuUs;
- TValueHistory<16> Elapsed;
-
- std::atomic<i64> MaxCpuUs = 0;
- std::atomic<i64> MinCpuUs = 0;
- std::atomic<i64> MaxElapsedUs = 0;
- std::atomic<i64> MinElapsedUs = 0;
-
- ISharedExecutorPool* Shared = nullptr;
- TSharedInfo SharedInfo;
-
- TWaitingInfo WaitingInfo;
- THarmonizerCpuConsumption CpuConsumption;
- THarmonizerStats Stats;
- float ProcessingBudget = 0.0;
-
- void PullStats(ui64 ts);
- void PullSharedInfo();
- void ProcessWaitingStats();
- void HarmonizeImpl(ui64 ts);
- void CalculatePriorityOrder();
- void ProcessStarvedState();
- void ProcessNeedyState();
- void ProcessExchange();
- void ProcessHoggishState();
-public:
- THarmonizer(ui64 ts);
- virtual ~THarmonizer();
- double Rescale(double value) const;
- void Harmonize(ui64 ts) override;
- void DeclareEmergency(ui64 ts) override;
- void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
- void Enable(bool enable) override;
- TPoolHarmonizerStats GetPoolStats(i16 poolId) const override;
- THarmonizerStats GetStats() const override;
- void SetSharedPool(ISharedExecutorPool* pool) override;
-};
-
-THarmonizer::THarmonizer(ui64 ts) {
- NextHarmonizeTs = ts;
-}
-
-THarmonizer::~THarmonizer() {
-}
-
-void THarmonizer::PullStats(ui64 ts) {
- HARMONIZER_DEBUG_PRINT("PullStats");
- TCpuConsumption acc;
- for (auto &pool : Pools) {
- TCpuConsumption consumption = pool->PullStats(ts);
- acc.Add(consumption);
- }
- CpuUs.Register(ts, acc.CpuUs);
- MaxCpuUs.store(CpuUs.GetMaxInt(), std::memory_order_relaxed);
- MinCpuUs.store(CpuUs.GetMinInt(), std::memory_order_relaxed);
- Elapsed.Register(ts, acc.ElapsedUs);
- MaxElapsedUs.store(Elapsed.GetMaxInt(), std::memory_order_relaxed);
- MinElapsedUs.store(Elapsed.GetMinInt(), std::memory_order_relaxed);
-
- WaitingInfo.Pull(Pools);
- if (Shared) {
- SharedInfo.Pull(*Shared);
- }
- CpuConsumption.Pull(Pools, SharedInfo);
- ProcessingBudget = CpuConsumption.Budget;
-}
-
-void THarmonizer::ProcessWaitingStats() {
- HARMONIZER_DEBUG_PRINT("ProcessWaitingStats");
- for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
- TPoolInfo& pool = *Pools[poolIdx];
- if (!pool.BasicPool) {
- continue;
- }
- if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) {
- if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
- pool.BasicPool->CalcSpinPerThread(WaitingInfo.AvgWakingUpTimeUs);
- } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
- ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(WaitingInfo.AvgWakingUpTimeUs);
- pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
- } else {
- ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(WaitingInfo.AvgWakingUpTimeUs);
- pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
- }
- pool.BasicPool->ClearWaitingStats();
- }
- }
-}
-
-void THarmonizer::ProcessStarvedState() {
- HARMONIZER_DEBUG_PRINT("ProcessStarvedState");
- HARMONIZER_DEBUG_PRINT("shared info", SharedInfo.ToString());
- for (ui16 poolIdx : PriorityOrder) {
- TPoolInfo &pool = *Pools[poolIdx];
- i64 threadCount = pool.GetFullThreadCount();
- if (SharedInfo.HasSharedThread[poolIdx] && !SharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]) {
- HARMONIZER_DEBUG_PRINT("return own half thread", poolIdx);
- i16 borrowedPool = Shared->ReturnOwnHalfThread(poolIdx);
- pool.ReturnedHalfThreadByStarvedState.fetch_add(1, std::memory_order_relaxed);
- Y_ABORT_UNLESS(borrowedPool != -1);
- Pools[borrowedPool]->GivenHalfThreadByOtherStarvedState.fetch_add(1, std::memory_order_relaxed);
- } else {
- HARMONIZER_DEBUG_PRINT("no own half thread", poolIdx, "has shared thread", SharedInfo.HasSharedThread[poolIdx], "has shared thread which was not borrowed", SharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]);
- }
- HARMONIZER_DEBUG_PRINT("poolIdx", poolIdx, "threadCount", threadCount, "pool.DefaultFullThreadCount", pool.DefaultFullThreadCount);
- while (threadCount > pool.DefaultFullThreadCount) {
- pool.SetFullThreadCount(--threadCount);
- pool.DecreasingThreadsByStarvedState.fetch_add(1, std::memory_order_relaxed);
- CpuConsumption.AdditionalThreads--;
- CpuConsumption.StoppingThreads++;
-
- LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
- if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) {
- break;
- }
- }
- if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) {
- break;
- }
- }
-}
-
-void THarmonizer::ProcessNeedyState() {
- HARMONIZER_DEBUG_PRINT("ProcessNeedyState");
- if (CpuConsumption.NeedyPools.empty()) {
- HARMONIZER_DEBUG_PRINT("No needy pools");
- return;
- }
- for (size_t needyPoolIdx : CpuConsumption.NeedyPools) {
- TPoolInfo &pool = *Pools[needyPoolIdx];
- if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) {
- continue;
- }
- i64 threadCount = pool.GetFullThreadCount();
- if (Shared && ProcessingBudget >= 0.5 && !SharedInfo.HasBorrowedSharedThread[needyPoolIdx] && CpuConsumption.FreeHalfThread.size()) {
- i16 poolWithHalfThread = CpuConsumption.FreeHalfThread.back();
- Shared->GiveHalfThread(poolWithHalfThread, needyPoolIdx);
- CpuConsumption.FreeHalfThread.pop_back();
- CpuConsumption.IsNeedyByPool[needyPoolIdx] = false;
- ProcessingBudget -= 0.5;
- pool.ReceivedHalfThreadByNeedyState.fetch_add(1, std::memory_order_relaxed);
- Pools[poolWithHalfThread]->GivenHalfThreadByOtherNeedyState.fetch_add(1, std::memory_order_relaxed);
- } else if (ProcessingBudget >= 1.0) {
- if (threadCount + 1 <= pool.MaxFullThreadCount) {
- pool.IncreasingThreadsByNeedyState.fetch_add(1, std::memory_order_relaxed);
- CpuConsumption.IsNeedyByPool[needyPoolIdx] = false;
- CpuConsumption.AdditionalThreads++;
- pool.SetFullThreadCount(threadCount + 1);
- ProcessingBudget -= 1.0;
- LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
- }
- }
- if constexpr (NFeatures::IsLocalQueues()) {
- bool needToExpandLocalQueue = ProcessingBudget < 1.0 || threadCount >= pool.MaxFullThreadCount;
- needToExpandLocalQueue &= (bool)pool.BasicPool;
- needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1);
- needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE);
- if (needToExpandLocalQueue) {
- pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize);
- }
- }
- }
-}
-
-void THarmonizer::ProcessExchange() {
- HARMONIZER_DEBUG_PRINT("ProcessExchange");
- if (CpuConsumption.NeedyPools.empty()) {
- HARMONIZER_DEBUG_PRINT("No needy pools");
- return;
- }
- size_t takingAwayThreads = 0;
- size_t sumOfAdditionalThreads = CpuConsumption.AdditionalThreads;
- for (size_t needyPoolIdx : CpuConsumption.NeedyPools) {
- TPoolInfo &pool = *Pools[needyPoolIdx];
- i64 threadCount = pool.GetFullThreadCount();
- sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount;
- if (sumOfAdditionalThreads < takingAwayThreads + 1) {
- break;
- }
- if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) {
- continue;
- }
- pool.IncreasingThreadsByExchange.fetch_add(1, std::memory_order_relaxed);
- CpuConsumption.IsNeedyByPool[needyPoolIdx] = false;
- takingAwayThreads++;
- pool.SetFullThreadCount(threadCount + 1);
-
- LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
- }
-
- for (ui16 poolIdx : PriorityOrder) {
- if (takingAwayThreads <= 0) {
- break;
- }
-
- TPoolInfo &pool = *Pools[poolIdx];
- size_t threadCount = pool.GetFullThreadCount();
- size_t additionalThreadsCount = Max<size_t>(0L, threadCount - pool.DefaultFullThreadCount);
- size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads);
-
- if (!currentTakingAwayThreads) {
- continue;
- }
- takingAwayThreads -= currentTakingAwayThreads;
- pool.SetFullThreadCount(threadCount - currentTakingAwayThreads);
-
- pool.DecreasingThreadsByExchange.fetch_add(currentTakingAwayThreads, std::memory_order_relaxed);
- LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
- }
-}
-
-void THarmonizer::ProcessHoggishState() {
- HARMONIZER_DEBUG_PRINT("ProcessHoggishState");
- for (auto &[hoggishPoolIdx, freeCpu] : CpuConsumption.HoggishPools) {
- TPoolInfo &pool = *Pools[hoggishPoolIdx];
- i64 threadCount = pool.GetFullThreadCount();
- if (SharedInfo.HasBorrowedSharedThread[hoggishPoolIdx]) {
- i16 originalPool = Shared->ReturnBorrowedHalfThread(hoggishPoolIdx);
- HARMONIZER_DEBUG_PRINT("has borrowed shared thread", hoggishPoolIdx, "will return half thread to", originalPool);
- freeCpu -= 0.5;
- pool.GivenHalfThreadByHoggishState.fetch_add(1, std::memory_order_relaxed);
- Y_ABORT_UNLESS(originalPool != -1);
- Pools[originalPool]->ReturnedHalfThreadByOtherHoggishState.fetch_add(1, std::memory_order_relaxed);
- }
- if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
- pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
- pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
- }
- HARMONIZER_DEBUG_PRINT("poolIdx", hoggishPoolIdx, "threadCount", threadCount, "pool.MinFullThreadCount", pool.MinFullThreadCount, "freeCpu", freeCpu);
- if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) {
- pool.DecreasingThreadsByHoggishState.fetch_add(1, std::memory_order_relaxed);
- pool.SetFullThreadCount(threadCount - 1);
- LWPROBE_WITH_DEBUG(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
- }
- }
-}
-
-void THarmonizer::HarmonizeImpl(ui64 ts) {
- HARMONIZER_DEBUG_PRINT("HarmonizeImpl");
- Y_UNUSED(ts);
-
- ProcessWaitingStats();
-
- for (size_t needyPoolIdx : CpuConsumption.NeedyPools) {
- TPoolInfo &pool = *Pools[needyPoolIdx];
- if (pool.AvgPingCounter && pool.LastUpdateTs + Us2Ts(3'000'000) > ts) {
- CpuConsumption.IsNeedyByPool[needyPoolIdx] = false;
- HARMONIZER_DEBUG_PRINT("pool won't be updated because time", needyPoolIdx);
- }
- }
-
- HARMONIZER_DEBUG_PRINT("IsStarvedPresent", CpuConsumption.IsStarvedPresent, "Overbooked", CpuConsumption.Overbooked, "StoppingThreads", CpuConsumption.StoppingThreads);
- if (CpuConsumption.IsStarvedPresent && CpuConsumption.Overbooked >= CpuConsumption.StoppingThreads) {
- ProcessStarvedState();
- } else if (!CpuConsumption.IsStarvedPresent) {
- ProcessNeedyState();
- }
-
- if (ProcessingBudget < 1.0) {
- ProcessExchange();
- }
-
- if (!CpuConsumption.HoggishPools.empty()) {
- ProcessHoggishState();
- }
-
- for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
- TPoolInfo& pool = *Pools[poolIdx];
- pool.PotentialMaxThreadCount.store(std::min<i64>(pool.MaxThreadCount, static_cast<i64>(pool.GetThreadCount() + CpuConsumption.Budget)), std::memory_order_relaxed);
- HARMONIZER_DEBUG_PRINT(poolIdx, pool.Pool->GetName(), "potential max thread count", pool.PotentialMaxThreadCount.load(std::memory_order_relaxed), "budget", CpuConsumption.Budget, "thread count", pool.GetThreadCount());
- }
-}
-
-void THarmonizer::CalculatePriorityOrder() {
- PriorityOrder.resize(Pools.size());
- Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
- Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) {
- if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
- return Pools[lhs]->Priority < Pools[rhs]->Priority;
- }
- return Pools[lhs]->Pool->PoolId > Pools[rhs]->Pool->PoolId;
- });
-}
-
-void THarmonizer::Harmonize(ui64 ts) {
- if (IsDisabled.load(std::memory_order_relaxed) || NextHarmonizeTs.load(std::memory_order_acquire) > ts || !Lock.TryAcquire()) {
- LWPROBE_WITH_DEBUG(TryToHarmonizeFailed, ts, NextHarmonizeTs.load(std::memory_order_relaxed), IsDisabled.load(std::memory_order_relaxed), false);
- return;
- }
-
- if (NextHarmonizeTs.load(std::memory_order_acquire) > ts) {
- Lock.Release();
- return;
- }
-
- // Check again under the lock
- if (IsDisabled.load(std::memory_order_relaxed)) {
- LWPROBE_WITH_DEBUG(TryToHarmonizeFailed, ts, NextHarmonizeTs.load(std::memory_order_relaxed), IsDisabled.load(std::memory_order_relaxed), true);
- Lock.Release();
- return;
- }
-    // We will never reach this line if the harmonizer is disabled
- ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull), std::memory_order_acquire);
- LWPROBE_WITH_DEBUG(TryToHarmonizeSuccess, ts, NextHarmonizeTs.load(std::memory_order_relaxed), previousNextHarmonizeTs);
-
- {
- TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_HARMONIZER> activityGuard;
-
- if (PriorityOrder.empty()) {
- CalculatePriorityOrder();
- SharedInfo.Init(Pools.size());
- CpuConsumption.Init(Pools.size());
- }
-
- PullStats(ts);
- HarmonizeImpl(ts);
- }
-
- Lock.Release();
-}
-
-void THarmonizer::DeclareEmergency(ui64 ts) {
- NextHarmonizeTs = ts;
-}
-
-void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
- TGuard<TSpinLock> guard(Lock);
- Pools.emplace_back(new TPoolInfo);
- TPoolInfo &poolInfo = *Pools.back();
- poolInfo.Pool = pool;
- poolInfo.Shared = Shared;
- poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool);
- poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount();
- poolInfo.MinThreadCount = pool->GetMinThreadCount();
- poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
- poolInfo.PotentialMaxThreadCount = poolInfo.MaxThreadCount;
-
- poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount();
- poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount();
- poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount();
- poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount);
- poolInfo.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0);
- poolInfo.Priority = pool->GetPriority();
- pool->SetFullThreadCount(poolInfo.DefaultFullThreadCount);
- if (pingInfo) {
- poolInfo.AvgPingCounter = pingInfo->AvgPingCounter;
- poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
- poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
- }
- if (poolInfo.BasicPool) {
- poolInfo.WaitingStats.reset(new TWaitingStats<ui64>());
- poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>());
- }
- PriorityOrder.clear();
-}
-
-void THarmonizer::Enable(bool enable) {
- TGuard<TSpinLock> guard(Lock);
- IsDisabled = enable;
-}
-
-IHarmonizer* MakeHarmonizer(ui64 ts) {
- return new THarmonizer(ts);
-}
-
-TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
- const TPoolInfo &pool = *Pools[poolId];
- ui64 flags = pool.LastFlags.load(std::memory_order_relaxed);
- return TPoolHarmonizerStats{
- .IncreasingThreadsByNeedyState = pool.IncreasingThreadsByNeedyState.load(std::memory_order_relaxed),
- .IncreasingThreadsByExchange = pool.IncreasingThreadsByExchange.load(std::memory_order_relaxed),
- .DecreasingThreadsByStarvedState = pool.DecreasingThreadsByStarvedState.load(std::memory_order_relaxed),
- .DecreasingThreadsByHoggishState = pool.DecreasingThreadsByHoggishState.load(std::memory_order_relaxed),
- .DecreasingThreadsByExchange = pool.DecreasingThreadsByExchange.load(std::memory_order_relaxed),
- .ReceivedHalfThreadByNeedyState = pool.ReceivedHalfThreadByNeedyState.load(std::memory_order_relaxed),
- .GivenHalfThreadByOtherStarvedState = pool.GivenHalfThreadByOtherStarvedState.load(std::memory_order_relaxed),
- .GivenHalfThreadByHoggishState = pool.GivenHalfThreadByHoggishState.load(std::memory_order_relaxed),
- .GivenHalfThreadByOtherNeedyState = pool.GivenHalfThreadByOtherNeedyState.load(std::memory_order_relaxed),
- .ReturnedHalfThreadByStarvedState = pool.ReturnedHalfThreadByStarvedState.load(std::memory_order_relaxed),
- .ReturnedHalfThreadByOtherHoggishState = pool.ReturnedHalfThreadByOtherHoggishState.load(std::memory_order_relaxed),
- .MaxCpuUs = pool.MaxCpuUs.load(std::memory_order_relaxed),
- .MinCpuUs = pool.MinCpuUs.load(std::memory_order_relaxed),
- .AvgCpuUs = pool.AvgCpuUs.load(std::memory_order_relaxed),
- .MaxElapsedUs = pool.MaxElapsedUs.load(std::memory_order_relaxed),
- .MinElapsedUs = pool.MinElapsedUs.load(std::memory_order_relaxed),
- .PotentialMaxThreadCount = pool.PotentialMaxThreadCount.load(std::memory_order_relaxed),
- .IsNeedy = static_cast<bool>(flags & 1),
- .IsStarved = static_cast<bool>(flags & 2),
- .IsHoggish = static_cast<bool>(flags & 4),
- };
-}
-
-THarmonizerStats THarmonizer::GetStats() const {
- return THarmonizerStats{
- .MaxCpuUs = MaxCpuUs.load(std::memory_order_relaxed),
- .MinCpuUs = MinCpuUs.load(std::memory_order_relaxed),
- .MaxElapsedUs = MaxElapsedUs.load(std::memory_order_relaxed),
- .MinElapsedUs = MinElapsedUs.load(std::memory_order_relaxed),
- .AvgAwakeningTimeUs = WaitingInfo.AvgAwakeningTimeUs.load(std::memory_order_relaxed),
- .AvgWakingUpTimeUs = WaitingInfo.AvgWakingUpTimeUs.load(std::memory_order_relaxed),
- };
-}
-
-void THarmonizer::SetSharedPool(ISharedExecutorPool* pool) {
- Shared = pool;
-}
-
-TString TPoolHarmonizerStats::ToString() const {
- return TStringBuilder() << '{'
- << "IncreasingThreadsByNeedyState: " << IncreasingThreadsByNeedyState << ", "
- << "IncreasingThreadsByExchange: " << IncreasingThreadsByExchange << ", "
- << "DecreasingThreadsByStarvedState: " << DecreasingThreadsByStarvedState << ", "
- << "DecreasingThreadsByHoggishState: " << DecreasingThreadsByHoggishState << ", "
- << "DecreasingThreadsByExchange: " << DecreasingThreadsByExchange << ", "
- << "ReceivedHalfThreadByNeedyState: " << ReceivedHalfThreadByNeedyState << ", "
- << "GivenHalfThreadByOtherStarvedState: " << GivenHalfThreadByOtherStarvedState << ", "
- << "GivenHalfThreadByHoggishState: " << GivenHalfThreadByHoggishState << ", "
- << "GivenHalfThreadByOtherNeedyState: " << GivenHalfThreadByOtherNeedyState << ", "
- << "ReturnedHalfThreadByStarvedState: " << ReturnedHalfThreadByStarvedState << ", "
- << "ReturnedHalfThreadByOtherHoggishState: " << ReturnedHalfThreadByOtherHoggishState << ", "
- << "MaxCpuUs: " << MaxCpuUs << ", "
- << "MinCpuUs: " << MinCpuUs << ", "
- << "AvgCpuUs: " << AvgCpuUs << ", "
- << "MaxElapsedUs: " << MaxElapsedUs << ", "
- << "MinElapsedUs: " << MinElapsedUs << ", "
- << "PotentialMaxThreadCount: " << PotentialMaxThreadCount << ", "
- << "IsNeedy: " << IsNeedy << ", "
- << "IsStarved: " << IsStarved << ", "
- << "IsHoggish: " << IsHoggish << '}';
-}
-
-TString THarmonizerStats::ToString() const {
- return TStringBuilder() << '{'
- << "MaxCpuUs: " << MaxCpuUs << ", "
- << "MinCpuUs: " << MinCpuUs << ", "
- << "MaxElapsedUs: " << MaxElapsedUs << ", "
- << "MinElapsedUs: " << MinElapsedUs << ", "
- << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << ", "
- << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << '}';
-}
-
-}
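For reference, a standalone sketch of how the deleted harmonizer.cpp above packs per-pool state into LastFlags (bit 0 = needy, bit 1 = starved, bit 2 = hoggish) and decodes it again in GetPoolStats; the sample flag values are hypothetical and the snippet is illustrative only.

// Illustration only: LastFlags bit packing and decoding.
#include <atomic>
#include <cstdint>
#include <cstdio>

int main() {
    std::atomic<int64_t> lastFlags{0};

    bool isNeedy = true, isStarved = false, isHoggish = true; // hypothetical sample
    lastFlags.store((int64_t)isNeedy | ((int64_t)isStarved << 1) | ((int64_t)isHoggish << 2),
                    std::memory_order_relaxed);

    uint64_t flags = lastFlags.load(std::memory_order_relaxed);
    std::printf("needy=%d starved=%d hoggish=%d\n",
                (int)(flags & 1), (int)((flags & 2) != 0), (int)((flags & 4) != 0));
}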
diff --git a/ydb/library/actors/core/harmonizer/history.h b/ydb/library/actors/core/harmonizer/history.h
deleted file mode 100644
index c3ac81cf83..0000000000
--- a/ydb/library/actors/core/harmonizer/history.h
+++ /dev/null
@@ -1,149 +0,0 @@
-#pragma once
-
-#include "defs.h"
-#include <ydb/library/actors/util/datetime.h>
-
-
-namespace NActors {
-
-template <ui8 HistoryBufferSize = 8>
-struct TValueHistory {
-
- static constexpr bool CheckBinaryPower(ui64 value) {
- return !(value & (value - 1));
- }
-
- static_assert(CheckBinaryPower(HistoryBufferSize));
-
- double History[HistoryBufferSize] = {0.0};
- ui64 HistoryIdx = 0;
- ui64 LastTs = Max<ui64>();
- double LastUs = 0.0;
- double AccumulatedUs = 0.0;
- ui64 AccumulatedTs = 0;
-
- template <bool WithTail=false>
- double Accumulate(auto op, auto comb, ui8 seconds) const {
- double acc = AccumulatedUs;
- size_t idx = HistoryIdx;
- ui8 leftSeconds = seconds;
- if constexpr (!WithTail) {
- idx--;
- leftSeconds--;
- if (idx >= HistoryBufferSize) {
- idx = HistoryBufferSize - 1;
- }
- acc = History[idx];
- }
- do {
- idx--;
- leftSeconds--;
- if (idx >= HistoryBufferSize) {
- idx = HistoryBufferSize - 1;
- }
- if constexpr (WithTail) {
- acc = op(acc, History[idx]);
- } else if (leftSeconds) {
- acc = op(acc, History[idx]);
- } else {
- ui64 tsInSecond = Us2Ts(1'000'000.0);
- acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond);
- }
- } while (leftSeconds);
- double duration = 1'000'000.0 * seconds;
- if constexpr (WithTail) {
- duration += Ts2Us(AccumulatedTs);
- }
- return comb(acc, duration);
- }
-
- template <bool WithTail=false>
- double GetAvgPartForLastSeconds(ui8 seconds) const {
- auto sum = [](double acc, double value) {
- return acc + value;
- };
- auto avg = [](double sum, double duration) {
- return sum / duration;
- };
- return Accumulate<WithTail>(sum, avg, seconds);
- }
-
- double GetAvgPart() const {
- return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
- }
-
- double GetMaxForLastSeconds(ui8 seconds) const {
- auto max = [](const double& acc, const double& value) {
- return Max(acc, value);
- };
- auto fst = [](const double& value, const double&) { return value; };
- return Accumulate<false>(max, fst, seconds);
- }
-
- double GetMax() const {
- return GetMaxForLastSeconds(HistoryBufferSize);
- }
-
- i64 GetMaxInt() const {
- return static_cast<i64>(GetMax());
- }
-
- double GetMinForLastSeconds(ui8 seconds) const {
- auto min = [](const double& acc, const double& value) {
- return Min(acc, value);
- };
- auto fst = [](const double& value, const double&) { return value; };
- return Accumulate<false>(min, fst, seconds);
- }
-
- double GetMin() const {
- return GetMinForLastSeconds(HistoryBufferSize);
- }
-
- i64 GetMinInt() const {
- return static_cast<i64>(GetMin());
- }
-
- void Register(ui64 ts, double valueUs) {
- if (ts < LastTs) {
- LastTs = ts;
- LastUs = valueUs;
- AccumulatedUs = 0.0;
- AccumulatedTs = 0;
- return;
- }
- ui64 lastTs = std::exchange(LastTs, ts);
- ui64 dTs = ts - lastTs;
- double lastUs = std::exchange(LastUs, valueUs);
- double dUs = valueUs - lastUs;
-
- if (dTs > Us2Ts(8'000'000.0)) {
- dUs = dUs * 1'000'000.0 / Ts2Us(dTs);
- for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
- History[idx] = dUs;
- }
- AccumulatedUs = 0.0;
- AccumulatedTs = 0;
- return;
- }
-
- while (dTs > 0) {
- if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) {
- AccumulatedTs += dTs;
- AccumulatedUs += dUs;
- break;
- } else {
- ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs;
- double addUs = dUs * addTs / dTs;
- dTs -= addTs;
- dUs -= addUs;
- History[HistoryIdx] = AccumulatedUs + addUs;
- HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize;
- AccumulatedUs = 0.0;
- AccumulatedTs = 0;
- }
- }
- }
-}; // struct TValueHistory
-
-} // namespace NActors
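For reference, a simplified standalone model of the TValueHistory ring buffer deleted above: cumulative microsecond counters are split into one-second buckets and queries average across the stored seconds. The real class converts between timestamps and microseconds with Us2Ts/Ts2Us and handles partial seconds; this sketch omits that and is illustrative only.

// Illustration only: per-second bucketing with a fixed-size ring buffer.
#include <array>
#include <cstdio>

struct TSimpleHistory {
    static constexpr size_t Size = 8;          // kept a power of two, as in the original
    std::array<double, Size> History{};        // completed one-second buckets
    size_t Idx = 0;

    void PushSecond(double usInThatSecond) {   // one bucket per elapsed second
        History[Idx] = usInThatSecond;
        Idx = (Idx + 1) % Size;
    }
    double AvgPerSecond() const {
        double sum = 0;
        for (double v : History) sum += v;
        return sum / Size;
    }
};

int main() {
    TSimpleHistory h;
    for (int s = 0; s < 8; ++s) h.PushSecond(500'000.0 + 10'000.0 * s); // hypothetical busy-us per second
    std::printf("avg us/s = %.0f\n", h.AvgPerSecond());
}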
diff --git a/ydb/library/actors/core/harmonizer/pool.cpp b/ydb/library/actors/core/harmonizer/pool.cpp
deleted file mode 100644
index 472d8d2592..0000000000
--- a/ydb/library/actors/core/harmonizer/pool.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-#include "pool.h"
-
-#include <ydb/library/actors/core/executor_pool.h>
-#include <ydb/library/actors/core/executor_pool_shared.h>
-#include <ydb/library/actors/core/executor_pool_basic.h>
-#include <ydb/library/actors/core/executor_pool_basic_feature_flags.h>
-
-#include "debug.h"
-
-namespace NActors {
-
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
-TPoolInfo::TPoolInfo()
- : LocalQueueSize(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE)
-{}
-
-double TPoolInfo::GetCpu(i16 threadIdx) const {
- if ((size_t)threadIdx < ThreadInfo.size()) {
- return ThreadInfo[threadIdx].CpuUs.GetAvgPart();
- }
- return 0.0;
-}
-
-double TPoolInfo::GetSharedCpu(i16 sharedThreadIdx) const {
- if ((size_t)sharedThreadIdx < SharedInfo.size()) {
- return SharedInfo[sharedThreadIdx].CpuUs.GetAvgPart();
- }
- return 0.0;
-}
-
-double TPoolInfo::GetLastSecondCpu(i16 threadIdx) const {
- if ((size_t)threadIdx < ThreadInfo.size()) {
- return ThreadInfo[threadIdx].CpuUs.GetAvgPartForLastSeconds(1);
- }
- return 0.0;
-}
-
-double TPoolInfo::GetLastSecondSharedCpu(i16 sharedThreadIdx) const {
- if ((size_t)sharedThreadIdx < SharedInfo.size()) {
- return SharedInfo[sharedThreadIdx].CpuUs.GetAvgPartForLastSeconds(1);
- }
- return 0.0;
-}
-
-double TPoolInfo::GetElapsed(i16 threadIdx) const {
- if ((size_t)threadIdx < ThreadInfo.size()) {
- return ThreadInfo[threadIdx].ElapsedUs.GetAvgPart();
- }
- return 0.0;
-}
-
-double TPoolInfo::GetSharedElapsed(i16 sharedThreadIdx) const {
- if ((size_t)sharedThreadIdx < SharedInfo.size()) {
- return SharedInfo[sharedThreadIdx].ElapsedUs.GetAvgPart();
- }
- return 0.0;
-}
-
-double TPoolInfo::GetLastSecondElapsed(i16 threadIdx) const {
- if ((size_t)threadIdx < ThreadInfo.size()) {
- return ThreadInfo[threadIdx].ElapsedUs.GetAvgPartForLastSeconds(1);
- }
- return 0.0;
-}
-
-double TPoolInfo::GetLastSecondSharedElapsed(i16 sharedThreadIdx) const {
- if ((size_t)sharedThreadIdx < SharedInfo.size()) {
- return SharedInfo[sharedThreadIdx].ElapsedUs.GetAvgPartForLastSeconds(1);
- }
- return 0.0;
-}
-
-#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
-TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
- TCpuConsumption acc;
- for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) {
- TThreadInfo &threadInfo = ThreadInfo[threadIdx];
- TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
- acc.Add(cpuConsumption);
- threadInfo.ElapsedUs.Register(ts, cpuConsumption.ElapsedUs);
- LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "elapsed", UNROLL_HISTORY(threadInfo.ElapsedUs.History));
- threadInfo.CpuUs.Register(ts, cpuConsumption.CpuUs);
- LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "cpu", UNROLL_HISTORY(threadInfo.CpuUs.History));
- }
- TVector<TExecutorThreadStats> sharedStats;
- if (Shared) {
- Shared->GetSharedStatsForHarmonizer(Pool->PoolId, sharedStats);
- }
-
- for (ui32 sharedIdx = 0; sharedIdx < SharedInfo.size(); ++sharedIdx) {
- auto stat = sharedStats[sharedIdx];
- TCpuConsumption sharedConsumption{
- Ts2Us(stat.SafeElapsedTicks),
- static_cast<double>(stat.CpuUs),
- stat.NotEnoughCpuExecutions
- };
- acc.Add(sharedConsumption);
- SharedInfo[sharedIdx].ElapsedUs.Register(ts, sharedConsumption.ElapsedUs);
- LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "elapsed", UNROLL_HISTORY(SharedInfo[sharedIdx].ElapsedUs.History));
- SharedInfo[sharedIdx].CpuUs.Register(ts, sharedConsumption.CpuUs);
- LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "cpu", UNROLL_HISTORY(SharedInfo[sharedIdx].CpuUs.History));
- }
-
- CpuUs.Register(ts, acc.CpuUs);
- MaxCpuUs.store(CpuUs.GetMax() / 1'000'000, std::memory_order_relaxed);
- MinCpuUs.store(CpuUs.GetMin() / 1'000'000, std::memory_order_relaxed);
- AvgCpuUs.store(CpuUs.GetAvgPart() / 1'000'000, std::memory_order_relaxed);
- ElapsedUs.Register(ts, acc.ElapsedUs);
- MaxElapsedUs.store(ElapsedUs.GetMax() / 1'000'000, std::memory_order_relaxed);
- MinElapsedUs.store(ElapsedUs.GetMin() / 1'000'000, std::memory_order_relaxed);
- NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
- NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
- if (WaitingStats && BasicPool) {
- WaitingStats->Clear();
- BasicPool->GetWaitingStats(*WaitingStats);
- if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) {
- MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2);
- }
- }
- return acc;
-}
-#undef UNROLL_HISTORY
-
-float TPoolInfo::GetThreadCount() {
- return Pool->GetThreadCount();
-}
-
-i16 TPoolInfo::GetFullThreadCount() {
- return Pool->GetFullThreadCount();
-}
-
-void TPoolInfo::SetFullThreadCount(i16 threadCount) {
- HARMONIZER_DEBUG_PRINT(Pool->PoolId, Pool->GetName(), "set full thread count", threadCount);
- Pool->SetFullThreadCount(threadCount);
-}
-
-bool TPoolInfo::IsAvgPingGood() {
- bool res = true;
- if (AvgPingCounter) {
- res &= *AvgPingCounter > MaxAvgPingUs;
- }
- if (AvgPingCounterWithSmallWindow) {
- res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs;
- }
- return res;
-}
-
-} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/pool.h b/ydb/library/actors/core/harmonizer/pool.h
deleted file mode 100644
index e5e88b71e2..0000000000
--- a/ydb/library/actors/core/harmonizer/pool.h
+++ /dev/null
@@ -1,93 +0,0 @@
-#pragma once
-
-#include "defs.h"
-
-#include "history.h"
-#include <ydb/library/actors/core/executor_pool.h>
-
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-
-namespace NActors {
-
-class ISharedExecutorPool;
-class TBasicExecutorPool;
-class IExecutorPool;
-template <typename T>
-struct TWaitingStats;
-
-struct TThreadInfo {
- TValueHistory<8> CpuUs;
- TValueHistory<8> ElapsedUs;
-}; // struct TThreadInfo
-
-struct TPoolInfo {
- std::vector<TThreadInfo> ThreadInfo;
- std::vector<TThreadInfo> SharedInfo;
- ISharedExecutorPool* Shared = nullptr;
- IExecutorPool* Pool = nullptr;
- TBasicExecutorPool* BasicPool = nullptr;
-
- i16 DefaultFullThreadCount = 0;
- i16 MinFullThreadCount = 0;
- i16 MaxFullThreadCount = 0;
-
- float DefaultThreadCount = 0;
- float MinThreadCount = 0;
- float MaxThreadCount = 0;
-
- i16 Priority = 0;
- NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
- NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
- ui32 MaxAvgPingUs = 0;
- ui64 LastUpdateTs = 0;
- ui64 NotEnoughCpuExecutions = 0;
- ui64 NewNotEnoughCpuExecutions = 0;
- ui16 LocalQueueSize;
-
- std::atomic<i64> LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
- std::atomic<ui64> IncreasingThreadsByNeedyState = 0;
- std::atomic<ui64> IncreasingThreadsByExchange = 0;
- std::atomic<ui64> DecreasingThreadsByStarvedState = 0;
- std::atomic<ui64> DecreasingThreadsByHoggishState = 0;
- std::atomic<ui64> DecreasingThreadsByExchange = 0;
- std::atomic<i16> PotentialMaxThreadCount = 0;
- std::atomic<ui64> ReceivedHalfThreadByNeedyState = 0;
- std::atomic<ui64> GivenHalfThreadByOtherStarvedState = 0;
- std::atomic<ui64> GivenHalfThreadByHoggishState = 0;
- std::atomic<ui64> GivenHalfThreadByOtherNeedyState = 0;
- std::atomic<ui64> ReturnedHalfThreadByStarvedState = 0;
- std::atomic<ui64> ReturnedHalfThreadByOtherHoggishState = 0;
-
- TValueHistory<16> CpuUs;
- TValueHistory<16> ElapsedUs;
-
- std::atomic<float> MaxCpuUs = 0;
- std::atomic<float> MinCpuUs = 0;
- std::atomic<float> AvgCpuUs = 0;
- std::atomic<float> MaxElapsedUs = 0;
- std::atomic<float> MinElapsedUs = 0;
- std::atomic<float> AvgElapsedUs = 0;
-
- std::unique_ptr<TWaitingStats<ui64>> WaitingStats;
- std::unique_ptr<TWaitingStats<double>> MovingWaitingStats;
-
- TPoolInfo();
-
- double GetCpu(i16 threadIdx) const;
- double GetElapsed(i16 threadIdx) const;
- double GetLastSecondCpu(i16 threadIdx) const;
- double GetLastSecondElapsed(i16 threadIdx) const;
-
- double GetSharedCpu(i16 sharedThreadIdx) const;
- double GetSharedElapsed(i16 sharedThreadIdx) const;
- double GetLastSecondSharedCpu(i16 sharedThreadIdx) const;
- double GetLastSecondSharedElapsed(i16 sharedThreadIdx) const;
-
- TCpuConsumption PullStats(ui64 ts);
- i16 GetFullThreadCount();
- float GetThreadCount();
- void SetFullThreadCount(i16 threadCount);
- bool IsAvgPingGood();
-}; // struct TPoolInfo
-
-} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/shared_info.cpp b/ydb/library/actors/core/harmonizer/shared_info.cpp
deleted file mode 100644
index 6b8b94f0bd..0000000000
--- a/ydb/library/actors/core/harmonizer/shared_info.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-#include "shared_info.h"
-#include <util/string/builder.h>
-
-#include <ydb/library/actors/core/executor_pool_shared.h>
-
-namespace NActors {
-
-void TSharedInfo::Init(i16 poolCount) {
- HasSharedThread.resize(poolCount, false);
- HasSharedThreadWhichWasNotBorrowed.resize(poolCount, false);
- HasBorrowedSharedThread.resize(poolCount, false);
-}
-
-void TSharedInfo::Pull(const ISharedExecutorPool& shared) {
- auto sharedState = shared.GetState();
- for (ui32 poolIdx = 0; poolIdx < HasSharedThread.size(); ++poolIdx) {
- i16 threadIdx = sharedState.ThreadByPool[poolIdx];
- if (threadIdx != -1) {
- HasSharedThread[poolIdx] = true;
- if (sharedState.PoolByBorrowedThread[threadIdx] == -1) {
- HasSharedThreadWhichWasNotBorrowed[poolIdx] = true;
- } else {
- HasSharedThreadWhichWasNotBorrowed[poolIdx] = false;
- }
- }
- if (sharedState.BorrowedThreadByPool[poolIdx] != -1) {
- HasBorrowedSharedThread[poolIdx] = true;
- } else {
- HasBorrowedSharedThread[poolIdx] = false;
- }
- }
-}
-
-TString TSharedInfo::ToString() const {
- TStringBuilder builder;
- builder << "{";
- builder << "HasSharedThread: \"";
- for (ui32 i = 0; i < HasSharedThread.size(); ++i) {
- builder << (HasSharedThread[i] ? "1" : "0");
- }
- builder << "\", ";
- builder << "HasSharedThreadWhichWasNotBorrowed: \"";
- for (ui32 i = 0; i < HasSharedThreadWhichWasNotBorrowed.size(); ++i) {
- builder << (HasSharedThreadWhichWasNotBorrowed[i] ? "1" : "0");
- }
- builder << "\", ";
- builder << "HasBorrowedSharedThread: \"";
- for (ui32 i = 0; i < HasBorrowedSharedThread.size(); ++i) {
- builder << (HasBorrowedSharedThread[i] ? "1" : "0");
- }
- builder << "\"}";
- return builder;
-}
-
-} // namespace NActors
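For reference, a standalone sketch of the mapping the deleted TSharedInfo::Pull above performs from the shared pool state arrays (ThreadByPool, PoolByBorrowedThread, BorrowedThreadByPool) onto the three per-pool boolean vectors; the state arrays here are hypothetical and the snippet is illustrative only.

// Illustration only: deriving the per-pool shared-thread flags from the pool state.
#include <cstdio>
#include <vector>

int main() {
    // Hypothetical state for 3 pools / 3 shared threads (-1 means "none").
    std::vector<int> threadByPool         = { 0,  1, -1}; // shared thread owned by each pool
    std::vector<int> poolByBorrowedThread = {-1,  2, -1}; // pool that borrowed each thread
    std::vector<int> borrowedThreadByPool = {-1, -1,  1}; // thread each pool has borrowed

    for (size_t pool = 0; pool < threadByPool.size(); ++pool) {
        int threadIdx = threadByPool[pool];
        bool hasShared = threadIdx != -1;
        bool hasSharedNotBorrowed = hasShared && poolByBorrowedThread[threadIdx] == -1;
        bool hasBorrowed = borrowedThreadByPool[pool] != -1;
        std::printf("pool %zu: shared=%d notBorrowed=%d borrowed=%d\n",
                    pool, (int)hasShared, (int)hasSharedNotBorrowed, (int)hasBorrowed);
    }
}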
diff --git a/ydb/library/actors/core/harmonizer/shared_info.h b/ydb/library/actors/core/harmonizer/shared_info.h
deleted file mode 100644
index 41277db1bb..0000000000
--- a/ydb/library/actors/core/harmonizer/shared_info.h
+++ /dev/null
@@ -1,20 +0,0 @@
-#pragma once
-
-#include "defs.h"
-
-namespace NActors {
-
-class ISharedExecutorPool;
-
-struct TSharedInfo {
- std::vector<bool> HasSharedThread;
- std::vector<bool> HasSharedThreadWhichWasNotBorrowed;
- std::vector<bool> HasBorrowedSharedThread;
-
- void Init(i16 poolCount);
- void Pull(const ISharedExecutorPool& shared);
-
- TString ToString() const;
-}; // struct TSharedInfo
-
-} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp b/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp
deleted file mode 100644
index 571550c7b1..0000000000
--- a/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp
+++ /dev/null
@@ -1,673 +0,0 @@
-#include "harmonizer.h"
-#include "debug.h"
-#include <library/cpp/testing/unittest/registar.h>
-#include <ydb/library/actors/core/executor_pool_shared.h>
-#include <ydb/library/actors/core/executor_thread_ctx.h>
-#include <ydb/library/actors/helpers/pool_stats_collector.h>
-
-using namespace NActors;
-
-
-#define CHECK_CHANGING_THREADS(stats, inc_needy, inc_exchange, dec_hoggish, dec_starved, dec_exchange) \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByNeedyState, inc_needy, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByExchange, inc_exchange, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByHoggishState, dec_hoggish, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByStarvedState, dec_starved, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByExchange, dec_exchange, (stats).ToString());
-// end CHECK_CHANGING_THREADS
-
-#define CHECK_CHANGING_HALF_THREADS(stats, received_needy, given_starved, given_hoggish, given_other_needy, returned_starved, returned_other_hoggish) \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).ReceivedHalfThreadByNeedyState, received_needy, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByOtherStarvedState, given_starved, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByHoggishState, given_hoggish, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByOtherNeedyState, given_other_needy, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).ReturnedHalfThreadByStarvedState, returned_starved, (stats).ToString()); \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).ReturnedHalfThreadByOtherHoggishState, returned_other_hoggish, (stats).ToString());
-// end CHECK_CHANGING_HALF_THREADS
-
-#define CHECK_IS_NEEDY(stats) \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, true, (stats).ToString()); \
-// end CHECK_IS_NEEDY
-
-#define CHECK_IS_NOT_NEEDY(stats) \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, false, (stats).ToString()); \
-// end CHECK_IS_NOT_NEEDY
-
-#define CHECK_IS_HOGGISH(stats) \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, true, (stats).ToString()); \
-// end CHECK_IS_HOGGISH
-
-#define CHECK_IS_NOT_HOGGISH(stats) \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, false, (stats).ToString()); \
-// end CHECK_IS_NOT_HOGGISH
-
-#define CHECK_IS_STARVED(stats) \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, true, (stats).ToString()); \
-// end CHECK_IS_STARVED
-
-#define CHECK_IS_NOT_STARVED(stats) \
- UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, false, (stats).ToString()); \
-// end CHECK_IS_NOT_STARVED
-
-
-Y_UNIT_TEST_SUITE(HarmonizerTests) {
-
- struct TMockExecutorPoolParams {
- i16 DefaultFullThreadCount = 4;
- i16 MinFullThreadCount = 4;
- i16 MaxFullThreadCount = 8;
- float DefaultThreadCount = 4.0f;
- float MinThreadCount = 4.0f;
- float MaxThreadCount = 8.0f;
- i16 Priority = 0;
- TString Name = "MockPool";
- ui32 PoolId = 0;
-
- TString ToString() const {
- return TStringBuilder() << "PoolId: " << PoolId << ", Name: " << Name << ", DefaultFullThreadCount: " << DefaultFullThreadCount << ", MinFullThreadCount: " << MinFullThreadCount << ", MaxFullThreadCount: " << MaxFullThreadCount << ", DefaultThreadCount: " << DefaultThreadCount << ", MinThreadCount: " << MinThreadCount << ", MaxThreadCount: " << MaxThreadCount << ", Priority: " << Priority;
- }
- };
-
- struct TCpuConsumptionModel {
- TCpuConsumption value;
- TCpuConsumptionModel() : value() {}
- TCpuConsumptionModel(const TCpuConsumption& other) : value(other) {}
- operator TCpuConsumption() const {
- return value;
- }
- void Increase(const TCpuConsumption& other) {
- value.ElapsedUs += other.ElapsedUs;
- value.CpuUs += other.CpuUs;
- value.NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
- }
- };
-
- class TMockExecutorPool : public IExecutorPool {
- public:
- TMockExecutorPool(const TMockExecutorPoolParams& params = TMockExecutorPoolParams())
- : IExecutorPool(params.PoolId)
- , Params(params)
- , ThreadCount(params.DefaultFullThreadCount)
- , ThreadCpuConsumptions(params.MaxFullThreadCount, TCpuConsumption{0.0, 0.0})
- {
-
- }
-
- TMockExecutorPoolParams Params;
- i16 ThreadCount = 0;
- std::vector<TCpuConsumptionModel> ThreadCpuConsumptions;
- std::vector<TSharedExecutorThreadCtx*> SharedThreads;
-
- i16 GetDefaultFullThreadCount() const override { return Params.DefaultFullThreadCount; }
- i16 GetMinFullThreadCount() const override { return Params.MinFullThreadCount; }
- i16 GetMaxFullThreadCount() const override { return Params.MaxFullThreadCount; }
- void SetFullThreadCount(i16 count) override {
- HARMONIZER_DEBUG_PRINT(Params.PoolId, Params.Name, "set full thread count", count);
- ThreadCount = Max(Params.MinFullThreadCount, Min(Params.MaxFullThreadCount, count));
- }
- i16 GetFullThreadCount() const override { return ThreadCount; }
- float GetDefaultThreadCount() const override { return Params.DefaultThreadCount; }
- float GetMinThreadCount() const override { return Params.MinThreadCount; }
- float GetMaxThreadCount() const override { return Params.MaxThreadCount; }
- i16 GetPriority() const override { return Params.Priority; }
- TString GetName() const override { return Params.Name; }
-
-        // Additional methods from IExecutorPool
- void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {}
- void Start() override {}
- void PrepareStop() override {}
- void Shutdown() override {}
- bool Cleanup() override { return true; }
-
- TMailbox* GetReadyActivation(TWorkerContext& /*wctx*/, ui64 /*revolvingCounter*/) override { return nullptr; }
- TMailbox* ResolveMailbox(ui32 /*hint*/) override { return nullptr; }
-
- void Schedule(TInstant /*deadline*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
- void Schedule(TMonotonic /*deadline*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
- void Schedule(TDuration /*delta*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
-
- bool Send(TAutoPtr<IEventHandle>& /*ev*/) override { return true; }
- bool SpecificSend(TAutoPtr<IEventHandle>& /*ev*/) override { return true; }
- void ScheduleActivation(TMailbox* /*activation*/) override {}
- void SpecificScheduleActivation(TMailbox* /*activation*/) override {}
- void ScheduleActivationEx(TMailbox* /*activation*/, ui64 /*revolvingCounter*/) override {}
- TActorId Register(IActor* /*actor*/, TMailboxType::EType /*mailboxType*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); }
- TActorId Register(IActor* /*actor*/, TMailboxCache& /*cache*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); }
- TActorId Register(IActor* /*actor*/, TMailbox* /*mailbox*/, const TActorId& /*parentId*/) override { return TActorId(); }
-
- TAffinity* Affinity() const override { return nullptr; }
-
- ui32 GetThreads() const override { return static_cast<ui32>(ThreadCount); }
- float GetThreadCount() const override { return static_cast<float>(ThreadCount); }
-
- TSharedExecutorThreadCtx* ReleaseSharedThread() override {
- UNIT_ASSERT(!SharedThreads.empty());
- TSharedExecutorThreadCtx* thread = SharedThreads.back();
- SharedThreads.pop_back();
- return thread;
- }
- void AddSharedThread(TSharedExecutorThreadCtx* thread) override {
- UNIT_ASSERT(SharedThreads.size() < 2);
- SharedThreads.push_back(thread);
- }
-
- void IncreaseThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) {
- if (count == -1) {
- count = Params.MaxFullThreadCount - start;
- }
- for (i16 i = start; i < start + count; ++i) {
- ThreadCpuConsumptions[i].Increase(consumption);
- }
- }
-
- void SetThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) {
- if (count == -1) {
- count = Params.MaxFullThreadCount - start;
- }
- for (i16 i = start; i < start + count; ++i) {
- ThreadCpuConsumptions[i] = consumption;
- }
- }
-
- TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override {
- UNIT_ASSERT_GE(threadIdx, 0);
- UNIT_ASSERT_LE(static_cast<ui16>(threadIdx), ThreadCpuConsumptions.size());
- return ThreadCpuConsumptions[threadIdx];
- }
- };
-
- class TMockSharedExecutorPool : public ISharedExecutorPool {
- std::unique_ptr<ISharedExecutorPool> OriginalPool;
- std::vector<std::vector<TCpuConsumptionModel>> ThreadCpuConsumptions;
-
- static std::vector<i16> GetPoolIds(const std::vector<IExecutorPool*>& pools) {
- std::vector<i16> poolIds;
- for (size_t i = 0; i < pools.size(); ++i) {
- poolIds.push_back(static_cast<i16>(i));
- }
- return poolIds;
- }
-
- public:
- TMockSharedExecutorPool(const TSharedExecutorPoolConfig& config, i16 poolCount, std::vector<IExecutorPool*> pools)
- : OriginalPool(CreateSharedExecutorPool(config, poolCount, GetPoolIds(pools)))
- , ThreadCpuConsumptions(poolCount, std::vector<TCpuConsumptionModel>(config.Threads, TCpuConsumption{0.0, 0.0}))
- {
- OriginalPool->Init(pools, false);
- }
-
- void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {}
- void Start() override {}
- void PrepareStop() override {}
- void Shutdown() override {}
- bool Cleanup() override { return true; }
-
- void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override {
- OriginalPool->GetSharedStatsForHarmonizer(pool, statsCopy);
- }
-
- void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override {
- OriginalPool->GetSharedStats(pool, statsCopy);
- }
-
- void Init(const std::vector<IExecutorPool*>& pools, bool /*isShared*/) override {
- OriginalPool->Init(pools, false);
- }
-
- TSharedExecutorThreadCtx* GetSharedThread(i16 poolId) override {
- return OriginalPool->GetSharedThread(poolId);
- }
-
- i16 ReturnOwnHalfThread(i16 pool) override {
- return OriginalPool->ReturnOwnHalfThread(pool);
- }
-
- i16 ReturnBorrowedHalfThread(i16 pool) override {
- return OriginalPool->ReturnBorrowedHalfThread(pool);
- }
-
- void GiveHalfThread(i16 from, i16 to) override {
- OriginalPool->GiveHalfThread(from, to);
- }
-
- i16 GetSharedThreadCount() const override {
- return OriginalPool->GetSharedThreadCount();
- }
-
- TSharedPoolState GetState() const override {
- return OriginalPool->GetState();
- }
-
- TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override {
- return ThreadCpuConsumptions[poolId][threadIdx];
- }
-
- std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override {
- std::vector<TCpuConsumption> poolConsumptions(ThreadCpuConsumptions[poolId].size());
- for (size_t i = 0; i < poolConsumptions.size(); ++i) {
- poolConsumptions[i] = ThreadCpuConsumptions[poolId][i];
- }
- return poolConsumptions;
- }
-
- void IncreaseThreadCpuConsumption(ui32 threadIdx, ui32 poolId, TCpuConsumption consumption) {
- ThreadCpuConsumptions[poolId][threadIdx].Increase(consumption);
- }
-
- void SetThreadCpuConsumption(ui32 threadIdx, ui32 poolId, TCpuConsumption consumption) {
- ThreadCpuConsumptions[poolId][threadIdx] = consumption;
- }
- };
-
- Y_UNIT_TEST(TestHarmonizerCreation) {
- ui64 currentTs = 1000000;
- std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
- UNIT_ASSERT(harmonizer != nullptr);
- }
-
- Y_UNIT_TEST(TestAddPool) {
- ui64 currentTs = 1000000;
- std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
- auto mockPool = std::make_unique<TMockExecutorPool>();
- harmonizer->AddPool(mockPool.get());
-
- auto stats = harmonizer->GetPoolStats(0);
- UNIT_ASSERT_VALUES_EQUAL(stats.PotentialMaxThreadCount, 8);
- UNIT_ASSERT_VALUES_EQUAL(stats.IncreasingThreadsByNeedyState, 0);
- UNIT_ASSERT_VALUES_EQUAL(stats.DecreasingThreadsByStarvedState, 0);
- }
-
- Y_UNIT_TEST(TestHarmonize) {
- ui64 currentTs = 1000000;
- auto harmonizer = MakeHarmonizer(currentTs);
- auto mockPool = new TMockExecutorPool();
- harmonizer->AddPool(mockPool);
-
- harmonizer->Harmonize(currentTs + 1000000); // 1 second later
-
- auto stats = harmonizer->GetPoolStats(0);
- Y_UNUSED(stats);
- UNIT_ASSERT_VALUES_EQUAL(mockPool->ThreadCount, 4); // Should start with default
-
- delete harmonizer;
- delete mockPool;
- }
-
- Y_UNIT_TEST(TestToNeedyNextToHoggish) {
- ui64 currentTs = 1000000;
- auto harmonizer = MakeHarmonizer(currentTs);
- TMockExecutorPoolParams params;
- std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
- mockPools.emplace_back(new TMockExecutorPool(params));
- params.PoolId = 1;
- mockPools.emplace_back(new TMockExecutorPool(params));
- for (auto& pool : mockPools) {
- harmonizer->AddPool(pool.get());
- pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount);
- }
-
- TCpuConsumptionModel cpuConsumptionModel;
-
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
- mockPools[0]->SetThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount);
-
- currentTs += Us2Ts(59'000'000);
- harmonizer->Harmonize(currentTs);
-
- auto stats = harmonizer->GetPoolStats(0);
-
- CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0);
- CHECK_IS_NEEDY(stats);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->SetThreadCpuConsumption({0.0, 0.0}, 0, params.DefaultFullThreadCount);
- harmonizer->Harmonize(currentTs);
-
- stats = harmonizer->GetPoolStats(0);
-
- CHECK_CHANGING_THREADS(stats, 1, 0, 1, 0, 0);
- CHECK_IS_HOGGISH(stats);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4);
- }
-
- Y_UNIT_TEST(TestToNeedyNextToStarved) {
- ui64 currentTs = 1000000;
- auto harmonizer = MakeHarmonizer(currentTs);
- TMockExecutorPoolParams params;
- std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
- mockPools.emplace_back(new TMockExecutorPool(params));
- params.PoolId = 1;
- mockPools.emplace_back(new TMockExecutorPool(params));
- for (auto& pool : mockPools) {
- harmonizer->AddPool(pool.get());
- pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount);
- }
-
- TCpuConsumptionModel cpuConsumptionModel;
-
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
- mockPools[0]->IncreaseThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount);
-
- currentTs += Us2Ts(59'000'000);
- harmonizer->Harmonize(currentTs);
-
- auto stats = harmonizer->GetPoolStats(0);
-
- CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0);
- CHECK_IS_NEEDY(stats);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({40'000'000.0, 60'000'000.0}, 0, 5);
- mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 4);
- harmonizer->Harmonize(currentTs);
-
- stats = harmonizer->GetPoolStats(0);
-
- CHECK_CHANGING_THREADS(stats, 1, 0, 0, 1, 0);
- CHECK_IS_STARVED(stats);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4);
- }
-
- Y_UNIT_TEST(TestExchangeThreads) {
- ui64 currentTs = 1000000;
- auto harmonizer = MakeHarmonizer(currentTs);
- TMockExecutorPoolParams params {
- .DefaultFullThreadCount = 1,
- .MinFullThreadCount = 1,
- .MaxFullThreadCount = 2,
- .DefaultThreadCount = 1.0f,
- .MinThreadCount = 1.0f,
- .MaxThreadCount = 2.0f,
- };
- std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
- mockPools.emplace_back(new TMockExecutorPool(params));
- params.PoolId = 1;
- mockPools.emplace_back(new TMockExecutorPool(params));
- params.PoolId = 2;
- mockPools.emplace_back(new TMockExecutorPool(params));
- for (auto& pool : mockPools) {
- harmonizer->AddPool(pool.get());
- pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount);
- }
-
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1);
- mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
- mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
- harmonizer->Harmonize(currentTs);
-
- auto stats0 = harmonizer->GetPoolStats(0);
- auto stats1 = harmonizer->GetPoolStats(1);
- auto stats2 = harmonizer->GetPoolStats(2);
-
- CHECK_CHANGING_THREADS(stats0, 0, 0, 0, 0, 0);
- CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 0);
- CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0);
- CHECK_IS_HOGGISH(stats0);
- CHECK_IS_NEEDY(stats1);
- CHECK_IS_NEEDY(stats2);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 1);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 2);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
- mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[2]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1);
- harmonizer->Harmonize(currentTs);
-
- stats0 = harmonizer->GetPoolStats(0);
- stats1 = harmonizer->GetPoolStats(1);
- stats2 = harmonizer->GetPoolStats(2);
-
- CHECK_CHANGING_THREADS(stats0, 0, 1, 0, 0, 0);
- CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 1);
- CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0);
- CHECK_IS_NEEDY(stats0);
- CHECK_IS_NEEDY(stats1);
- CHECK_IS_HOGGISH(stats2);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 2);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 1);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1);
- }
-
- Y_UNIT_TEST(TestThreadCounts) {
- ui64 currentTs = 1000000;
- std::vector<TMockExecutorPoolParams> params {
- {
- .DefaultFullThreadCount = 5,
- .MinFullThreadCount = 5,
- .MaxFullThreadCount = 15,
- .DefaultThreadCount = 5.0f,
- .MinThreadCount = 5.0f,
- .MaxThreadCount = 15.0f,
- .PoolId = 0,
- },
- {
- .DefaultFullThreadCount = 5,
- .MinFullThreadCount = 5,
- .MaxFullThreadCount = 15,
- .DefaultThreadCount = 5.0f,
- .MinThreadCount = 5.0f,
- .MaxThreadCount = 15.0f,
- .PoolId = 1,
- },
- {
- .DefaultFullThreadCount = 5,
- .MinFullThreadCount = 5,
- .MaxFullThreadCount = 15,
- .DefaultThreadCount = 5.0f,
- .MinThreadCount = 5.0f,
- .MaxThreadCount = 15.0f,
- .PoolId = 2,
- },
- };
- auto harmonizer = MakeHarmonizer(currentTs);
- std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
- i16 budget = 0;
- for (auto& param : params) {
- mockPools.emplace_back(new TMockExecutorPool(param));
- HARMONIZER_DEBUG_PRINT("created pool", mockPools.back()->Params.ToString());
- budget += param.DefaultFullThreadCount;
- }
- for (ui32 poolIdx = 0; poolIdx < params.size(); ++poolIdx) {
- auto &pool = mockPools[poolIdx];
- harmonizer->AddPool(pool.get());
- pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params[poolIdx].MaxFullThreadCount);
- }
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
-
- for (i16 i = 0; i < params[0].MaxFullThreadCount; ++i) {
- for (i16 ii = 0; ii < params[1].MaxFullThreadCount; ++ii) {
- for (i16 iii = 0; iii < params[2].MaxFullThreadCount; ++iii) {
- if (i + ii + iii > budget) {
- continue;
- }
- ui32 localBudget = budget - (i + ii + iii);
- HARMONIZER_DEBUG_PRINT("first pool", i, "second pool", ii, "third pool", iii, "budget", budget, "local budget", localBudget);
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->SetFullThreadCount(i);
- mockPools[1]->SetFullThreadCount(ii);
- mockPools[2]->SetFullThreadCount(iii);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min<i16>(i, mockPools[0]->ThreadCount));
- mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min<i16>(ii, mockPools[1]->ThreadCount));
- mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min(iii, mockPools[2]->ThreadCount));
- harmonizer->Harmonize(currentTs);
- std::vector<TPoolHarmonizerStats> stats;
- for (auto& pool : params) {
- stats.emplace_back(harmonizer->GetPoolStats(pool.PoolId));
- }
- for (ui32 poolIdx = 0; poolIdx < params.size(); ++poolIdx) {
- UNIT_ASSERT_VALUES_EQUAL(stats[poolIdx].PotentialMaxThreadCount, std::min<i16>(mockPools[poolIdx]->ThreadCount + localBudget, params[poolIdx].MaxFullThreadCount));
- }
- }
- }
- }
- }
-
- Y_UNIT_TEST(TestSharedHalfThreads) {
- ui64 currentTs = 1000000;
- std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
- TMockExecutorPoolParams params {
- .DefaultFullThreadCount = 2,
- .MinFullThreadCount = 1,
- .MaxFullThreadCount = 3,
- .DefaultThreadCount = 2.0f,
- .MinThreadCount = 1.0f,
- .MaxThreadCount = 3.0f,
- };
- std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
- std::vector<IExecutorPool*> pools;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
- params.PoolId = 1;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
- params.PoolId = 2;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
-
-
- TSharedExecutorPoolConfig sharedConfig;
- sharedConfig.Threads = 3;
- std::unique_ptr<ISharedExecutorPool> sharedPool(new TMockSharedExecutorPool(sharedConfig, 3, pools));
-
- for (auto& pool : mockPools) {
- harmonizer->AddPool(pool.get());
- }
- harmonizer->SetSharedPool(sharedPool.get());
-
- for (ui32 i = 0; i < mockPools.size(); ++i) {
- mockPools[i]->AddSharedThread(sharedPool->GetSharedThread(i));
- }
-
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
- mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
-
- harmonizer->Harmonize(currentTs);
-
- auto stats0 = harmonizer->GetPoolStats(0);
- auto stats1 = harmonizer->GetPoolStats(1);
- auto stats2 = harmonizer->GetPoolStats(2);
- auto sharedState = sharedPool->GetState();
-
- CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 0, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 0);
- CHECK_IS_NEEDY(stats0);
- CHECK_IS_HOGGISH(stats1);
- CHECK_IS_NEEDY(stats2);
- UNIT_ASSERT_VALUES_EQUAL(sharedState.BorrowedThreadByPool[0], 1);
- UNIT_ASSERT_VALUES_EQUAL(sharedState.PoolByBorrowedThread[1], 0);
-
- currentTs += Us2Ts(60'000'000);
-
- harmonizer->Harmonize(currentTs);
-
- stats0 = harmonizer->GetPoolStats(0);
- stats1 = harmonizer->GetPoolStats(1);
- stats2 = harmonizer->GetPoolStats(2);
- sharedState = sharedPool->GetState();
-
- CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 1, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 1);
- CHECK_IS_HOGGISH(stats0);
- CHECK_IS_HOGGISH(stats1);
- CHECK_IS_HOGGISH(stats2);
- UNIT_ASSERT_VALUES_EQUAL(sharedState.BorrowedThreadByPool[0], -1);
- UNIT_ASSERT_VALUES_EQUAL(sharedState.PoolByBorrowedThread[1], -1);
- }
-
- Y_UNIT_TEST(TestSharedHalfThreadsStarved) {
- ui64 currentTs = 1000000;
- std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
- TMockExecutorPoolParams params {
- .DefaultFullThreadCount = 2,
- .MinFullThreadCount = 1,
- .MaxFullThreadCount = 3,
- .DefaultThreadCount = 2.0f,
- .MinThreadCount = 1.0f,
- .MaxThreadCount = 3.0f,
- };
- std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
- std::vector<IExecutorPool*> pools;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
- params.PoolId = 1;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
-
- TSharedExecutorPoolConfig sharedConfig;
- sharedConfig.Threads = 2;
- std::unique_ptr<ISharedExecutorPool> sharedPool(new TMockSharedExecutorPool(sharedConfig, 2, pools));
-
- for (auto& pool : mockPools) {
- harmonizer->AddPool(pool.get());
- }
- harmonizer->SetSharedPool(sharedPool.get());
-
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
- harmonizer->Harmonize(currentTs);
-
- auto stats0 = harmonizer->GetPoolStats(0);
- auto stats1 = harmonizer->GetPoolStats(1);
- auto sharedState = sharedPool->GetState();
- CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 0, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 0);
- CHECK_IS_NEEDY(stats0);
- CHECK_IS_HOGGISH(stats1);
- UNIT_ASSERT_VALUES_EQUAL_C(sharedState.BorrowedThreadByPool[0], 1, sharedState.ToString());
- UNIT_ASSERT_VALUES_EQUAL_C(sharedState.PoolByBorrowedThread[1], 0, sharedState.ToString());
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({30'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
- harmonizer->Harmonize(currentTs);
-
- stats0 = harmonizer->GetPoolStats(0);
- stats1 = harmonizer->GetPoolStats(1);
- sharedState = sharedPool->GetState();
- UNIT_ASSERT_VALUES_EQUAL_C(sharedState.BorrowedThreadByPool[0], -1, sharedState.ToString());
- UNIT_ASSERT_VALUES_EQUAL_C(sharedState.PoolByBorrowedThread[1], -1, sharedState.ToString());
- CHECK_CHANGING_HALF_THREADS(stats0, 1, 1, 0, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 1, 0);
- CHECK_IS_STARVED(stats0);
- CHECK_IS_HOGGISH(stats1);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
-
- harmonizer->Harmonize(currentTs);
-
- stats0 = harmonizer->GetPoolStats(0);
- stats1 = harmonizer->GetPoolStats(1);
-
- CHECK_CHANGING_HALF_THREADS(stats0, 2, 1, 0, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 2, 1, 0);
- CHECK_IS_NEEDY(stats0);
- CHECK_IS_HOGGISH(stats1);
- }
-
-}
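
Note: the removed tests above drive the harmonizer with mock pools and assert on the state it derives for each pool from two numbers per window: how much wall-clock time the pool's threads were booked and how much CPU they actually consumed. The following is a rough sketch of that classification with made-up thresholds, intended only to make the needy/hoggish/starved checks in the tests easier to read; the real rules and constants live in the reverted harmonizer.cpp.

#include <algorithm>

enum class EPoolState { Normal, Needy, Hoggish, Starved };

struct TPoolSample {
    double BookedCpuUs = 0;    // wall-clock time the pool's threads were booked
    double ConsumedCpuUs = 0;  // CPU time the pool actually consumed
    double ThreadCount = 0;    // full threads currently granted to the pool
    double WindowUs = 0;       // observation window, e.g. 60'000'000 us in the tests
};

EPoolState ClassifyPool(const TPoolSample& s) {
    double booked = s.BookedCpuUs / s.WindowUs;      // threads' worth of booked time
    double consumed = s.ConsumedCpuUs / s.WindowUs;  // threads' worth of consumed CPU
    if (booked > consumed + 0.5) {
        return EPoolState::Starved;   // booked much more time than it could actually run
    }
    if (consumed > s.ThreadCount - 0.5) {
        return EPoolState::Needy;     // saturates its current threads, wants one more
    }
    if (consumed < std::max(1.0, s.ThreadCount - 1.0)) {
        return EPoolState::Hoggish;   // leaves at least a whole thread idle
    }
    return EPoolState::Normal;
}
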
diff --git a/ydb/library/actors/core/harmonizer/ut/ya.make b/ydb/library/actors/core/harmonizer/ut/ya.make
deleted file mode 100644
index 5838d1bc21..0000000000
--- a/ydb/library/actors/core/harmonizer/ut/ya.make
+++ /dev/null
@@ -1,22 +0,0 @@
-UNITTEST_FOR(ydb/library/actors/core/harmonizer)
-
-FORK_SUBTESTS()
-IF (SANITIZER_TYPE)
- SIZE(MEDIUM)
- TIMEOUT(600)
-ELSE()
- SIZE(SMALL)
- TIMEOUT(60)
-ENDIF()
-
-
-PEERDIR(
- ydb/library/actors/interconnect
- ydb/library/actors/testlib
-)
-
-SRCS(
- harmonizer_ut.cpp
-)
-
-END()
diff --git a/ydb/library/actors/core/harmonizer/waiting_stats.cpp b/ydb/library/actors/core/harmonizer/waiting_stats.cpp
deleted file mode 100644
index f855055034..0000000000
--- a/ydb/library/actors/core/harmonizer/waiting_stats.cpp
+++ /dev/null
@@ -1,48 +0,0 @@
-#include "waiting_stats.h"
-
-#include "pool.h"
-#include <ydb/library/actors/core/executor_pool_basic.h>
-#include <ydb/library/actors/core/probes.h>
-
-namespace NActors {
-
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
-void TWaitingInfo::Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools) {
- WakingUpTotalTime = 0;
- WakingUpCount = 0;
- AwakingTotalTime = 0;
- AwakingCount = 0;
-
- for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx) {
- TPoolInfo& pool = *pools[poolIdx];
- if (pool.WaitingStats) {
- WakingUpTotalTime += pool.WaitingStats->WakingUpTotalTime;
- WakingUpCount += pool.WaitingStats->WakingUpCount;
- AwakingTotalTime += pool.WaitingStats->AwakingTotalTime;
- AwakingCount += pool.WaitingStats->AwakingCount;
- }
- }
-
- constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime;
- constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime;
-
- ui64 realAvgWakingUpTime = (WakingUpCount ? WakingUpTotalTime / WakingUpCount : knownAvgWakingUpTime);
- ui64 avgWakingUpTime = realAvgWakingUpTime;
- if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) {
- avgWakingUpTime = knownAvgWakingUpTime;
- }
- AvgWakingUpTimeUs.store(Ts2Us(avgWakingUpTime), std::memory_order_relaxed);
-
- ui64 realAvgAwakeningTime = (AwakingCount ? AwakingTotalTime / AwakingCount : knownAvgAwakeningUpTime);
- ui64 avgAwakeningTime = realAvgAwakeningTime;
- if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) {
- avgAwakeningTime = knownAvgAwakeningUpTime;
- }
- AvgAwakeningTimeUs.store(Ts2Us(avgAwakeningTime), std::memory_order_relaxed);
-
- ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime;
- LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(avgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption));
-}
-
-} // namespace NActors
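
Note: TWaitingInfo::Pull above sums the waking-up and awakening latencies reported by every pool and then guards the averages: when a window produced no samples, or the measured average exceeds twice the known latency, it falls back to the compile-time constant. The same guard in isolation (knownAvg stands in for TWaitingStatsConstants::KnownAvgWakingUpTime or KnownAvgAwakeningTime):

#include <cstdint>

// The fallback used twice in TWaitingInfo::Pull, extracted for clarity.
uint64_t SmoothedAverage(uint64_t totalTime, uint64_t count, uint64_t knownAvg) {
    uint64_t realAvg = count ? totalTime / count : knownAvg;
    // Distrust the window if it had no samples or looks implausibly slow
    // (more than twice the known latency) and use the known constant instead.
    if (realAvg == 0 || realAvg > 2 * knownAvg) {
        return knownAvg;
    }
    return realAvg;
}
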
diff --git a/ydb/library/actors/core/harmonizer/waiting_stats.h b/ydb/library/actors/core/harmonizer/waiting_stats.h
deleted file mode 100644
index 76f661bb37..0000000000
--- a/ydb/library/actors/core/harmonizer/waiting_stats.h
+++ /dev/null
@@ -1,21 +0,0 @@
-#pragma once
-
-#include "defs.h"
-
-namespace NActors {
-
-struct TPoolInfo;
-
-struct TWaitingInfo {
- ui64 WakingUpTotalTime = 0;
- ui64 WakingUpCount = 0;
- ui64 AwakingTotalTime = 0;
- ui64 AwakingCount = 0;
- std::atomic<float> AvgWakingUpTimeUs = 0;
- std::atomic<float> AvgAwakeningTimeUs = 0;
-
- void Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools);
-
-}; // struct TWaitingInfo
-
-} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/ya.make b/ydb/library/actors/core/harmonizer/ya.make
deleted file mode 100644
index 9797c76c4a..0000000000
--- a/ydb/library/actors/core/harmonizer/ya.make
+++ /dev/null
@@ -1,44 +0,0 @@
-LIBRARY()
-
-NO_WSHADOW()
-
-IF (PROFILE_MEMORY_ALLOCATIONS)
- CFLAGS(-DPROFILE_MEMORY_ALLOCATIONS)
-ENDIF()
-
-IF (ALLOCATOR == "B" OR ALLOCATOR == "BS" OR ALLOCATOR == "C")
- CXXFLAGS(-DBALLOC)
- PEERDIR(
- library/cpp/balloc/optional
- )
-ENDIF()
-
-SRCS(
- cpu_consumption.cpp
- pool.cpp
- shared_info.cpp
- waiting_stats.cpp
- harmonizer.cpp
-)
-
-PEERDIR(
- ydb/library/actors/util
- ydb/library/actors/protos
- ydb/library/services
- library/cpp/logger
- library/cpp/lwtrace
- library/cpp/monlib/dynamic_counters
- library/cpp/time_provider
-)
-
-IF (SANITIZER_TYPE == "thread")
- SUPPRESSIONS(
- tsan.supp
- )
-ENDIF()
-
-END()
-
-RECURSE_FOR_TESTS(
- ut
-)
diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h
index 04a6982457..4bceffa8f3 100644
--- a/ydb/library/actors/core/mon_stats.h
+++ b/ydb/library/actors/core/mon_stats.h
@@ -63,10 +63,10 @@ namespace NActors {
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
ui64 DecreasingThreadsByExchange = 0;
- i64 MaxCpuUs = 0;
- i64 MinCpuUs = 0;
- i64 MaxElapsedUs = 0;
- i64 MinElapsedUs = 0;
+ i64 MaxConsumedCpuUs = 0;
+ i64 MinConsumedCpuUs = 0;
+ i64 MaxBookedCpuUs = 0;
+ i64 MinBookedCpuUs = 0;
double SpinningTimeUs = 0;
double SpinThresholdUs = 0;
i16 WrongWakenedThreadCount = 0;
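
Note: the hunk above is the naming half of the revert: MaxCpuUs/MinCpuUs and MaxElapsedUs/MinElapsedUs go back to the Consumed/Booked terminology, where "consumed" is CPU time actually burned and "booked" is the wall-clock time threads were allotted. A hypothetical mapping helper, only to spell out which removed field corresponds to which restored one (struct and function names are illustrative):

#include <cstdint>

struct TOldNames { int64_t MaxCpuUs, MinCpuUs, MaxElapsedUs, MinElapsedUs; };
struct TNewNames { int64_t MaxConsumedCpuUs, MinConsumedCpuUs, MaxBookedCpuUs, MinBookedCpuUs; };

// Field-for-field correspondence between the removed and the restored names.
TNewNames MapBack(const TOldNames& old) {
    return TNewNames{
        .MaxConsumedCpuUs = old.MaxCpuUs,     // "cpu"     -> "consumed"
        .MinConsumedCpuUs = old.MinCpuUs,
        .MaxBookedCpuUs   = old.MaxElapsedUs, // "elapsed" -> "booked"
        .MinBookedCpuUs   = old.MinElapsedUs,
    };
}
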
diff --git a/ydb/library/actors/core/probes.h b/ydb/library/actors/core/probes.h
index ebf34299e2..f9394f3273 100644
--- a/ydb/library/actors/core/probes.h
+++ b/ydb/library/actors/core/probes.h
@@ -174,11 +174,11 @@
NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \
PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \
- NAMES("poolId", "pool", "elapsed", "cpu", "lastSecondElapsed", "lastSecondCpu", "threadCount", "maxThreadCount", \
+ NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", \
"isStarved", "isNeedy", "isHoggish")) \
PROBE(HarmonizeCheckPoolByThread, GROUPS("Harmonizer"), \
TYPES(ui32, TString, i16, double, double, double, double), \
- NAMES("poolId", "pool", "threadIdx", "elapsed", "cpu", "lastSecondElapsed", "lastSecondCpu")) \
+ NAMES("poolId", "pool", "threadIdx", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed")) \
PROBE(WakingUpConsumption, GROUPS("Harmonizer"), \
TYPES(double, double, double, double, double), \
NAMES("avgWakingUpUs", "realAvgWakingUpUs", "avgAwakeningUs", "realAvgAwakeningUs", "total")) \
diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make
index ca44ab7707..fbeb0e2b0d 100644
--- a/ydb/library/actors/core/ya.make
+++ b/ydb/library/actors/core/ya.make
@@ -56,6 +56,8 @@ SRCS(
executor_pool_shared.h
executor_thread.cpp
executor_thread.h
+ harmonizer.cpp
+ harmonizer.h
hfunc.h
interconnect.cpp
interconnect.h
@@ -104,7 +106,6 @@ GENERATE_ENUM_SERIALIZATION(log_iface.h)
PEERDIR(
ydb/library/actors/actor_type
- ydb/library/actors/core/harmonizer
ydb/library/actors/memory_log
ydb/library/actors/prof
ydb/library/actors/protos
@@ -128,10 +129,6 @@ ENDIF()
END()
-RECURSE(
- harmonizer
-)
-
RECURSE_FOR_TESTS(
ut
ut_fat
diff --git a/ydb/library/actors/helpers/pool_stats_collector.h b/ydb/library/actors/helpers/pool_stats_collector.h
index f1852cb485..a0790a55c8 100644
--- a/ydb/library/actors/helpers/pool_stats_collector.h
+++ b/ydb/library/actors/helpers/pool_stats_collector.h
@@ -189,10 +189,10 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange;
NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
- NMonitoring::TDynamicCounters::TCounterPtr MaxCpuUs;
- NMonitoring::TDynamicCounters::TCounterPtr MinCpuUs;
- NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedUs;
- NMonitoring::TDynamicCounters::TCounterPtr MinElapsedUs;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs;
NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs;
@@ -266,10 +266,10 @@ private:
DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true);
NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
- MaxCpuUs = PoolGroup->GetCounter("MaxCpuUs", false);
- MinCpuUs = PoolGroup->GetCounter("MinCpuUs", false);
- MaxElapsedUs = PoolGroup->GetCounter("MaxElapsedUs", false);
- MinElapsedUs = PoolGroup->GetCounter("MinElapsedUs", false);
+ MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false);
+ MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false);
+ MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false);
+ MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false);
SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true);
SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false);
@@ -384,10 +384,10 @@ private:
struct TActorSystemCounters {
TIntrusivePtr<NMonitoring::TDynamicCounters> Group;
- NMonitoring::TDynamicCounters::TCounterPtr MaxCpuUs;
- NMonitoring::TDynamicCounters::TCounterPtr MinCpuUs;
- NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedUs;
- NMonitoring::TDynamicCounters::TCounterPtr MinElapsedUs;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeUs;
NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeUs;
@@ -396,20 +396,20 @@ private:
void Init(NMonitoring::TDynamicCounters* group) {
Group = group;
- MaxCpuUs = Group->GetCounter("MaxCpuUs", false);
- MinCpuUs = Group->GetCounter("MinCpuUs", false);
- MaxElapsedUs = Group->GetCounter("MaxElapsedUs", false);
- MinElapsedUs = Group->GetCounter("MinElapsedUs", false);
+ MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false);
+ MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false);
+ MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false);
+ MinBookedCpu = Group->GetCounter("MinBookedCpu", false);
AvgAwakeningTimeUs = Group->GetCounter("AvgAwakeningTimeUs", false);
AvgWakingUpTimeUs = Group->GetCounter("AvgWakingUpTimeUs", false);
}
void Set(const THarmonizerStats& harmonizerStats) {
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
- *MaxCpuUs = harmonizerStats.MaxCpuUs;
- *MinCpuUs = harmonizerStats.MinCpuUs;
- *MaxElapsedUs = harmonizerStats.MaxElapsedUs;
- *MinElapsedUs = harmonizerStats.MinElapsedUs;
+ *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu;
+ *MinConsumedCpu = harmonizerStats.MinConsumedCpu;
+ *MaxBookedCpu = harmonizerStats.MaxBookedCpu;
+ *MinBookedCpu = harmonizerStats.MinBookedCpu;
*AvgAwakeningTimeUs = harmonizerStats.AvgAwakeningTimeUs;
*AvgWakingUpTimeUs = harmonizerStats.AvgWakingUpTimeUs;