diff options
| author | kruall <[email protected]> | 2024-12-09 22:53:49 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-12-09 22:53:49 +0300 |
| commit | f9e79db6a8e464ea6f5675a277f0643d2a0f9098 (patch) | |
| tree | 481640e3d55257ecf6671c12887c4fd2815b45d1 | |
| parent | a15d8708109b977cfb16c47d1e53f35266ca9651 (diff) | |
Fix miscalculated budget in THarmonizer (#12422)
33 files changed, 2644 insertions, 1014 deletions
diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp index 1c286d9bd79..047388ce906 100644 --- a/ydb/library/actors/core/cpu_manager.cpp +++ b/ydb/library/actors/core/cpu_manager.cpp @@ -37,8 +37,8 @@ namespace NActors { poolsWithSharedThreads.push_back(cfg.PoolId); } } - Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads)); - auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get()); + Shared.reset(CreateSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads)); + auto sharedPool = static_cast<ISharedExecutorPool*>(Shared.get()); ui64 ts = GetCycleCountFast(); Harmonizer.reset(MakeHarmonizer(ts)); @@ -135,11 +135,9 @@ namespace NActors { for (TBasicExecutorPoolConfig& cfg : Config.Basic) { if (cfg.PoolId == poolId) { if (cfg.HasSharedThread) { - auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get()); + auto *sharedPool = Shared.get(); auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get()); - if (pool) { - pool->AddSharedThread(sharedPool->GetSharedThread(poolId)); - } + pool->AddSharedThread(sharedPool->GetSharedThread(poolId)); return pool; } else { return new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get()); diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h index 1fee9467987..0bd0994f4b6 100644 --- a/ydb/library/actors/core/cpu_manager.h +++ b/ydb/library/actors/core/cpu_manager.h @@ -2,10 +2,10 @@ #include "config.h" #include "executor_pool_jail.h" -#include "harmonizer.h" #include "executor_pool.h" #include "executor_pool_shared.h" #include "mon_stats.h" +#include <ydb/library/actors/core/harmonizer/harmonizer.h> #include <memory> namespace NActors { @@ -16,7 +16,7 @@ namespace NActors { const ui32 ExecutorPoolCount; TArrayHolder<TAutoPtr<IExecutorPool>> Executors; std::unique_ptr<IHarmonizer> Harmonizer; - std::unique_ptr<TSharedExecutorPool> Shared; + std::unique_ptr<ISharedExecutorPool> Shared; std::unique_ptr<TExecutorPoolJail> Jail; TCpuManagerConfig Config; diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h index 67297b428ba..8fd79113391 100644 --- a/ydb/library/actors/core/executor_pool.h +++ b/ydb/library/actors/core/executor_pool.h @@ -14,15 +14,16 @@ namespace NActors { struct TExecutorThreadStats; class TExecutorPoolJail; class ISchedulerCookie; + struct TSharedExecutorThreadCtx; struct TCpuConsumption { - double ConsumedUs = 0; - double BookedUs = 0; + double CpuUs = 0; + double ElapsedUs = 0; ui64 NotEnoughCpuExecutions = 0; void Add(const TCpuConsumption& other) { - ConsumedUs += other.ConsumedUs; - BookedUs += other.BookedUs; + CpuUs += other.CpuUs; + ElapsedUs += other.ElapsedUs; NotEnoughCpuExecutions += other.NotEnoughCpuExecutions; } }; @@ -176,6 +177,16 @@ namespace NActors { return 1; } + virtual TSharedExecutorThreadCtx* ReleaseSharedThread() { + return nullptr; + } + virtual void AddSharedThread(TSharedExecutorThreadCtx*) { + } + + virtual bool IsSharedThreadEnabled() const { + return false; + } + virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) { Y_UNUSED(threadIdx); return TCpuConsumption{0.0, 0.0}; diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp index fe41e946233..e8e504abe20 100644 --- a/ydb/library/actors/core/executor_pool_base.cpp +++ b/ydb/library/actors/core/executor_pool_base.cpp @@ -1,4 +1,5 @@ #include "actorsystem.h" +#include "activity_guard.h" #include "actor.h" #include "executor_pool_base.h" #include "executor_pool_basic_feature_flags.h" diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index bfb72f02b36..fce778ef3be 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -407,10 +407,6 @@ namespace NActors { poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange; poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; - poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu; - poolStats.MinConsumedCpuUs = stats.MinConsumedCpu; - poolStats.MaxBookedCpuUs = stats.MaxBookedCpu; - poolStats.MinBookedCpuUs = stats.MinBookedCpu; } statsCopy.resize(MaxFullThreadCount + 1); @@ -429,7 +425,7 @@ namespace NActors { void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const { if (Harmonizer) { TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); - poolState.UsedCpu = stats.AvgConsumedCpu; + poolState.ElapsedCpu = stats.AvgElapsedCpu; poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount; } else { poolState.PossibleMaxLimit = poolState.MaxLimit; @@ -625,7 +621,7 @@ namespace NActors { TExecutorThreadCtx& threadCtx = Threads[threadIdx]; TExecutorThreadStats stats; threadCtx.Thread->GetCurrentStatsForHarmonizer(stats); - return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions}; + return {static_cast<double>(stats.CpuUs), Ts2Us(stats.SafeElapsedTicks), stats.NotEnoughCpuExecutions}; } i16 TBasicExecutorPool::GetBlockingThreadCount() const { diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h index c60e2ab8334..76909997519 100644 --- a/ydb/library/actors/core/executor_pool_basic.h +++ b/ydb/library/actors/core/executor_pool_basic.h @@ -7,8 +7,8 @@ #include "executor_pool_basic_feature_flags.h" #include "scheduler_queue.h" #include "executor_pool_base.h" -#include "harmonizer.h" #include <memory> +#include <ydb/library/actors/core/harmonizer/harmonizer.h> #include <ydb/library/actors/actor_type/indexes.h> #include <ydb/library/actors/util/unordered_cache.h> #include <ydb/library/actors/util/threadparkpad.h> @@ -278,8 +278,11 @@ namespace NActors { void CalcSpinPerThread(ui64 wakingUpConsumption); void ClearWaitingStats() const; - TSharedExecutorThreadCtx* ReleaseSharedThread(); - void AddSharedThread(TSharedExecutorThreadCtx* thread); + TSharedExecutorThreadCtx* ReleaseSharedThread() override; + void AddSharedThread(TSharedExecutorThreadCtx* thread) override; + bool IsSharedThreadEnabled() const override { + return true; + } private: void AskToGoToSleep(bool *needToWait, bool *needToBlock); diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp index 310a6f83b35..c6555f2c488 100644 --- a/ydb/library/actors/core/executor_pool_io.cpp +++ b/ydb/library/actors/core/executor_pool_io.cpp @@ -156,7 +156,7 @@ namespace NActors { void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const { if (Harmonizer) { TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); - poolState.UsedCpu = stats.AvgConsumedCpu; + poolState.ElapsedCpu = stats.AvgElapsedCpu; } poolState.CurrentLimit = PoolThreads; poolState.MaxLimit = PoolThreads; diff --git a/ydb/library/actors/core/executor_pool_io.h b/ydb/library/actors/core/executor_pool_io.h index 316b21d4d2f..16b17127d4c 100644 --- a/ydb/library/actors/core/executor_pool_io.h +++ b/ydb/library/actors/core/executor_pool_io.h @@ -3,9 +3,9 @@ #include "actorsystem.h" #include "executor_thread.h" #include "executor_thread_ctx.h" -#include "harmonizer.h" #include "scheduler_queue.h" #include "executor_pool_base.h" +#include <ydb/library/actors/core/harmonizer/harmonizer.h> #include <ydb/library/actors/actor_type/indexes.h> #include <ydb/library/actors/util/ticket_lock.h> #include <ydb/library/actors/util/unordered_cache.h> diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp index 104e02812cc..7d1327f8454 100644 --- a/ydb/library/actors/core/executor_pool_shared.cpp +++ b/ydb/library/actors/core/executor_pool_shared.cpp @@ -12,6 +12,50 @@ namespace NActors { +class TSharedExecutorPool: public ISharedExecutorPool { +public: + TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads); + + // IThreadPool + void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; + void Start() override; + void PrepareStop() override; + void Shutdown() override; + bool Cleanup() override; + + TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) override; + void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override; + void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override; + TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override; + std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override; + + i16 ReturnOwnHalfThread(i16 pool) override; + i16 ReturnBorrowedHalfThread(i16 pool) override; + void GiveHalfThread(i16 from, i16 to) override; + + i16 GetSharedThreadCount() const override; + + TSharedPoolState GetState() const override; + + void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) override; + +private: + TSharedPoolState State; + + std::vector<IExecutorPool*> Pools; + + i16 PoolCount; + i16 SharedThreadCount; + std::unique_ptr<TSharedExecutorThreadCtx[]> Threads; + + std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders; + std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters; + + TDuration TimePerMailbox; + ui32 EventsPerMailbox; + ui64 SoftProcessingDurationTs; +}; // class TSharedExecutorPool + TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) : State(poolCount, poolsWithThreads.size()) , Pools(poolCount) @@ -29,40 +73,47 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config } } -void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) { - // ActorSystem = actorSystem; - - ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]); - ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]); - - std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools(); - std::vector<IExecutorPool*> poolByThread(SharedThreadCount); - for (IExecutorPool* pool : poolsBasic) { - Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool); - i16 threadIdx = State.ThreadByPool[pool->PoolId]; - if (threadIdx >= 0) { - poolByThread[threadIdx] = pool; - } +void TSharedExecutorPool::Init(const std::vector<IExecutorPool*>& pools, bool withThreads) { + std::vector<IExecutorPool*> poolByThread(SharedThreadCount); + for (IExecutorPool* pool : pools) { + Pools[pool->PoolId] = pool; + i16 threadIdx = State.ThreadByPool[pool->PoolId]; + if (threadIdx >= 0) { + poolByThread[threadIdx] = pool; } + } - for (i16 i = 0; i != SharedThreadCount; ++i) { - // !TODO - Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release); + for (i16 i = 0; i != SharedThreadCount; ++i) { + // !TODO + Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release); + if (withThreads) { Threads[i].Thread.reset( new TSharedExecutorThread( -1, - actorSystem, - &Threads[i], - PoolCount, - "SharedThread", - SoftProcessingDurationTs, - TimePerMailbox, + nullptr, + &Threads[i], + PoolCount, + "SharedThread", + SoftProcessingDurationTs, + TimePerMailbox, EventsPerMailbox)); - ScheduleWriters[i].Init(ScheduleReaders[i]); } + } +} + +void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) { + ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]); + ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]); + + std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools(); + Init(poolsBasic, true); - *scheduleReaders = ScheduleReaders.get(); - *scheduleSz = SharedThreadCount; + for (i16 i = 0; i != SharedThreadCount; ++i) { + ScheduleWriters[i].Init(ScheduleReaders[i]); + } + + *scheduleReaders = ScheduleReaders.get(); + *scheduleSz = SharedThreadCount; } void TSharedExecutorPool::Start() { @@ -99,24 +150,27 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) { return &Threads[threadIdx]; } -void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) { +i16 TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) { i16 threadIdx = State.ThreadByPool[pool]; - TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); + IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); Y_ABORT_UNLESS(borrowingPool); - State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1; + i16 borrowedPool = State.PoolByBorrowedThread[threadIdx]; + State.BorrowedThreadByPool[borrowedPool] = -1; State.PoolByBorrowedThread[threadIdx] = -1; // TODO(kruall): Check on race borrowingPool->ReleaseSharedThread(); + return borrowedPool; } -void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) { +i16 TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) { i16 threadIdx = State.BorrowedThreadByPool[pool]; - TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); + IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); Y_ABORT_UNLESS(borrowingPool); State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1; State.PoolByBorrowedThread[threadIdx] = -1; // TODO(kruall): Check on race borrowingPool->ReleaseSharedThread(); + return State.PoolByThread[threadIdx]; } void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { @@ -127,14 +181,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { if (borrowedThreadIdx != -1) { i16 originalPool = State.PoolByThread[borrowedThreadIdx]; if (originalPool == to) { - return ReturnOwnHalfThread(to); + ReturnOwnHalfThread(to); } else { ReturnOwnHalfThread(originalPool); } from = originalPool; } i16 threadIdx = State.ThreadByPool[from]; - TBasicExecutorPool* borrowingPool = Pools[to]; + IExecutorPool* borrowingPool = Pools[to]; Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release); State.BorrowedThreadByPool[to] = threadIdx; State.PoolByBorrowedThread[threadIdx] = to; @@ -143,16 +197,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { } void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) { - statsCopy.resize(SharedThreadCount + 1); + statsCopy.resize(SharedThreadCount); for (i16 i = 0; i < SharedThreadCount; ++i) { - Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]); + Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]); } } void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) { - statsCopy.resize(SharedThreadCount + 1); + statsCopy.resize(SharedThreadCount); for (i16 i = 0; i < SharedThreadCount; ++i) { - Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]); + Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]); } } @@ -181,4 +235,34 @@ TSharedPoolState TSharedExecutorPool::GetState() const { return State; } +ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) { + return new TSharedExecutorPool(config, poolCount, poolsWithThreads); +} + +TString TSharedPoolState::ToString() const { + TStringBuilder builder; + builder << '{'; + builder << "ThreadByPool: ["; + for (ui32 i = 0; i < ThreadByPool.size(); ++i) { + builder << ThreadByPool[i] << (i == ThreadByPool.size() - 1 ? "" : ", "); + } + builder << "], "; + builder << "PoolByThread: ["; + for (ui32 i = 0; i < PoolByThread.size(); ++i) { + builder << PoolByThread[i] << (i == PoolByThread.size() - 1 ? "" : ", "); + } + builder << "], "; + builder << "BorrowedThreadByPool: ["; + for (ui32 i = 0; i < BorrowedThreadByPool.size(); ++i) { + builder << BorrowedThreadByPool[i] << (i == BorrowedThreadByPool.size() - 1 ? "" : ", "); + } + builder << "], "; + builder << "PoolByBorrowedThread: ["; + for (ui32 i = 0; i < PoolByBorrowedThread.size(); ++i) { + builder << PoolByBorrowedThread[i] << (i == PoolByBorrowedThread.size() - 1 ? "" : ", "); + } + builder << ']'; + return builder << '}'; +} + } diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h index c083c21654a..f6c79c07a58 100644 --- a/ydb/library/actors/core/executor_pool_shared.h +++ b/ydb/library/actors/core/executor_pool_shared.h @@ -1,14 +1,12 @@ #pragma once #include "executor_pool.h" -#include "executor_thread_ctx.h" namespace NActors { - struct TExecutorThreadCtx; struct TSharedExecutorPoolConfig; - class TBasicExecutorPool; + struct TSharedExecutorThreadCtx; struct TSharedPoolState { std::vector<i16> ThreadByPool; @@ -22,48 +20,30 @@ namespace NActors { , BorrowedThreadByPool(poolCount, -1) , PoolByBorrowedThread(threadCount, -1) {} + + TString ToString() const; }; - class TSharedExecutorPool: public IActorThreadPool { + class ISharedExecutorPool : public IActorThreadPool { public: - TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads); - - // IThreadPool - void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; - void Start() override; - void PrepareStop() override; - void Shutdown() override; - bool Cleanup() override; - - TSharedExecutorThreadCtx *GetSharedThread(i16 poolId); - void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy); - void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy); - TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx); - std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId); - - void ReturnOwnHalfThread(i16 pool); - void ReturnBorrowedHalfThread(i16 pool); - void GiveHalfThread(i16 from, i16 to); + virtual ~ISharedExecutorPool() = default; - i16 GetSharedThreadCount() const; + virtual TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) = 0; + virtual void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) = 0; + virtual void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) = 0; + virtual TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) = 0; + virtual std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) = 0; + virtual void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) = 0; - TSharedPoolState GetState() const; + virtual i16 ReturnOwnHalfThread(i16 pool) = 0; + virtual i16 ReturnBorrowedHalfThread(i16 pool) = 0; + virtual void GiveHalfThread(i16 from, i16 to) = 0; - private: - TSharedPoolState State; - - std::vector<TBasicExecutorPool*> Pools; + virtual i16 GetSharedThreadCount() const = 0; - i16 PoolCount; - i16 SharedThreadCount; - std::unique_ptr<TSharedExecutorThreadCtx[]> Threads; - - std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders; - std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters; - - TDuration TimePerMailbox; - ui32 EventsPerMailbox; - ui64 SoftProcessingDurationTs; + virtual TSharedPoolState GetState() const = 0; }; + ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads); + }
\ No newline at end of file diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index 6a5bfaa34be..c7d1caa5ed6 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -11,8 +11,7 @@ namespace NActors { class TGenericExecutorThread; - class TBasicExecutorPool; - class TIOExecutorPool; + class IExecutorPool; enum class EThreadState : ui64 { None, @@ -123,7 +122,7 @@ namespace NActors { struct TExecutorThreadCtx : public TGenericExecutorThreadCtx { using TBase = TGenericExecutorThreadCtx; - TBasicExecutorPool *OwnerExecutorPool = nullptr; + IExecutorPool *OwnerExecutorPool = nullptr; void SetWork() { ExchangeState(EThreadState::Work); @@ -186,7 +185,7 @@ namespace NActors { } }; - std::atomic<TBasicExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads]; + std::atomic<IExecutorPool*> ExecutorPools[MaxPoolsForSharedThreads]; std::atomic<i64> RequestsForWakeUp = 0; ui32 NextPool = 0; diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp deleted file mode 100644 index df651642169..00000000000 --- a/ydb/library/actors/core/harmonizer.cpp +++ /dev/null @@ -1,861 +0,0 @@ -#include "harmonizer.h" - -#include "executor_thread_ctx.h" -#include "executor_thread.h" -#include "probes.h" - -#include "activity_guard.h" -#include "actorsystem.h" -#include "executor_pool_basic.h" -#include "executor_pool_basic_feature_flags.h" -#include "executor_pool_shared.h" - -#include <atomic> -#include <ydb/library/actors/util/cpu_load_log.h> -#include <ydb/library/actors/util/datetime.h> -#include <ydb/library/actors/util/intrinsics.h> - -#include <util/system/spinlock.h> - -#include <algorithm> - -namespace NActors { - -LWTRACE_USING(ACTORLIB_PROVIDER); - -constexpr bool CheckBinaryPower(ui64 value) { - return !(value & (value - 1)); -} - -template <ui8 HistoryBufferSize = 8> -struct TValueHistory { - static_assert(CheckBinaryPower(HistoryBufferSize)); - - double History[HistoryBufferSize] = {0.0}; - ui64 HistoryIdx = 0; - ui64 LastTs = Max<ui64>(); - double LastUs = 0.0; - double AccumulatedUs = 0.0; - ui64 AccumulatedTs = 0; - - template <bool WithTail=false> - double Accumulate(auto op, auto comb, ui8 seconds) { - double acc = AccumulatedUs; - size_t idx = HistoryIdx; - ui8 leftSeconds = seconds; - if constexpr (!WithTail) { - idx--; - leftSeconds--; - if (idx >= HistoryBufferSize) { - idx = HistoryBufferSize - 1; - } - acc = History[idx]; - } - do { - idx--; - leftSeconds--; - if (idx >= HistoryBufferSize) { - idx = HistoryBufferSize - 1; - } - if constexpr (WithTail) { - acc = op(acc, History[idx]); - } else if (leftSeconds) { - acc = op(acc, History[idx]); - } else { - ui64 tsInSecond = Us2Ts(1'000'000.0); - acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond); - } - } while (leftSeconds); - double duration = 1'000'000.0 * seconds; - if constexpr (WithTail) { - duration += Ts2Us(AccumulatedTs); - } - return comb(acc, duration); - } - - template <bool WithTail=false> - double GetAvgPartForLastSeconds(ui8 seconds) { - auto sum = [](double acc, double value) { - return acc + value; - }; - auto avg = [](double sum, double duration) { - return sum / duration; - }; - return Accumulate<WithTail>(sum, avg, seconds); - } - - double GetAvgPart() { - return GetAvgPartForLastSeconds<true>(HistoryBufferSize); - } - - double GetMaxForLastSeconds(ui8 seconds) { - auto max = [](const double& acc, const double& value) { - return Max(acc, value); - }; - auto fst = [](const double& value, const double&) { return value; }; - return Accumulate<false>(max, fst, seconds); - } - - double GetMax() { - return GetMaxForLastSeconds(HistoryBufferSize); - } - - i64 GetMaxInt() { - return static_cast<i64>(GetMax()); - } - - double GetMinForLastSeconds(ui8 seconds) { - auto min = [](const double& acc, const double& value) { - return Min(acc, value); - }; - auto fst = [](const double& value, const double&) { return value; }; - return Accumulate<false>(min, fst, seconds); - } - - double GetMin() { - return GetMinForLastSeconds(HistoryBufferSize); - } - - i64 GetMinInt() { - return static_cast<i64>(GetMin()); - } - - void Register(ui64 ts, double valueUs) { - if (ts < LastTs) { - LastTs = ts; - LastUs = valueUs; - AccumulatedUs = 0.0; - AccumulatedTs = 0; - return; - } - ui64 lastTs = std::exchange(LastTs, ts); - ui64 dTs = ts - lastTs; - double lastUs = std::exchange(LastUs, valueUs); - double dUs = valueUs - lastUs; - LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs); - - if (dTs > Us2Ts(8'000'000.0)) { - dUs = dUs * 1'000'000.0 / Ts2Us(dTs); - for (size_t idx = 0; idx < HistoryBufferSize; ++idx) { - History[idx] = dUs; - } - AccumulatedUs = 0.0; - AccumulatedTs = 0; - return; - } - - while (dTs > 0) { - if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) { - AccumulatedTs += dTs; - AccumulatedUs += dUs; - break; - } else { - ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs; - double addUs = dUs * addTs / dTs; - dTs -= addTs; - dUs -= addUs; - History[HistoryIdx] = AccumulatedUs + addUs; - HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize; - AccumulatedUs = 0.0; - AccumulatedTs = 0; - } - } - } -}; - -struct TThreadInfo { - TValueHistory<8> Consumed; - TValueHistory<8> Booked; -}; - -struct TPoolInfo { - std::vector<TThreadInfo> ThreadInfo; - std::vector<TThreadInfo> SharedInfo; - TSharedExecutorPool* Shared = nullptr; - IExecutorPool* Pool = nullptr; - TBasicExecutorPool* BasicPool = nullptr; - - i16 DefaultFullThreadCount = 0; - i16 MinFullThreadCount = 0; - i16 MaxFullThreadCount = 0; - - float DefaultThreadCount = 0; - float MinThreadCount = 0; - float MaxThreadCount = 0; - - i16 Priority = 0; - NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; - NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; - ui32 MaxAvgPingUs = 0; - ui64 LastUpdateTs = 0; - ui64 NotEnoughCpuExecutions = 0; - ui64 NewNotEnoughCpuExecutions = 0; - ui16 LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE; - - TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish - TAtomic IncreasingThreadsByNeedyState = 0; - TAtomic IncreasingThreadsByExchange = 0; - TAtomic DecreasingThreadsByStarvedState = 0; - TAtomic DecreasingThreadsByHoggishState = 0; - TAtomic DecreasingThreadsByExchange = 0; - TAtomic PotentialMaxThreadCount = 0; - - TValueHistory<16> Consumed; - TValueHistory<16> Booked; - - std::atomic<float> MaxConsumedCpu = 0; - std::atomic<float> MinConsumedCpu = 0; - std::atomic<float> AvgConsumedCpu = 0; - std::atomic<float> MaxBookedCpu = 0; - std::atomic<float> MinBookedCpu = 0; - - std::unique_ptr<TWaitingStats<ui64>> WaitingStats; - std::unique_ptr<TWaitingStats<double>> MovingWaitingStats; - - double GetBooked(i16 threadIdx); - double GetSharedBooked(i16 threadIdx); - double GetLastSecondBooked(i16 threadIdx); - double GetLastSecondSharedBooked(i16 threadIdx); - double GetConsumed(i16 threadIdx); - double GetSharedConsumed(i16 threadIdx); - double GetLastSecondConsumed(i16 threadIdx); - double GetLastSecondSharedConsumed(i16 threadIdx); - TCpuConsumption PullStats(ui64 ts); - i16 GetFullThreadCount(); - float GetThreadCount(); - void SetFullThreadCount(i16 threadCount); - bool IsAvgPingGood(); -}; - -double TPoolInfo::GetBooked(i16 threadIdx) { - if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].Booked.GetAvgPart(); - } - return 0.0; -} - -double TPoolInfo::GetSharedBooked(i16 threadIdx) { - if ((size_t)threadIdx < SharedInfo.size()) { - return SharedInfo[threadIdx].Booked.GetAvgPart(); - } - return 0.0; -} - -double TPoolInfo::GetLastSecondBooked(i16 threadIdx) { - if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); - } - return 0.0; -} - -double TPoolInfo::GetLastSecondSharedBooked(i16 threadIdx) { - if ((size_t)threadIdx < SharedInfo.size()) { - return SharedInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); - } - return 0.0; -} - -double TPoolInfo::GetConsumed(i16 threadIdx) { - if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].Consumed.GetAvgPart(); - } - return 0.0; -} - -double TPoolInfo::GetSharedConsumed(i16 threadIdx) { - if ((size_t)threadIdx < SharedInfo.size()) { - return SharedInfo[threadIdx].Consumed.GetAvgPart(); - } - return 0.0; -} - -double TPoolInfo::GetLastSecondConsumed(i16 threadIdx) { - if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); - } - return 0.0; -} - -double TPoolInfo::GetLastSecondSharedConsumed(i16 threadIdx) { - if ((size_t)threadIdx < SharedInfo.size()) { - return SharedInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); - } - return 0.0; -} - -#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] -TCpuConsumption TPoolInfo::PullStats(ui64 ts) { - TCpuConsumption acc; - for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) { - TThreadInfo &threadInfo = ThreadInfo[threadIdx]; - TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); - acc.Add(cpuConsumption); - threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); - threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); - } - TVector<TExecutorThreadStats> sharedStats; - if (Shared) { - Shared->GetSharedStatsForHarmonizer(Pool->PoolId, sharedStats); - } - - for (ui32 sharedIdx = 0; sharedIdx < SharedInfo.size(); ++sharedIdx) { - auto stat = sharedStats[sharedIdx]; - TCpuConsumption sharedConsumption{ - Ts2Us(stat.SafeElapsedTicks), - static_cast<double>(stat.CpuUs), - stat.NotEnoughCpuExecutions - }; - acc.Add(sharedConsumption); - SharedInfo[sharedIdx].Consumed.Register(ts, sharedConsumption.ConsumedUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_consumed", UNROLL_HISTORY(SharedInfo[sharedIdx].Consumed.History)); - SharedInfo[sharedIdx].Booked.Register(ts, sharedConsumption.BookedUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_booked", UNROLL_HISTORY(SharedInfo[sharedIdx].Booked.History)); - } - - Consumed.Register(ts, acc.ConsumedUs); - MaxConsumedCpu.store(Consumed.GetMax() / 1'000'000, std::memory_order_relaxed); - MinConsumedCpu.store(Consumed.GetMin() / 1'000'000, std::memory_order_relaxed); - AvgConsumedCpu.store(Consumed.GetAvgPart() / 1'000'000, std::memory_order_relaxed); - Booked.Register(ts, acc.BookedUs); - MaxBookedCpu.store(Booked.GetMax() / 1'000'000, std::memory_order_relaxed); - MinBookedCpu.store(Booked.GetMin() / 1'000'000, std::memory_order_relaxed); - NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions; - NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions; - if (WaitingStats && BasicPool) { - WaitingStats->Clear(); - BasicPool->GetWaitingStats(*WaitingStats); - if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) { - MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2); - } - } - return acc; -} -#undef UNROLL_HISTORY - -float TPoolInfo::GetThreadCount() { - return Pool->GetThreadCount(); -} - -i16 TPoolInfo::GetFullThreadCount() { - return Pool->GetFullThreadCount(); -} - -void TPoolInfo::SetFullThreadCount(i16 threadCount) { - Pool->SetFullThreadCount(threadCount); -} - -bool TPoolInfo::IsAvgPingGood() { - bool res = true; - if (AvgPingCounter) { - res &= *AvgPingCounter > MaxAvgPingUs; - } - if (AvgPingCounterWithSmallWindow) { - res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs; - } - return res; -} - -class THarmonizer: public IHarmonizer { -private: - std::atomic<bool> IsDisabled = false; - TSpinLock Lock; - std::atomic<ui64> NextHarmonizeTs = 0; - std::vector<std::unique_ptr<TPoolInfo>> Pools; - std::vector<ui16> PriorityOrder; - - TValueHistory<16> Consumed; - TValueHistory<16> Booked; - - TAtomic MaxConsumedCpu = 0; - TAtomic MinConsumedCpu = 0; - TAtomic MaxBookedCpu = 0; - TAtomic MinBookedCpu = 0; - - TSharedExecutorPool* Shared = nullptr; - - std::atomic<double> AvgAwakeningTimeUs = 0; - std::atomic<double> AvgWakingUpTimeUs = 0; - - void PullStats(ui64 ts); - void HarmonizeImpl(ui64 ts); - void CalculatePriorityOrder(); -public: - THarmonizer(ui64 ts); - virtual ~THarmonizer(); - double Rescale(double value) const; - void Harmonize(ui64 ts) override; - void DeclareEmergency(ui64 ts) override; - void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; - void Enable(bool enable) override; - TPoolHarmonizerStats GetPoolStats(i16 poolId) const override; - THarmonizerStats GetStats() const override; - void SetSharedPool(TSharedExecutorPool* pool) override; -}; - -THarmonizer::THarmonizer(ui64 ts) { - NextHarmonizeTs = ts; -} - -THarmonizer::~THarmonizer() { -} - -double THarmonizer::Rescale(double value) const { - return Max(0.0, Min(1.0, value * (1.0/0.9))); -} - -void THarmonizer::PullStats(ui64 ts) { - TCpuConsumption acc; - for (auto &pool : Pools) { - TCpuConsumption consumption = pool->PullStats(ts); - acc.Add(consumption); - } - Consumed.Register(ts, acc.ConsumedUs); - RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt()); - RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt()); - Booked.Register(ts, acc.BookedUs); - RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt()); - RelaxedStore(&MinBookedCpu, Booked.GetMinInt()); -} - -Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { - return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; -} - -Y_FORCE_INLINE bool IsHoggish(double booked, double currentThreadCount) { - return booked < currentThreadCount - 0.5; -} - -void THarmonizer::HarmonizeImpl(ui64 ts) { - bool isStarvedPresent = false; - double booked = 0.0; - double consumed = 0.0; - double lastSecondBooked = 0.0; - i64 beingStopped = 0; - double total = 0; - TStackVec<size_t, 8> needyPools; - TStackVec<std::pair<size_t, double>, 8> hoggishPools; - TStackVec<bool, 8> isNeedyByPool; - - size_t sumOfAdditionalThreads = 0; - - ui64 TotalWakingUpTime = 0; - ui64 TotalWakingUps = 0; - ui64 TotalAwakeningTime = 0; - ui64 TotalAwakenings = 0; - for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { - TPoolInfo& pool = *Pools[poolIdx]; - if (pool.WaitingStats) { - TotalWakingUpTime += pool.WaitingStats->WakingUpTotalTime; - TotalWakingUps += pool.WaitingStats->WakingUpCount; - TotalAwakeningTime += pool.WaitingStats->AwakingTotalTime; - TotalAwakenings += pool.WaitingStats->AwakingCount; - } - } - - constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime; - constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime; - - ui64 realAvgWakingUpTime = (TotalWakingUps ? TotalWakingUpTime / TotalWakingUps : knownAvgWakingUpTime); - ui64 avgWakingUpTime = realAvgWakingUpTime; - if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) { - avgWakingUpTime = knownAvgWakingUpTime; - } - AvgWakingUpTimeUs = Ts2Us(avgWakingUpTime); - - ui64 realAvgAwakeningTime = (TotalAwakenings ? TotalAwakeningTime / TotalAwakenings : knownAvgAwakeningUpTime); - ui64 avgAwakeningTime = realAvgAwakeningTime; - if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) { - avgAwakeningTime = knownAvgAwakeningUpTime; - } - AvgAwakeningTimeUs = Ts2Us(avgAwakeningTime); - - ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime; - LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(avgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption)); - - for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { - TPoolInfo& pool = *Pools[poolIdx]; - if (!pool.BasicPool) { - continue; - } - if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) { - if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) { - pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption); - } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) { - ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption); - pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); - } else { - ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption); - pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); - } - pool.BasicPool->ClearWaitingStats(); - } - } - - std::vector<bool> hasSharedThread(Pools.size()); - std::vector<bool> hasSharedThreadWhichWasNotBorrowed(Pools.size()); - std::vector<bool> hasBorrowedSharedThread(Pools.size()); - std::vector<i16> freeHalfThread; - if (Shared) { - auto sharedState = Shared->GetState(); - for (ui32 poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { - i16 threadIdx = sharedState.ThreadByPool[poolIdx]; - if (threadIdx != -1) { - hasSharedThread[poolIdx] = true; - if (sharedState.PoolByBorrowedThread[threadIdx] == -1) { - hasSharedThreadWhichWasNotBorrowed[poolIdx] = true; - } - - } - if (sharedState.BorrowedThreadByPool[poolIdx] != -1) { - hasBorrowedSharedThread[poolIdx] = true; - } - } - } - - for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { - TPoolInfo& pool = *Pools[poolIdx]; - total += pool.DefaultThreadCount; - - i16 currentFullThreadCount = pool.GetFullThreadCount(); - sumOfAdditionalThreads += Max(0, currentFullThreadCount - pool.DefaultFullThreadCount); - float currentThreadCount = pool.GetThreadCount(); - - double poolBooked = 0.0; - double poolConsumed = 0.0; - double lastSecondPoolBooked = 0.0; - double lastSecondPoolConsumed = 0.0; - beingStopped += pool.Pool->GetBlockingThreadCount(); - - for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { - double threadBooked = Rescale(pool.GetBooked(threadIdx)); - double threadLastSecondBooked = Rescale(pool.GetLastSecondBooked(threadIdx)); - double threadConsumed = Rescale(pool.GetConsumed(threadIdx)); - double threadLastSecondConsumed = Rescale(pool.GetLastSecondConsumed(threadIdx)); - poolBooked += threadBooked; - lastSecondPoolBooked += threadLastSecondBooked; - poolConsumed += threadConsumed; - lastSecondPoolConsumed += threadLastSecondConsumed; - LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), threadIdx, threadBooked, threadConsumed, threadLastSecondBooked, threadLastSecondConsumed); - } - - for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) { - double sharedBooked = Rescale(pool.GetSharedBooked(sharedIdx)); - double sharedLastSecondBooked = Rescale(pool.GetLastSecondSharedBooked(sharedIdx)); - double sharedConsumed = Rescale(pool.GetSharedConsumed(sharedIdx)); - double sharedLastSecondConsumed = Rescale(pool.GetLastSecondSharedConsumed(sharedIdx)); - poolBooked += sharedBooked; - lastSecondPoolBooked += sharedLastSecondBooked; - poolConsumed += sharedConsumed; - lastSecondPoolConsumed += sharedLastSecondConsumed; - LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), -1 - sharedIdx, sharedBooked, sharedConsumed, sharedLastSecondBooked, sharedLastSecondConsumed); - } - - bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked); - if (isStarved) { - isStarvedPresent = true; - } - - bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (poolBooked >= currentThreadCount); - if (pool.AvgPingCounter) { - if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { - isNeedy = false; - } else { - pool.LastUpdateTs = ts; - } - } - if (currentThreadCount - poolBooked > 0.5) { - if (hasBorrowedSharedThread[poolIdx] || hasSharedThreadWhichWasNotBorrowed[poolIdx]) { - freeHalfThread.push_back(poolIdx); - } - } - isNeedyByPool.push_back(isNeedy); - if (isNeedy) { - needyPools.push_back(poolIdx); - } - bool isHoggish = IsHoggish(poolBooked, currentThreadCount) - || IsHoggish(lastSecondPoolBooked, currentThreadCount); - if (isHoggish) { - hoggishPools.push_back({poolIdx, std::max(poolBooked - currentThreadCount, lastSecondPoolBooked - currentThreadCount)}); - } - booked += poolBooked; - consumed += poolConsumed; - AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2)); - LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, currentThreadCount, pool.MaxFullThreadCount, isStarved, isNeedy, isHoggish); - } - - double budget = total - Max(booked, lastSecondBooked); - i16 budgetInt = static_cast<i16>(Max(budget, 0.0)); - if (budget < -0.1) { - isStarvedPresent = true; - } - double overbooked = consumed - booked; - if (overbooked < 0) { - isStarvedPresent = false; - } - - if (needyPools.size()) { - Sort(needyPools.begin(), needyPools.end(), [&] (i16 lhs, i16 rhs) { - if (Pools[lhs]->Priority != Pools[rhs]->Priority) { - return Pools[lhs]->Priority > Pools[rhs]->Priority; - } - return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId; - }); - } - - if (freeHalfThread.size()) { - Sort(freeHalfThread.begin(), freeHalfThread.end(), [&] (i16 lhs, i16 rhs) { - if (Pools[lhs]->Priority != Pools[rhs]->Priority) { - return Pools[lhs]->Priority > Pools[rhs]->Priority; - } - return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId; - }); - } - - if (isStarvedPresent) { - // last_starved_at_consumed_value = сумма по всем пулам consumed; - // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, - // использовать вместо total - if (beingStopped && beingStopped >= overbooked) { - // do nothing - } else { - for (ui16 poolIdx : PriorityOrder) { - TPoolInfo &pool = *Pools[poolIdx]; - i64 threadCount = pool.GetFullThreadCount(); - if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) { - Shared->ReturnOwnHalfThread(poolIdx); - } - while (threadCount > pool.DefaultFullThreadCount) { - pool.SetFullThreadCount(--threadCount); - AtomicIncrement(pool.DecreasingThreadsByStarvedState); - overbooked--; - sumOfAdditionalThreads--; - - LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - if (overbooked < 1) { - break; - } - } - if (overbooked < 1) { - break; - } - } - } - } else { - for (size_t needyPoolIdx : needyPools) { - TPoolInfo &pool = *Pools[needyPoolIdx]; - i64 threadCount = pool.GetFullThreadCount(); - if (budget >= 1.0) { - if (threadCount + 1 <= pool.MaxFullThreadCount) { - AtomicIncrement(pool.IncreasingThreadsByNeedyState); - isNeedyByPool[needyPoolIdx] = false; - sumOfAdditionalThreads++; - pool.SetFullThreadCount(threadCount + 1); - budget -= 1.0; - LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - } - } else if (Shared && budget >= 0.5 && !hasBorrowedSharedThread[needyPoolIdx] && freeHalfThread.size()) { - Shared->GiveHalfThread(freeHalfThread.back(), needyPoolIdx); - freeHalfThread.pop_back(); - isNeedyByPool[needyPoolIdx] = false; - budget -= 0.5; - } - if constexpr (NFeatures::IsLocalQueues()) { - bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxFullThreadCount; - needToExpandLocalQueue &= (bool)pool.BasicPool; - needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1); - needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE); - if (needToExpandLocalQueue) { - pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize); - } - } - } - } - - if (budget < 1.0) { - size_t takingAwayThreads = 0; - for (size_t needyPoolIdx : needyPools) { - TPoolInfo &pool = *Pools[needyPoolIdx]; - i64 threadCount = pool.GetFullThreadCount(); - sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount; - if (sumOfAdditionalThreads < takingAwayThreads + 1) { - break; - } - if (!isNeedyByPool[needyPoolIdx]) { - continue; - } - AtomicIncrement(pool.IncreasingThreadsByExchange); - isNeedyByPool[needyPoolIdx] = false; - takingAwayThreads++; - pool.SetFullThreadCount(threadCount + 1); - - LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - } - - for (ui16 poolIdx : PriorityOrder) { - if (takingAwayThreads <= 0) { - break; - } - - TPoolInfo &pool = *Pools[poolIdx]; - size_t threadCount = pool.GetFullThreadCount(); - size_t additionalThreadsCount = Max<size_t>(0L, threadCount - pool.DefaultFullThreadCount); - size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads); - - if (!currentTakingAwayThreads) { - continue; - } - takingAwayThreads -= currentTakingAwayThreads; - pool.SetFullThreadCount(threadCount - currentTakingAwayThreads); - - AtomicAdd(pool.DecreasingThreadsByExchange, takingAwayThreads); - LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - } - } - - for (auto &[hoggishPoolIdx, freeCpu] : hoggishPools) { - TPoolInfo &pool = *Pools[hoggishPoolIdx]; - i64 threadCount = pool.GetFullThreadCount(); - if (hasBorrowedSharedThread[hoggishPoolIdx]) { - Shared->ReturnBorrowedHalfThread(hoggishPoolIdx); - freeCpu -= 0.5; - continue; - } - if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) { - pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2); - pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize); - } - if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) { - AtomicIncrement(pool.DecreasingThreadsByHoggishState); - LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - pool.SetFullThreadCount(threadCount - 1); - } - } - - for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { - TPoolInfo& pool = *Pools[poolIdx]; - AtomicSet(pool.PotentialMaxThreadCount, std::min<i64>(pool.MaxThreadCount, pool.GetThreadCount() + budgetInt)); - } -} - -void THarmonizer::CalculatePriorityOrder() { - PriorityOrder.resize(Pools.size()); - Iota(PriorityOrder.begin(), PriorityOrder.end(), 0); - Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) { - if (Pools[lhs]->Priority != Pools[rhs]->Priority) { - return Pools[lhs]->Priority < Pools[rhs]->Priority; - } - return Pools[lhs]->Pool->PoolId > Pools[rhs]->Pool->PoolId; - }); -} - -void THarmonizer::Harmonize(ui64 ts) { - if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) { - LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false); - return; - } - // Check again under the lock - if (IsDisabled) { - LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true); - Lock.Release(); - return; - } - // Will never reach this line disabled - ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull)); - LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs); - - { - TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_HARMONIZER> activityGuard; - - if (PriorityOrder.empty()) { - CalculatePriorityOrder(); - } - - PullStats(ts); - HarmonizeImpl(ts); - } - - Lock.Release(); -} - -void THarmonizer::DeclareEmergency(ui64 ts) { - NextHarmonizeTs = ts; -} - -void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { - TGuard<TSpinLock> guard(Lock); - Pools.emplace_back(new TPoolInfo); - TPoolInfo &poolInfo = *Pools.back(); - poolInfo.Pool = pool; - poolInfo.Shared = Shared; - poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool); - poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount(); - poolInfo.MinThreadCount = pool->GetMinThreadCount(); - poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); - - poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount(); - poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount(); - poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount(); - poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount); - poolInfo.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0); - poolInfo.Priority = pool->GetPriority(); - pool->SetFullThreadCount(poolInfo.DefaultFullThreadCount); - if (pingInfo) { - poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; - poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow; - poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs; - } - if (poolInfo.BasicPool) { - poolInfo.WaitingStats.reset(new TWaitingStats<ui64>()); - poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>()); - } - PriorityOrder.clear(); -} - -void THarmonizer::Enable(bool enable) { - TGuard<TSpinLock> guard(Lock); - IsDisabled = enable; -} - -IHarmonizer* MakeHarmonizer(ui64 ts) { - return new THarmonizer(ts); -} - -TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { - const TPoolInfo &pool = *Pools[poolId]; - ui64 flags = RelaxedLoad(&pool.LastFlags); - return TPoolHarmonizerStats{ - .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), - .IncreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByExchange)), - .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), - .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), - .DecreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByExchange)), - .MaxConsumedCpu = pool.MaxConsumedCpu.load(std::memory_order_relaxed), - .MinConsumedCpu = pool.MinConsumedCpu.load(std::memory_order_relaxed), - .AvgConsumedCpu = pool.AvgConsumedCpu.load(std::memory_order_relaxed), - .MaxBookedCpu = pool.MaxBookedCpu.load(std::memory_order_relaxed), - .MinBookedCpu = pool.MinBookedCpu.load(std::memory_order_relaxed), - .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)), - .IsNeedy = static_cast<bool>(flags & 1), - .IsStarved = static_cast<bool>(flags & 2), - .IsHoggish = static_cast<bool>(flags & 4), - }; -} - -THarmonizerStats THarmonizer::GetStats() const { - return THarmonizerStats{ - .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&MaxConsumedCpu)), - .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)), - .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)), - .MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)), - .AvgAwakeningTimeUs = AvgAwakeningTimeUs, - .AvgWakingUpTimeUs = AvgWakingUpTimeUs, - }; -} - -void THarmonizer::SetSharedPool(TSharedExecutorPool* pool) { - Shared = pool; -} - -} diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp new file mode 100644 index 00000000000..4cb858e435a --- /dev/null +++ b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp @@ -0,0 +1,171 @@ +#include "cpu_consumption.h" +#include "debug.h" + +namespace NActors { + +LWTRACE_USING(ACTORLIB_PROVIDER); + +void TCpuConsumptionInfo::Clear() { + Elapsed = 0.0; + Cpu = 0.0; + LastSecondElapsed = 0.0; + LastSecondCpu = 0.0; +} + +void THarmonizerCpuConsumption::Init(i16 poolCount) { + PoolConsumption.resize(poolCount); + IsNeedyByPool.reserve(poolCount); + NeedyPools.reserve(poolCount); + HoggishPools.reserve(poolCount); +} + +namespace { + + float Rescale(float value) { + return Max(0.0, Min(1.0, value * (1.0/0.9))); + } + + void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption) { + poolConsumption->Clear(); + for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { + float threadElapsed = Rescale(pool.GetElapsed(threadIdx)); + float threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx)); + float threadCpu = Rescale(pool.GetCpu(threadIdx)); + float threadLastSecondCpu = Rescale(pool.GetLastSecondCpu(threadIdx)); + poolConsumption->Elapsed += threadElapsed; + poolConsumption->LastSecondElapsed += threadLastSecondElapsed; + poolConsumption->Cpu += threadCpu; + poolConsumption->LastSecondCpu += threadLastSecondCpu; + LWPROBE_WITH_DEBUG(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), threadIdx, threadElapsed, threadCpu, threadLastSecondElapsed, threadLastSecondCpu); + } + for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) { + float sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx)); + float sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx)); + float sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx)); + float sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx)); + poolConsumption->Elapsed += sharedElapsed; + poolConsumption->LastSecondElapsed += sharedLastSecondElapsed; + poolConsumption->Cpu += sharedCpu; + poolConsumption->LastSecondCpu += sharedLastSecondCpu; + LWPROBE_WITH_DEBUG(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), -1 - sharedIdx, sharedElapsed, sharedCpu, sharedLastSecondElapsed, sharedLastSecondCpu); + } + } + + bool IsStarved(double elapsed, double cpu) { + return Max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 0.5); + } + + bool IsHoggish(double elapsed, double currentThreadCount) { + return elapsed < currentThreadCount - 0.5; + } + +} // namespace + + +void THarmonizerCpuConsumption::Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools, const TSharedInfo& sharedInfo) { + FreeHalfThread.clear(); + NeedyPools.clear(); + HoggishPools.clear(); + IsNeedyByPool.clear(); + + + TotalCores = 0; + AdditionalThreads = 0; + StoppingThreads = 0; + IsStarvedPresent = false; + Elapsed = 0.0; + Cpu = 0.0; + LastSecondElapsed = 0.0; + LastSecondCpu = 0.0; + for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx) { + TPoolInfo& pool = *pools[poolIdx]; + TotalCores += pool.DefaultThreadCount; + + AdditionalThreads += Max(0, pool.GetFullThreadCount() - pool.DefaultFullThreadCount); + float currentThreadCount = pool.GetThreadCount(); + StoppingThreads += pool.Pool->GetBlockingThreadCount(); + HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "current thread count", currentThreadCount, "stopping threads", StoppingThreads, "default thread count", pool.DefaultThreadCount); + + UpdatePoolConsumption(pool, &PoolConsumption[poolIdx]); + + HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "elapsed", PoolConsumption[poolIdx].Elapsed, "cpu", PoolConsumption[poolIdx].Cpu, "last second elapsed", PoolConsumption[poolIdx].LastSecondElapsed, "last second cpu", PoolConsumption[poolIdx].LastSecondCpu); + + bool isStarved = IsStarved(PoolConsumption[poolIdx].Elapsed, PoolConsumption[poolIdx].Cpu) + || IsStarved(PoolConsumption[poolIdx].LastSecondElapsed, PoolConsumption[poolIdx].LastSecondCpu); + if (isStarved) { + IsStarvedPresent = true; + } + + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].Cpu >= currentThreadCount); + IsNeedyByPool.push_back(isNeedy); + if (isNeedy) { + NeedyPools.push_back(poolIdx); + } + + if (currentThreadCount - PoolConsumption[poolIdx].Elapsed > 0.5) { + if (sharedInfo.HasBorrowedSharedThread[poolIdx] || sharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]) { + FreeHalfThread.push_back(poolIdx); + } + } + + bool isHoggish = IsHoggish(PoolConsumption[poolIdx].Elapsed, currentThreadCount) + || IsHoggish(PoolConsumption[poolIdx].LastSecondElapsed, currentThreadCount); + if (isHoggish) { + float freeCpu = std::max(currentThreadCount - PoolConsumption[poolIdx].Elapsed, currentThreadCount - PoolConsumption[poolIdx].LastSecondElapsed); + HoggishPools.push_back({poolIdx, freeCpu}); + } + + Elapsed += PoolConsumption[poolIdx].Elapsed; + Cpu += PoolConsumption[poolIdx].Cpu; + LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed; + LastSecondCpu += PoolConsumption[poolIdx].LastSecondCpu; + pool.LastFlags.store((i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2), std::memory_order_relaxed); + LWPROBE_WITH_DEBUG( + HarmonizeCheckPool, + poolIdx, + pool.Pool->GetName(), + PoolConsumption[poolIdx].Elapsed, + PoolConsumption[poolIdx].Cpu, + PoolConsumption[poolIdx].LastSecondElapsed, + PoolConsumption[poolIdx].LastSecondCpu, + currentThreadCount, + pool.MaxFullThreadCount, + isStarved, + isNeedy, + isHoggish + ); + } + + if (NeedyPools.size()) { + Sort(NeedyPools.begin(), NeedyPools.end(), [&] (i16 lhs, i16 rhs) { + if (pools[lhs]->Priority != pools[rhs]->Priority) { + return pools[lhs]->Priority > pools[rhs]->Priority; + } + return pools[lhs]->Pool->PoolId < pools[rhs]->Pool->PoolId; + }); + } + + if (FreeHalfThread.size()) { + Sort(FreeHalfThread.begin(), FreeHalfThread.end(), [&] (i16 lhs, i16 rhs) { + if (pools[lhs]->Priority != pools[rhs]->Priority) { + return pools[lhs]->Priority > pools[rhs]->Priority; + } + return pools[lhs]->Pool->PoolId < pools[rhs]->Pool->PoolId; + }); + } + + HARMONIZER_DEBUG_PRINT("NeedyPools", NeedyPools.size(), "FreeHalfThread", FreeHalfThread.size(), "HoggishPools", HoggishPools.size()); + + Budget = TotalCores - Max(Elapsed, LastSecondElapsed); + BudgetInt = static_cast<i16>(Max(Budget, 0.0f)); + if (Budget < -0.1) { + IsStarvedPresent = true; + } + Overbooked = Elapsed - Cpu; + if (Overbooked < 0) { + IsStarvedPresent = false; + } + HARMONIZER_DEBUG_PRINT("IsStarvedPresent", IsStarvedPresent, "Budget", Budget, "BudgetInt", BudgetInt, "Overbooked", Overbooked, "TotalCores", TotalCores, "Elapsed", Elapsed, "Cpu", Cpu, "LastSecondElapsed", LastSecondElapsed, "LastSecondCpu", LastSecondCpu); +} + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.h b/ydb/library/actors/core/harmonizer/cpu_consumption.h new file mode 100644 index 00000000000..0fbfd85b6ee --- /dev/null +++ b/ydb/library/actors/core/harmonizer/cpu_consumption.h @@ -0,0 +1,45 @@ +#pragma once + +#include "defs.h" +#include "pool.h" +#include "shared_info.h" +namespace NActors { + +struct TPoolInfo; + +struct TCpuConsumptionInfo { + float Elapsed; + float Cpu; + float LastSecondElapsed; + float LastSecondCpu; + + void Clear(); +}; // struct TCpuConsumptionInfo + +struct THarmonizerCpuConsumption { + std::vector<TCpuConsumptionInfo> PoolConsumption; + + float TotalCores = 0; + i16 AdditionalThreads = 0; + i16 StoppingThreads = 0; + bool IsStarvedPresent = false; + float Budget = 0.0; + i16 BudgetInt = 0; + float Overbooked = 0.0; + + float Elapsed = 0.0; + float Cpu = 0.0; + float LastSecondElapsed = 0.0; + float LastSecondCpu = 0.0; + TStackVec<i16, 8> NeedyPools; + TStackVec<std::pair<i16, float>, 8> HoggishPools; + TStackVec<bool, 8> IsNeedyByPool; + std::vector<i16> FreeHalfThread; + + void Init(i16 poolCount); + + void Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools, const TSharedInfo& sharedInfo); + +}; // struct THarmonizerCpuConsumption + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/debug.h b/ydb/library/actors/core/harmonizer/debug.h new file mode 100644 index 00000000000..e4c95639ccc --- /dev/null +++ b/ydb/library/actors/core/harmonizer/debug.h @@ -0,0 +1,54 @@ +#pragma once + +#include <ydb/library/actors/core/probes.h> + +namespace NActors { + +enum class EDebugHarmonizerLevel { + None = 0, + Debug = 1, + Trace = 2, + History = 3, +}; + +#ifdef HARMONIZER_DEBUG +constexpr EDebugHarmonizerLevel DebugHarmonizerLevel = EDebugHarmonizerLevel::Debug; +#elif defined(HARMONIZER_HISTORY_DEBUG) +constexpr EDebugHarmonizerLevel DebugHarmonizerLevel = EDebugHarmonizerLevel::History; +#else +constexpr EDebugHarmonizerLevel DebugHarmonizerLevel = EDebugHarmonizerLevel::None; +#endif + +template <typename ... TArgs> +void Print(TArgs&& ... args) { + ((Cerr << std::forward<TArgs>(args) << " "), ...) << Endl; +} + +#define HARMONIZER_DEBUG_PRINT(...) \ + if constexpr (DebugHarmonizerLevel >= EDebugHarmonizerLevel::Debug) { Print(__VA_ARGS__); } +// HARMONIZER_DEBUG_PRINT(...) + +#define LWPROBE_WITH_DEBUG(probe, ...) \ + LWPROBE(probe, __VA_ARGS__); \ + HARMONIZER_DEBUG_PRINT(#probe, __VA_ARGS__); \ +// LWPROBE_WITH_DEBUG(probe, ...) + +#define HARMONIZER_TRACE_PRINT(...) \ + if constexpr (DebugHarmonizerLevel >= EDebugHarmonizerLevel::Trace) { Print(__VA_ARGS__); } +// HARMONIZER_TRACE_PRINT(...) + +#define LWPROBE_WITH_TRACE(probe, ...) \ + LWPROBE(probe, __VA_ARGS__); \ + HARMONIZER_TRACE_PRINT(#probe, __VA_ARGS__); \ +// LWPROBE_WITH_TRACE(probe, ...) + +#define HARMONIZER_HISTORY_PRINT(...) \ + if constexpr (DebugHarmonizerLevel >= EDebugHarmonizerLevel::History) { Print(__VA_ARGS__); } +// HARMONIZER_HISTORY_PRINT(...) + +#define LWPROBE_WITH_HISTORY(probe, ...) \ + LWPROBE(probe, __VA_ARGS__); \ + HARMONIZER_HISTORY_PRINT(#probe, __VA_ARGS__); \ +// LWPROBE_WITH_HISTORY(probe, ...) + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/defs.h b/ydb/library/actors/core/harmonizer/defs.h new file mode 100644 index 00000000000..4e185067403 --- /dev/null +++ b/ydb/library/actors/core/harmonizer/defs.h @@ -0,0 +1,3 @@ +#pragma once + +#include <ydb/library/actors/core/defs.h> diff --git a/ydb/library/actors/core/harmonizer/harmonizer.cpp b/ydb/library/actors/core/harmonizer/harmonizer.cpp new file mode 100644 index 00000000000..0c92e172308 --- /dev/null +++ b/ydb/library/actors/core/harmonizer/harmonizer.cpp @@ -0,0 +1,487 @@ +#include "harmonizer.h" +#include "history.h" +#include "pool.h" +#include "waiting_stats.h" +#include "cpu_consumption.h" +#include "shared_info.h" +#include "debug.h" +#include <ydb/library/actors/core/executor_pool.h> + +#include <ydb/library/actors/core/executor_thread_ctx.h> +#include <ydb/library/actors/core/executor_thread.h> +#include <ydb/library/actors/core/probes.h> + +#include <ydb/library/actors/core/activity_guard.h> +#include <ydb/library/actors/core/actorsystem.h> +#include <ydb/library/actors/core/executor_pool_basic.h> +#include <ydb/library/actors/core/executor_pool_basic_feature_flags.h> +#include <ydb/library/actors/core/executor_pool_shared.h> + +#include <atomic> +#include <ydb/library/actors/util/cpu_load_log.h> +#include <ydb/library/actors/util/datetime.h> +#include <ydb/library/actors/util/intrinsics.h> + +#include <util/system/spinlock.h> + +#include <algorithm> + +namespace NActors { + + + +LWTRACE_USING(ACTORLIB_PROVIDER); + + +class THarmonizer: public IHarmonizer { +private: + std::atomic<bool> IsDisabled = false; + TSpinLock Lock; + std::atomic<ui64> NextHarmonizeTs = 0; + std::vector<std::unique_ptr<TPoolInfo>> Pools; + std::vector<ui16> PriorityOrder; + + TValueHistory<16> UsedCpu; + TValueHistory<16> ElapsedCpu; + + std::atomic<float> MaxUsedCpu = 0; + std::atomic<float> MinUsedCpu = 0; + std::atomic<float> MaxElapsedCpu = 0; + std::atomic<float> MinElapsedCpu = 0; + + ISharedExecutorPool* Shared = nullptr; + TSharedInfo SharedInfo; + + TWaitingInfo WaitingInfo; + THarmonizerCpuConsumption CpuConsumption; + THarmonizerStats Stats; + float ProcessingBudget = 0.0; + + void PullStats(ui64 ts); + void PullSharedInfo(); + void ProcessWaitingStats(); + void HarmonizeImpl(ui64 ts); + void CalculatePriorityOrder(); + void ProcessStarvedState(); + void ProcessNeedyState(); + void ProcessExchange(); + void ProcessHoggishState(); +public: + THarmonizer(ui64 ts); + virtual ~THarmonizer(); + double Rescale(double value) const; + void Harmonize(ui64 ts) override; + void DeclareEmergency(ui64 ts) override; + void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; + void Enable(bool enable) override; + TPoolHarmonizerStats GetPoolStats(i16 poolId) const override; + THarmonizerStats GetStats() const override; + void SetSharedPool(ISharedExecutorPool* pool) override; +}; + +THarmonizer::THarmonizer(ui64 ts) { + NextHarmonizeTs = ts; +} + +THarmonizer::~THarmonizer() { +} + +void THarmonizer::PullStats(ui64 ts) { + HARMONIZER_DEBUG_PRINT("PullStats"); + TCpuConsumption acc; + for (auto &pool : Pools) { + TCpuConsumption consumption = pool->PullStats(ts); + acc.Add(consumption); + } + UsedCpu.Register(ts, acc.CpuUs / 1'000'000.0); + MaxUsedCpu.store(UsedCpu.GetMax(), std::memory_order_relaxed); + MinUsedCpu.store(UsedCpu.GetMin(), std::memory_order_relaxed); + ElapsedCpu.Register(ts, acc.ElapsedUs / 1'000'000.0); + MaxElapsedCpu.store(ElapsedCpu.GetMax(), std::memory_order_relaxed); + MinElapsedCpu.store(ElapsedCpu.GetMin(), std::memory_order_relaxed); + + WaitingInfo.Pull(Pools); + if (Shared) { + SharedInfo.Pull(*Shared); + } + CpuConsumption.Pull(Pools, SharedInfo); + ProcessingBudget = CpuConsumption.Budget; +} + +void THarmonizer::ProcessWaitingStats() { + HARMONIZER_DEBUG_PRINT("ProcessWaitingStats"); + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = *Pools[poolIdx]; + if (!pool.BasicPool) { + continue; + } + if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) { + if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) { + pool.BasicPool->CalcSpinPerThread(WaitingInfo.AvgWakingUpTimeUs); + } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) { + ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(WaitingInfo.AvgWakingUpTimeUs); + pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); + } else { + ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(WaitingInfo.AvgWakingUpTimeUs); + pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); + } + pool.BasicPool->ClearWaitingStats(); + } + } +} + +void THarmonizer::ProcessStarvedState() { + HARMONIZER_DEBUG_PRINT("ProcessStarvedState"); + HARMONIZER_DEBUG_PRINT("shared info", SharedInfo.ToString()); + for (ui16 poolIdx : PriorityOrder) { + TPoolInfo &pool = *Pools[poolIdx]; + i64 threadCount = pool.GetFullThreadCount(); + if (SharedInfo.HasSharedThread[poolIdx] && !SharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]) { + HARMONIZER_DEBUG_PRINT("return own half thread", poolIdx); + i16 borrowedPool = Shared->ReturnOwnHalfThread(poolIdx); + pool.ReturnedHalfThreadByStarvedState.fetch_add(1, std::memory_order_relaxed); + Y_ABORT_UNLESS(borrowedPool != -1); + Pools[borrowedPool]->GivenHalfThreadByOtherStarvedState.fetch_add(1, std::memory_order_relaxed); + } else { + HARMONIZER_DEBUG_PRINT("no own half thread", poolIdx, "has shared thread", SharedInfo.HasSharedThread[poolIdx], "has shared thread which was not borrowed", SharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]); + } + HARMONIZER_DEBUG_PRINT("poolIdx", poolIdx, "threadCount", threadCount, "pool.DefaultFullThreadCount", pool.DefaultFullThreadCount); + while (threadCount > pool.DefaultFullThreadCount) { + pool.SetFullThreadCount(--threadCount); + pool.DecreasingThreadsByStarvedState.fetch_add(1, std::memory_order_relaxed); + CpuConsumption.AdditionalThreads--; + CpuConsumption.StoppingThreads++; + + LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) { + break; + } + } + if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) { + break; + } + } +} + +void THarmonizer::ProcessNeedyState() { + HARMONIZER_DEBUG_PRINT("ProcessNeedyState"); + if (CpuConsumption.NeedyPools.empty()) { + HARMONIZER_DEBUG_PRINT("No needy pools"); + return; + } + for (size_t needyPoolIdx : CpuConsumption.NeedyPools) { + TPoolInfo &pool = *Pools[needyPoolIdx]; + if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) { + continue; + } + i64 threadCount = pool.GetFullThreadCount(); + if (Shared && ProcessingBudget >= 0.5 && !SharedInfo.HasBorrowedSharedThread[needyPoolIdx] && CpuConsumption.FreeHalfThread.size()) { + i16 poolWithHalfThread = CpuConsumption.FreeHalfThread.back(); + Shared->GiveHalfThread(poolWithHalfThread, needyPoolIdx); + CpuConsumption.FreeHalfThread.pop_back(); + CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; + ProcessingBudget -= 0.5; + pool.ReceivedHalfThreadByNeedyState.fetch_add(1, std::memory_order_relaxed); + Pools[poolWithHalfThread]->GivenHalfThreadByOtherNeedyState.fetch_add(1, std::memory_order_relaxed); + } else if (ProcessingBudget >= 1.0) { + if (threadCount + 1 <= pool.MaxFullThreadCount) { + pool.IncreasingThreadsByNeedyState.fetch_add(1, std::memory_order_relaxed); + CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; + CpuConsumption.AdditionalThreads++; + pool.SetFullThreadCount(threadCount + 1); + ProcessingBudget -= 1.0; + LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + } + } + if constexpr (NFeatures::IsLocalQueues()) { + bool needToExpandLocalQueue = ProcessingBudget < 1.0 || threadCount >= pool.MaxFullThreadCount; + needToExpandLocalQueue &= (bool)pool.BasicPool; + needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1); + needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE); + if (needToExpandLocalQueue) { + pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize); + } + } + } +} + +void THarmonizer::ProcessExchange() { + HARMONIZER_DEBUG_PRINT("ProcessExchange"); + if (CpuConsumption.NeedyPools.empty()) { + HARMONIZER_DEBUG_PRINT("No needy pools"); + return; + } + size_t takingAwayThreads = 0; + size_t sumOfAdditionalThreads = CpuConsumption.AdditionalThreads; + for (size_t needyPoolIdx : CpuConsumption.NeedyPools) { + TPoolInfo &pool = *Pools[needyPoolIdx]; + i64 threadCount = pool.GetFullThreadCount(); + sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount; + if (sumOfAdditionalThreads < takingAwayThreads + 1) { + break; + } + if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) { + continue; + } + pool.IncreasingThreadsByExchange.fetch_add(1, std::memory_order_relaxed); + CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; + takingAwayThreads++; + pool.SetFullThreadCount(threadCount + 1); + + LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + } + + for (ui16 poolIdx : PriorityOrder) { + if (takingAwayThreads <= 0) { + break; + } + + TPoolInfo &pool = *Pools[poolIdx]; + size_t threadCount = pool.GetFullThreadCount(); + size_t additionalThreadsCount = Max<size_t>(0L, threadCount - pool.DefaultFullThreadCount); + size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads); + + if (!currentTakingAwayThreads) { + continue; + } + takingAwayThreads -= currentTakingAwayThreads; + pool.SetFullThreadCount(threadCount - currentTakingAwayThreads); + + pool.DecreasingThreadsByExchange.fetch_add(currentTakingAwayThreads, std::memory_order_relaxed); + LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + } +} + +void THarmonizer::ProcessHoggishState() { + HARMONIZER_DEBUG_PRINT("ProcessHoggishState"); + for (auto &[hoggishPoolIdx, freeCpu] : CpuConsumption.HoggishPools) { + TPoolInfo &pool = *Pools[hoggishPoolIdx]; + i64 threadCount = pool.GetFullThreadCount(); + if (SharedInfo.HasBorrowedSharedThread[hoggishPoolIdx]) { + i16 originalPool = Shared->ReturnBorrowedHalfThread(hoggishPoolIdx); + HARMONIZER_DEBUG_PRINT("has borrowed shared thread", hoggishPoolIdx, "will return half thread to", originalPool); + freeCpu -= 0.5; + pool.GivenHalfThreadByHoggishState.fetch_add(1, std::memory_order_relaxed); + Y_ABORT_UNLESS(originalPool != -1); + Pools[originalPool]->ReturnedHalfThreadByOtherHoggishState.fetch_add(1, std::memory_order_relaxed); + } + if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) { + pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2); + pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize); + } + HARMONIZER_DEBUG_PRINT("poolIdx", hoggishPoolIdx, "threadCount", threadCount, "pool.MinFullThreadCount", pool.MinFullThreadCount, "freeCpu", freeCpu); + if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) { + pool.DecreasingThreadsByHoggishState.fetch_add(1, std::memory_order_relaxed); + pool.SetFullThreadCount(threadCount - 1); + LWPROBE_WITH_DEBUG(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + } + } +} + +void THarmonizer::HarmonizeImpl(ui64 ts) { + HARMONIZER_DEBUG_PRINT("HarmonizeImpl"); + Y_UNUSED(ts); + + ProcessWaitingStats(); + + for (size_t needyPoolIdx : CpuConsumption.NeedyPools) { + TPoolInfo &pool = *Pools[needyPoolIdx]; + if (pool.AvgPingCounter && pool.LastUpdateTs + Us2Ts(3'000'000) > ts) { + CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; + HARMONIZER_DEBUG_PRINT("pool won't be updated because time", needyPoolIdx); + } + } + + HARMONIZER_DEBUG_PRINT("IsStarvedPresent", CpuConsumption.IsStarvedPresent, "Overbooked", CpuConsumption.Overbooked, "StoppingThreads", CpuConsumption.StoppingThreads); + if (CpuConsumption.IsStarvedPresent && CpuConsumption.Overbooked >= CpuConsumption.StoppingThreads) { + ProcessStarvedState(); + } else if (!CpuConsumption.IsStarvedPresent) { + ProcessNeedyState(); + } + + if (ProcessingBudget < 1.0) { + ProcessExchange(); + } + + if (!CpuConsumption.HoggishPools.empty()) { + ProcessHoggishState(); + } + + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = *Pools[poolIdx]; + pool.PotentialMaxThreadCount.store(std::min<i64>(pool.MaxThreadCount, static_cast<i64>(pool.GetThreadCount() + CpuConsumption.Budget)), std::memory_order_relaxed); + HARMONIZER_DEBUG_PRINT(poolIdx, pool.Pool->GetName(), "potential max thread count", pool.PotentialMaxThreadCount.load(std::memory_order_relaxed), "budget", CpuConsumption.Budget, "thread count", pool.GetThreadCount()); + } +} + +void THarmonizer::CalculatePriorityOrder() { + PriorityOrder.resize(Pools.size()); + Iota(PriorityOrder.begin(), PriorityOrder.end(), 0); + Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) { + if (Pools[lhs]->Priority != Pools[rhs]->Priority) { + return Pools[lhs]->Priority < Pools[rhs]->Priority; + } + return Pools[lhs]->Pool->PoolId > Pools[rhs]->Pool->PoolId; + }); +} + +void THarmonizer::Harmonize(ui64 ts) { + if (IsDisabled.load(std::memory_order_relaxed) || NextHarmonizeTs.load(std::memory_order_acquire) > ts || !Lock.TryAcquire()) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs.load(std::memory_order_relaxed), IsDisabled.load(std::memory_order_relaxed), false); + return; + } + + if (NextHarmonizeTs.load(std::memory_order_acquire) > ts) { + Lock.Release(); + return; + } + + // Check again under the lock + if (IsDisabled.load(std::memory_order_relaxed)) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs.load(std::memory_order_relaxed), IsDisabled.load(std::memory_order_relaxed), true); + Lock.Release(); + return; + } + // Will never reach this line disabled + ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull), std::memory_order_acquire); + LWPROBE_WITH_DEBUG(TryToHarmonizeSuccess, ts, NextHarmonizeTs.load(std::memory_order_relaxed), previousNextHarmonizeTs); + + { + TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_HARMONIZER> activityGuard; + + if (PriorityOrder.empty()) { + CalculatePriorityOrder(); + SharedInfo.Init(Pools.size()); + CpuConsumption.Init(Pools.size()); + } + + PullStats(ts); + HarmonizeImpl(ts); + } + + Lock.Release(); +} + +void THarmonizer::DeclareEmergency(ui64 ts) { + NextHarmonizeTs = ts; +} + +void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { + TGuard<TSpinLock> guard(Lock); + Pools.emplace_back(new TPoolInfo); + TPoolInfo &poolInfo = *Pools.back(); + poolInfo.Pool = pool; + poolInfo.Shared = Shared; + poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool); + poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount(); + poolInfo.MinThreadCount = pool->GetMinThreadCount(); + poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); + poolInfo.PotentialMaxThreadCount = poolInfo.MaxThreadCount; + + poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount(); + poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount(); + poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount(); + poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount); + poolInfo.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0); + poolInfo.Priority = pool->GetPriority(); + pool->SetFullThreadCount(poolInfo.DefaultFullThreadCount); + if (pingInfo) { + poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; + poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow; + poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs; + } + if (poolInfo.BasicPool) { + poolInfo.WaitingStats.reset(new TWaitingStats<ui64>()); + poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>()); + } + PriorityOrder.clear(); +} + +void THarmonizer::Enable(bool enable) { + TGuard<TSpinLock> guard(Lock); + IsDisabled = enable; +} + +IHarmonizer* MakeHarmonizer(ui64 ts) { + return new THarmonizer(ts); +} + +TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { + const TPoolInfo &pool = *Pools[poolId]; + ui64 flags = pool.LastFlags.load(std::memory_order_relaxed); + return TPoolHarmonizerStats{ + .IncreasingThreadsByNeedyState = pool.IncreasingThreadsByNeedyState.load(std::memory_order_relaxed), + .IncreasingThreadsByExchange = pool.IncreasingThreadsByExchange.load(std::memory_order_relaxed), + .DecreasingThreadsByStarvedState = pool.DecreasingThreadsByStarvedState.load(std::memory_order_relaxed), + .DecreasingThreadsByHoggishState = pool.DecreasingThreadsByHoggishState.load(std::memory_order_relaxed), + .DecreasingThreadsByExchange = pool.DecreasingThreadsByExchange.load(std::memory_order_relaxed), + .ReceivedHalfThreadByNeedyState = pool.ReceivedHalfThreadByNeedyState.load(std::memory_order_relaxed), + .GivenHalfThreadByOtherStarvedState = pool.GivenHalfThreadByOtherStarvedState.load(std::memory_order_relaxed), + .GivenHalfThreadByHoggishState = pool.GivenHalfThreadByHoggishState.load(std::memory_order_relaxed), + .GivenHalfThreadByOtherNeedyState = pool.GivenHalfThreadByOtherNeedyState.load(std::memory_order_relaxed), + .ReturnedHalfThreadByStarvedState = pool.ReturnedHalfThreadByStarvedState.load(std::memory_order_relaxed), + .ReturnedHalfThreadByOtherHoggishState = pool.ReturnedHalfThreadByOtherHoggishState.load(std::memory_order_relaxed), + .MaxUsedCpu = pool.MaxUsedCpu.load(std::memory_order_relaxed), + .MinUsedCpu = pool.MinUsedCpu.load(std::memory_order_relaxed), + .AvgUsedCpu = pool.AvgUsedCpu.load(std::memory_order_relaxed), + .MaxElapsedCpu = pool.MaxElapsedCpu.load(std::memory_order_relaxed), + .MinElapsedCpu = pool.MinElapsedCpu.load(std::memory_order_relaxed), + .AvgElapsedCpu = pool.AvgElapsedCpu.load(std::memory_order_relaxed), + .PotentialMaxThreadCount = pool.PotentialMaxThreadCount.load(std::memory_order_relaxed), + .IsNeedy = static_cast<bool>(flags & 1), + .IsStarved = static_cast<bool>(flags & 2), + .IsHoggish = static_cast<bool>(flags & 4), + }; +} + +THarmonizerStats THarmonizer::GetStats() const { + return THarmonizerStats{ + .MaxUsedCpu = MaxUsedCpu.load(std::memory_order_relaxed), + .MinUsedCpu = MinUsedCpu.load(std::memory_order_relaxed), + .MaxElapsedCpu = MaxElapsedCpu.load(std::memory_order_relaxed), + .MinElapsedCpu = MinElapsedCpu.load(std::memory_order_relaxed), + .AvgAwakeningTimeUs = WaitingInfo.AvgAwakeningTimeUs.load(std::memory_order_relaxed), + .AvgWakingUpTimeUs = WaitingInfo.AvgWakingUpTimeUs.load(std::memory_order_relaxed), + }; +} + +void THarmonizer::SetSharedPool(ISharedExecutorPool* pool) { + Shared = pool; +} + +TString TPoolHarmonizerStats::ToString() const { + return TStringBuilder() << '{' + << "IncreasingThreadsByNeedyState: " << IncreasingThreadsByNeedyState << ", " + << "IncreasingThreadsByExchange: " << IncreasingThreadsByExchange << ", " + << "DecreasingThreadsByStarvedState: " << DecreasingThreadsByStarvedState << ", " + << "DecreasingThreadsByHoggishState: " << DecreasingThreadsByHoggishState << ", " + << "DecreasingThreadsByExchange: " << DecreasingThreadsByExchange << ", " + << "ReceivedHalfThreadByNeedyState: " << ReceivedHalfThreadByNeedyState << ", " + << "GivenHalfThreadByOtherStarvedState: " << GivenHalfThreadByOtherStarvedState << ", " + << "GivenHalfThreadByHoggishState: " << GivenHalfThreadByHoggishState << ", " + << "GivenHalfThreadByOtherNeedyState: " << GivenHalfThreadByOtherNeedyState << ", " + << "ReturnedHalfThreadByStarvedState: " << ReturnedHalfThreadByStarvedState << ", " + << "ReturnedHalfThreadByOtherHoggishState: " << ReturnedHalfThreadByOtherHoggishState << ", " + << "MaxUsedCpu: " << MaxUsedCpu << ", " + << "MinUsedCpu: " << MinUsedCpu << ", " + << "AvgUsedCpu: " << AvgUsedCpu << ", " + << "MaxElapsedCpu: " << MaxElapsedCpu << ", " + << "MinElapsedCpu: " << MinElapsedCpu << ", " + << "AvgElapsedCpu: " << AvgElapsedCpu << ", " + << "PotentialMaxThreadCount: " << PotentialMaxThreadCount << ", " + << "IsNeedy: " << IsNeedy << ", " + << "IsStarved: " << IsStarved << ", " + << "IsHoggish: " << IsHoggish << '}'; +} + +TString THarmonizerStats::ToString() const { + return TStringBuilder() << '{' + << "MaxUsedCpu: " << MaxUsedCpu << ", " + << "MinUsedCpu: " << MinUsedCpu << ", " + << "MaxElapsedCpu: " << MaxElapsedCpu << ", " + << "MinElapsedCpu: " << MinElapsedCpu << ", " + << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << ", " + << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << '}'; +} + +} diff --git a/ydb/library/actors/core/harmonizer.h b/ydb/library/actors/core/harmonizer/harmonizer.h index b4323ef8de1..e60bb19b919 100644 --- a/ydb/library/actors/core/harmonizer.h +++ b/ydb/library/actors/core/harmonizer/harmonizer.h @@ -1,11 +1,10 @@ #pragma once #include "defs.h" -#include "executor_pool_shared.h" namespace NActors { class IExecutorPool; - class TSharedExecutorPool; + class ISharedExecutorPool; struct TSelfPingInfo; template <typename T> @@ -17,25 +16,38 @@ namespace NActors { ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; ui64 DecreasingThreadsByExchange = 0; - float MaxConsumedCpu = 0.0; - float MinConsumedCpu = 0.0; - float AvgConsumedCpu = 0.0; - float MaxBookedCpu = 0.0; - float MinBookedCpu = 0.0; + + ui64 ReceivedHalfThreadByNeedyState = 0; + ui64 GivenHalfThreadByOtherStarvedState = 0; + ui64 GivenHalfThreadByHoggishState = 0; + ui64 GivenHalfThreadByOtherNeedyState = 0; + ui64 ReturnedHalfThreadByStarvedState = 0; + ui64 ReturnedHalfThreadByOtherHoggishState = 0; + + float MaxUsedCpu = 0.0; + float MinUsedCpu = 0.0; + float AvgUsedCpu = 0.0; + float MaxElapsedCpu = 0.0; + float MinElapsedCpu = 0.0; + float AvgElapsedCpu = 0.0; i16 PotentialMaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; + + TString ToString() const; }; struct THarmonizerStats { - i64 MaxConsumedCpu = 0.0; - i64 MinConsumedCpu = 0.0; - i64 MaxBookedCpu = 0.0; - i64 MinBookedCpu = 0.0; + float MaxUsedCpu = 0.0; + float MinUsedCpu = 0.0; + float MaxElapsedCpu = 0.0; + float MinElapsedCpu = 0.0; + + float AvgAwakeningTimeUs = 0; + float AvgWakingUpTimeUs = 0; - double AvgAwakeningTimeUs = 0; - double AvgWakingUpTimeUs = 0; + TString ToString() const; }; // Pool cpu harmonizer @@ -48,7 +60,7 @@ namespace NActors { virtual void Enable(bool enable) = 0; virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0; virtual THarmonizerStats GetStats() const = 0; - virtual void SetSharedPool(TSharedExecutorPool* pool) = 0; + virtual void SetSharedPool(ISharedExecutorPool* pool) = 0; }; IHarmonizer* MakeHarmonizer(ui64 ts); diff --git a/ydb/library/actors/core/harmonizer/history.h b/ydb/library/actors/core/harmonizer/history.h new file mode 100644 index 00000000000..889033dc7d8 --- /dev/null +++ b/ydb/library/actors/core/harmonizer/history.h @@ -0,0 +1,183 @@ +#pragma once + +#include "defs.h" +#include "debug.h" +#include <ydb/library/actors/util/datetime.h> + + +namespace NActors { + +// TValueHistory takes an accumulating value that only increases +// Then it splits this value by seconds relative to the elapsed time since the last value registration +// Allows finding per-second maximum, minimum, and average values for the last N seconds +// WithTail=false - calculate only fully passed seconds; example: current time is 4.5, GetAvgLastSeconds(2) calculates values from 2.0 to 4.0 +// WithTail=true - calculate also partially passed seconds; example: current time is 4.5, GetAvgLastSeconds(2) calculates values from 2.5 to 4.5 +// If time passed more seconds than history buffer size, then all values are recalculated by average value + +template <ui8 HistoryBufferSize = 8> +struct TValueHistory { + + static constexpr bool CheckBinaryPower(ui64 value) { + return !(value & (value - 1)); + } + + static_assert(CheckBinaryPower(HistoryBufferSize)); + + double History[HistoryBufferSize] = {0.0}; + ui64 HistoryIdx = 0; + ui64 LastTs = Max<ui64>(); + double LastValue = 0.0; + double AccumulatedValue = 0.0; + ui64 AccumulatedTs = 0; + + template <bool WithTail=false> + double Accumulate(auto op, auto comb, ui8 seconds) const { + HARMONIZER_HISTORY_PRINT("Accumulate, seconds = ", static_cast<ui16>(seconds), ", WithTail = ", WithTail); + double acc = AccumulatedValue; + size_t idx = HistoryIdx; + ui8 leftSeconds = seconds; + if constexpr (!WithTail) { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + acc = History[idx]; + } + HARMONIZER_HISTORY_PRINT("Accumulate iteration, acc = ", acc, ", idx = ", static_cast<ui16>(idx), ", leftSeconds = ", static_cast<ui16>(leftSeconds)); + while (leftSeconds) { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + if constexpr (!WithTail) { + acc = op(acc, History[idx]); + } else if (leftSeconds) { + acc = op(acc, History[idx]); + } else { + ui64 tsInSecond = Us2Ts(1'000'000.0); + acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond); + } + HARMONIZER_HISTORY_PRINT("Accumulate iteration, acc = ", acc, ", idx = ", static_cast<ui16>(idx), ", leftSeconds = ", static_cast<ui16>(leftSeconds)); + } + auto result = comb(acc, seconds); + HARMONIZER_HISTORY_PRINT("Accumulate return, acc = ", acc, ", duration = ", static_cast<ui16>(seconds), ", result = ", result); + return result; + } + + template <bool WithTail=false> + double GetAvgPartForLastSeconds(ui8 seconds) const { + auto sum = [](double acc, double value) { + HARMONIZER_HISTORY_PRINT("calculate sum, acc = ", acc, ", value = ", value, ", acc + value = ", acc + value); + return acc + value; + }; + auto avg = [](double sum, double duration) { + HARMONIZER_HISTORY_PRINT("calculate avg, sum = ", sum, ", duration = ", duration); + return sum / duration; + }; + HARMONIZER_HISTORY_PRINT("GetAvgPartForLastSeconds, seconds = ", static_cast<ui16>(seconds), ", WithTail = ", WithTail); + return Accumulate<WithTail>(sum, avg, seconds); + } + + double GetAvgPart() const { + HARMONIZER_HISTORY_PRINT("GetAvgPart, seconds = ", static_cast<ui16>(HistoryBufferSize), ", WithTail = ", true); + return GetAvgPartForLastSeconds<true>(HistoryBufferSize); + } + + double GetMaxForLastSeconds(ui8 seconds) const { + auto max = [](const double& acc, const double& value) { + HARMONIZER_HISTORY_PRINT("calculate max, acc = ", acc, ", value = ", value, ", Max(acc, value) = ", Max(acc, value)); + return Max(acc, value); + }; + auto fst = [](const double& value, const double&) { return value; }; + HARMONIZER_HISTORY_PRINT("GetMaxForLastSeconds, seconds = ", static_cast<ui16>(seconds), ", WithTail = ", false); + return Accumulate<false>(max, fst, seconds); + } + + double GetMax() const { + HARMONIZER_HISTORY_PRINT("GetMax, seconds = ", static_cast<ui16>(HistoryBufferSize), ", WithTail = ", false); + return GetMaxForLastSeconds(HistoryBufferSize); + } + + double GetMinForLastSeconds(ui8 seconds) const { + auto min = [](const double& acc, const double& value) { + HARMONIZER_HISTORY_PRINT("calculate min, acc = ", acc, ", value = ", value, ", Min(acc, value) = ", Min(acc, value)); + return Min(acc, value); + }; + auto fst = [](const double& value, const double&) { return value; }; + HARMONIZER_HISTORY_PRINT("GetMinForLastSeconds, seconds = ", static_cast<ui16>(seconds), ", WithTail = ", false); + return Accumulate<false>(min, fst, seconds); + } + + double GetMin() const { + HARMONIZER_HISTORY_PRINT("GetMin, seconds = ", static_cast<ui16>(HistoryBufferSize), ", WithTail = ", false); + return GetMinForLastSeconds(HistoryBufferSize); + } + + void Register(ui64 ts, double value) { + HARMONIZER_HISTORY_PRINT("Register, ts = ", ts, ", value = ", value); + if (ts < LastTs) { + LastTs = ts; + LastValue = value; + AccumulatedValue = 0.0; + AccumulatedTs = 0; + for (size_t idx = 0; idx < HistoryBufferSize; ++idx) { + History[idx] = 0.0; + } + HARMONIZER_HISTORY_PRINT("Register backward time, LastTs = ", LastTs, ", LastValue = ", LastValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs); + return; + } + ui64 lastTs = std::exchange(LastTs, ts); + ui64 dTs = ts - lastTs; + double lastValue = std::exchange(LastValue, value); + double dValue = value - lastValue; + + if (dTs > HistoryBufferSize * Us2Ts(1'000'000.0)) { + dValue = dValue * 1'000'000.0 / Ts2Us(dTs); + for (size_t idx = 0; idx < HistoryBufferSize; ++idx) { + History[idx] = dValue; + } + AccumulatedValue = 0.0; + AccumulatedTs = 0; + HARMONIZER_HISTORY_PRINT("Register big gap, dTs = ", dTs, ", dValue = ", dValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs, "lastTs = ", lastTs, ", lastValue = ", lastValue); + return; + } + + if (dTs == 0) { + AccumulatedValue += dValue; + HARMONIZER_HISTORY_PRINT("Register zero gap, dTs = ", dTs, ", dValue = ", dValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs, "lastTs = ", lastTs, ", lastValue = ", lastValue); + return; + } + + HARMONIZER_HISTORY_PRINT("Register start processing seconds, dTs = ", dTs, ", dValue = ", dValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs, "lastTs = ", lastTs, ", lastValue = ", lastValue); + + while (dTs > 0) { + if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) { + AccumulatedTs += dTs; + AccumulatedValue += dValue; + HARMONIZER_HISTORY_PRINT("Register small gap, dTs = ", dTs, ", dValue = ", dValue, ", AccumulatedValue = ", AccumulatedValue, ", AccumulatedTs = ", AccumulatedTs); + break; + } else { + ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs; + double addValue = dValue * addTs / dTs; + dTs -= addTs; + dValue -= addValue; + History[HistoryIdx] = AccumulatedValue + addValue; + HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize; + AccumulatedValue = 0.0; + AccumulatedTs = 0; + HARMONIZER_HISTORY_PRINT("Register process one second, dTs = ", dTs, + ", dValue = ", dValue, + ", addTs = ", addTs, + ", addValue = ", addValue, + ", AccumulatedValue = ", AccumulatedValue, + ", AccumulatedTs = ", AccumulatedTs, + ", HistoryIdx = ", HistoryIdx, + ", History[HistoryIdx - 1] = ", History[(HistoryIdx + HistoryBufferSize - 1) % HistoryBufferSize]); + } + } + } +}; // struct TValueHistory + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/pool.cpp b/ydb/library/actors/core/harmonizer/pool.cpp new file mode 100644 index 00000000000..2f96b891c8a --- /dev/null +++ b/ydb/library/actors/core/harmonizer/pool.cpp @@ -0,0 +1,150 @@ +#include "pool.h" + +#include <ydb/library/actors/core/executor_pool.h> +#include <ydb/library/actors/core/executor_pool_shared.h> +#include <ydb/library/actors/core/executor_pool_basic.h> +#include <ydb/library/actors/core/executor_pool_basic_feature_flags.h> + +#include "debug.h" + +namespace NActors { + +LWTRACE_USING(ACTORLIB_PROVIDER); + +TPoolInfo::TPoolInfo() + : LocalQueueSize(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) +{} + +double TPoolInfo::GetCpu(i16 threadIdx) const { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].UsedCpu.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetSharedCpu(i16 sharedThreadIdx) const { + if ((size_t)sharedThreadIdx < SharedInfo.size()) { + return SharedInfo[sharedThreadIdx].UsedCpu.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetLastSecondCpu(i16 threadIdx) const { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].UsedCpu.GetAvgPartForLastSeconds<true>(1); + } + return 0.0; +} + +double TPoolInfo::GetLastSecondSharedCpu(i16 sharedThreadIdx) const { + if ((size_t)sharedThreadIdx < SharedInfo.size()) { + return SharedInfo[sharedThreadIdx].UsedCpu.GetAvgPartForLastSeconds<true>(1); + } + return 0.0; +} + +double TPoolInfo::GetElapsed(i16 threadIdx) const { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].ElapsedCpu.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetSharedElapsed(i16 sharedThreadIdx) const { + if ((size_t)sharedThreadIdx < SharedInfo.size()) { + return SharedInfo[sharedThreadIdx].ElapsedCpu.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetLastSecondElapsed(i16 threadIdx) const { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].ElapsedCpu.GetAvgPartForLastSeconds<true>(1); + } + return 0.0; +} + +double TPoolInfo::GetLastSecondSharedElapsed(i16 sharedThreadIdx) const { + if ((size_t)sharedThreadIdx < SharedInfo.size()) { + return SharedInfo[sharedThreadIdx].ElapsedCpu.GetAvgPartForLastSeconds<true>(1); + } + return 0.0; +} + +#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] +TCpuConsumption TPoolInfo::PullStats(ui64 ts) { + TCpuConsumption acc; + for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) { + TThreadInfo &threadInfo = ThreadInfo[threadIdx]; + TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); + acc.Add(cpuConsumption); + threadInfo.ElapsedCpu.Register(ts, cpuConsumption.ElapsedUs / 1'000'000.0); + LWPROBE_WITH_TRACE(SavedValues, Pool->PoolId, Pool->GetName(), "elapsed", UNROLL_HISTORY(threadInfo.ElapsedCpu.History)); + threadInfo.UsedCpu.Register(ts, cpuConsumption.CpuUs / 1'000'000.0); + LWPROBE_WITH_TRACE(SavedValues, Pool->PoolId, Pool->GetName(), "cpu", UNROLL_HISTORY(threadInfo.UsedCpu.History)); + } + TVector<TExecutorThreadStats> sharedStats; + if (Shared) { + Shared->GetSharedStatsForHarmonizer(Pool->PoolId, sharedStats); + } + + for (ui32 sharedIdx = 0; sharedIdx < SharedInfo.size(); ++sharedIdx) { + auto stat = sharedStats[sharedIdx]; + TCpuConsumption sharedConsumption{ + static_cast<float>(stat.CpuUs), + Ts2Us(stat.SafeElapsedTicks), + stat.NotEnoughCpuExecutions + }; + acc.Add(sharedConsumption); + SharedInfo[sharedIdx].ElapsedCpu.Register(ts, sharedConsumption.ElapsedUs / 1'000'000.0); + LWPROBE_WITH_TRACE(SavedValues, Pool->PoolId, Pool->GetName(), "elapsed", UNROLL_HISTORY(SharedInfo[sharedIdx].ElapsedCpu.History)); + SharedInfo[sharedIdx].UsedCpu.Register(ts, sharedConsumption.CpuUs / 1'000'000.0); + LWPROBE_WITH_TRACE(SavedValues, Pool->PoolId, Pool->GetName(), "cpu", UNROLL_HISTORY(SharedInfo[sharedIdx].UsedCpu.History)); + } + + UsedCpu.Register(ts, acc.CpuUs / 1'000'000.0); + MaxUsedCpu.store(UsedCpu.GetMax(), std::memory_order_relaxed); + MinUsedCpu.store(UsedCpu.GetMin(), std::memory_order_relaxed); + AvgUsedCpu.store(UsedCpu.GetAvgPart(), std::memory_order_relaxed); + ElapsedCpu.Register(ts, acc.ElapsedUs / 1'000'000.0); + MaxElapsedCpu.store(ElapsedCpu.GetMax(), std::memory_order_relaxed); + MinElapsedCpu.store(ElapsedCpu.GetMin(), std::memory_order_relaxed); + AvgElapsedCpu.store(ElapsedCpu.GetAvgPart(), std::memory_order_relaxed); + NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions; + NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions; + if (WaitingStats && BasicPool) { + WaitingStats->Clear(); + BasicPool->GetWaitingStats(*WaitingStats); + if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) { + MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2); + } + } + return acc; +} +#undef UNROLL_HISTORY + +float TPoolInfo::GetThreadCount() { + return Pool->GetThreadCount(); +} + +i16 TPoolInfo::GetFullThreadCount() { + return Pool->GetFullThreadCount(); +} + +void TPoolInfo::SetFullThreadCount(i16 threadCount) { + HARMONIZER_DEBUG_PRINT(Pool->PoolId, Pool->GetName(), "set full thread count", threadCount); + Pool->SetFullThreadCount(threadCount); +} + +bool TPoolInfo::IsAvgPingGood() { + bool res = true; + if (AvgPingCounter) { + res &= *AvgPingCounter > MaxAvgPingUs; + } + if (AvgPingCounterWithSmallWindow) { + res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs; + } + return res; +} + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/pool.h b/ydb/library/actors/core/harmonizer/pool.h new file mode 100644 index 00000000000..9b083dcb8e4 --- /dev/null +++ b/ydb/library/actors/core/harmonizer/pool.h @@ -0,0 +1,93 @@ +#pragma once + +#include "defs.h" + +#include "history.h" +#include <ydb/library/actors/core/executor_pool.h> + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NActors { + +class ISharedExecutorPool; +class TBasicExecutorPool; +class IExecutorPool; +template <typename T> +struct TWaitingStats; + +struct TThreadInfo { + TValueHistory<8> UsedCpu; + TValueHistory<8> ElapsedCpu; +}; // struct TThreadInfo + +struct TPoolInfo { + std::vector<TThreadInfo> ThreadInfo; + std::vector<TThreadInfo> SharedInfo; + ISharedExecutorPool* Shared = nullptr; + IExecutorPool* Pool = nullptr; + TBasicExecutorPool* BasicPool = nullptr; + + i16 DefaultFullThreadCount = 0; + i16 MinFullThreadCount = 0; + i16 MaxFullThreadCount = 0; + + float DefaultThreadCount = 0; + float MinThreadCount = 0; + float MaxThreadCount = 0; + + i16 Priority = 0; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; + ui32 MaxAvgPingUs = 0; + ui64 LastUpdateTs = 0; + ui64 NotEnoughCpuExecutions = 0; + ui64 NewNotEnoughCpuExecutions = 0; + ui16 LocalQueueSize; + + std::atomic<i64> LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish + std::atomic<ui64> IncreasingThreadsByNeedyState = 0; + std::atomic<ui64> IncreasingThreadsByExchange = 0; + std::atomic<ui64> DecreasingThreadsByStarvedState = 0; + std::atomic<ui64> DecreasingThreadsByHoggishState = 0; + std::atomic<ui64> DecreasingThreadsByExchange = 0; + std::atomic<i16> PotentialMaxThreadCount = 0; + std::atomic<ui64> ReceivedHalfThreadByNeedyState = 0; + std::atomic<ui64> GivenHalfThreadByOtherStarvedState = 0; + std::atomic<ui64> GivenHalfThreadByHoggishState = 0; + std::atomic<ui64> GivenHalfThreadByOtherNeedyState = 0; + std::atomic<ui64> ReturnedHalfThreadByStarvedState = 0; + std::atomic<ui64> ReturnedHalfThreadByOtherHoggishState = 0; + + TValueHistory<16> UsedCpu; + TValueHistory<16> ElapsedCpu; + + std::atomic<float> MaxUsedCpu = 0; + std::atomic<float> MinUsedCpu = 0; + std::atomic<float> AvgUsedCpu = 0; + std::atomic<float> MaxElapsedCpu = 0; + std::atomic<float> MinElapsedCpu = 0; + std::atomic<float> AvgElapsedCpu = 0; + + std::unique_ptr<TWaitingStats<ui64>> WaitingStats; + std::unique_ptr<TWaitingStats<double>> MovingWaitingStats; + + TPoolInfo(); + + double GetCpu(i16 threadIdx) const; + double GetElapsed(i16 threadIdx) const; + double GetLastSecondCpu(i16 threadIdx) const; + double GetLastSecondElapsed(i16 threadIdx) const; + + double GetSharedCpu(i16 sharedThreadIdx) const; + double GetSharedElapsed(i16 sharedThreadIdx) const; + double GetLastSecondSharedCpu(i16 sharedThreadIdx) const; + double GetLastSecondSharedElapsed(i16 sharedThreadIdx) const; + + TCpuConsumption PullStats(ui64 ts); + i16 GetFullThreadCount(); + float GetThreadCount(); + void SetFullThreadCount(i16 threadCount); + bool IsAvgPingGood(); +}; // struct TPoolInfo + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/shared_info.cpp b/ydb/library/actors/core/harmonizer/shared_info.cpp new file mode 100644 index 00000000000..6b8b94f0bd8 --- /dev/null +++ b/ydb/library/actors/core/harmonizer/shared_info.cpp @@ -0,0 +1,55 @@ +#include "shared_info.h" +#include <util/string/builder.h> + +#include <ydb/library/actors/core/executor_pool_shared.h> + +namespace NActors { + +void TSharedInfo::Init(i16 poolCount) { + HasSharedThread.resize(poolCount, false); + HasSharedThreadWhichWasNotBorrowed.resize(poolCount, false); + HasBorrowedSharedThread.resize(poolCount, false); +} + +void TSharedInfo::Pull(const ISharedExecutorPool& shared) { + auto sharedState = shared.GetState(); + for (ui32 poolIdx = 0; poolIdx < HasSharedThread.size(); ++poolIdx) { + i16 threadIdx = sharedState.ThreadByPool[poolIdx]; + if (threadIdx != -1) { + HasSharedThread[poolIdx] = true; + if (sharedState.PoolByBorrowedThread[threadIdx] == -1) { + HasSharedThreadWhichWasNotBorrowed[poolIdx] = true; + } else { + HasSharedThreadWhichWasNotBorrowed[poolIdx] = false; + } + } + if (sharedState.BorrowedThreadByPool[poolIdx] != -1) { + HasBorrowedSharedThread[poolIdx] = true; + } else { + HasBorrowedSharedThread[poolIdx] = false; + } + } +} + +TString TSharedInfo::ToString() const { + TStringBuilder builder; + builder << "{"; + builder << "HasSharedThread: \""; + for (ui32 i = 0; i < HasSharedThread.size(); ++i) { + builder << (HasSharedThread[i] ? "1" : "0"); + } + builder << "\", "; + builder << "HasSharedThreadWhichWasNotBorrowed: \""; + for (ui32 i = 0; i < HasSharedThreadWhichWasNotBorrowed.size(); ++i) { + builder << (HasSharedThreadWhichWasNotBorrowed[i] ? "1" : "0"); + } + builder << "\", "; + builder << "HasBorrowedSharedThread: \""; + for (ui32 i = 0; i < HasBorrowedSharedThread.size(); ++i) { + builder << (HasBorrowedSharedThread[i] ? "1" : "0"); + } + builder << "\"}"; + return builder; +} + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/shared_info.h b/ydb/library/actors/core/harmonizer/shared_info.h new file mode 100644 index 00000000000..41277db1bbb --- /dev/null +++ b/ydb/library/actors/core/harmonizer/shared_info.h @@ -0,0 +1,20 @@ +#pragma once + +#include "defs.h" + +namespace NActors { + +class ISharedExecutorPool; + +struct TSharedInfo { + std::vector<bool> HasSharedThread; + std::vector<bool> HasSharedThreadWhichWasNotBorrowed; + std::vector<bool> HasBorrowedSharedThread; + + void Init(i16 poolCount); + void Pull(const ISharedExecutorPool& shared); + + TString ToString() const; +}; // struct TSharedInfo + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp b/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp new file mode 100644 index 00000000000..571550c7b1b --- /dev/null +++ b/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp @@ -0,0 +1,673 @@ +#include "harmonizer.h" +#include "debug.h" +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/library/actors/core/executor_pool_shared.h> +#include <ydb/library/actors/core/executor_thread_ctx.h> +#include <ydb/library/actors/helpers/pool_stats_collector.h> + +using namespace NActors; + + +#define CHECK_CHANGING_THREADS(stats, inc_needy, inc_exchange, dec_hoggish, dec_starved, dec_exchange) \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByNeedyState, inc_needy, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByExchange, inc_exchange, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByHoggishState, dec_hoggish, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByStarvedState, dec_starved, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByExchange, dec_exchange, (stats).ToString()); +// end CHECK_CHANGING_THREADS + +#define CHECK_CHANGING_HALF_THREADS(stats, received_needy, given_starved, given_hoggish, given_other_needy, returned_starved, returned_other_hoggish) \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).ReceivedHalfThreadByNeedyState, received_needy, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByOtherStarvedState, given_starved, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByHoggishState, given_hoggish, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByOtherNeedyState, given_other_needy, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).ReturnedHalfThreadByStarvedState, returned_starved, (stats).ToString()); \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).ReturnedHalfThreadByOtherHoggishState, returned_other_hoggish, (stats).ToString()); +// end CHECK_CHANGING_HALF_THREADS + +#define CHECK_IS_NEEDY(stats) \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, true, (stats).ToString()); \ +// end CHECK_IS_NEEDY + +#define CHECK_IS_NOT_NEEDY(stats) \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, false, (stats).ToString()); \ +// end CHECK_IS_NOT_NEEDY + +#define CHECK_IS_HOGGISH(stats) \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, true, (stats).ToString()); \ +// end CHECK_IS_HOGGISH + +#define CHECK_IS_NOT_HOGGISH(stats) \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, false, (stats).ToString()); \ +// end CHECK_IS_NOT_HOGGISH + +#define CHECK_IS_STARVED(stats) \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, true, (stats).ToString()); \ +// end CHECK_IS_STARVED + +#define CHECK_IS_NOT_STARVED(stats) \ + UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, false, (stats).ToString()); \ +// end CHECK_IS_NOT_STARVED + + +Y_UNIT_TEST_SUITE(HarmonizerTests) { + + struct TMockExecutorPoolParams { + i16 DefaultFullThreadCount = 4; + i16 MinFullThreadCount = 4; + i16 MaxFullThreadCount = 8; + float DefaultThreadCount = 4.0f; + float MinThreadCount = 4.0f; + float MaxThreadCount = 8.0f; + i16 Priority = 0; + TString Name = "MockPool"; + ui32 PoolId = 0; + + TString ToString() const { + return TStringBuilder() << "PoolId: " << PoolId << ", Name: " << Name << ", DefaultFullThreadCount: " << DefaultFullThreadCount << ", MinFullThreadCount: " << MinFullThreadCount << ", MaxFullThreadCount: " << MaxFullThreadCount << ", DefaultThreadCount: " << DefaultThreadCount << ", MinThreadCount: " << MinThreadCount << ", MaxThreadCount: " << MaxThreadCount << ", Priority: " << Priority; + } + }; + + struct TCpuConsumptionModel { + TCpuConsumption value; + TCpuConsumptionModel() : value() {} + TCpuConsumptionModel(const TCpuConsumption& other) : value(other) {} + operator TCpuConsumption() const { + return value; + } + void Increase(const TCpuConsumption& other) { + value.ElapsedUs += other.ElapsedUs; + value.CpuUs += other.CpuUs; + value.NotEnoughCpuExecutions += other.NotEnoughCpuExecutions; + } + }; + + class TMockExecutorPool : public IExecutorPool { + public: + TMockExecutorPool(const TMockExecutorPoolParams& params = TMockExecutorPoolParams()) + : IExecutorPool(params.PoolId) + , Params(params) + , ThreadCount(params.DefaultFullThreadCount) + , ThreadCpuConsumptions(params.MaxFullThreadCount, TCpuConsumption{0.0, 0.0}) + { + + } + + TMockExecutorPoolParams Params; + i16 ThreadCount = 0; + std::vector<TCpuConsumptionModel> ThreadCpuConsumptions; + std::vector<TSharedExecutorThreadCtx*> SharedThreads; + + i16 GetDefaultFullThreadCount() const override { return Params.DefaultFullThreadCount; } + i16 GetMinFullThreadCount() const override { return Params.MinFullThreadCount; } + i16 GetMaxFullThreadCount() const override { return Params.MaxFullThreadCount; } + void SetFullThreadCount(i16 count) override { + HARMONIZER_DEBUG_PRINT(Params.PoolId, Params.Name, "set full thread count", count); + ThreadCount = Max(Params.MinFullThreadCount, Min(Params.MaxFullThreadCount, count)); + } + i16 GetFullThreadCount() const override { return ThreadCount; } + float GetDefaultThreadCount() const override { return Params.DefaultThreadCount; } + float GetMinThreadCount() const override { return Params.MinThreadCount; } + float GetMaxThreadCount() const override { return Params.MaxThreadCount; } + i16 GetPriority() const override { return Params.Priority; } + TString GetName() const override { return Params.Name; } + + // Дополнительные методы из IExecutorPool + void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {} + void Start() override {} + void PrepareStop() override {} + void Shutdown() override {} + bool Cleanup() override { return true; } + + TMailbox* GetReadyActivation(TWorkerContext& /*wctx*/, ui64 /*revolvingCounter*/) override { return nullptr; } + TMailbox* ResolveMailbox(ui32 /*hint*/) override { return nullptr; } + + void Schedule(TInstant /*deadline*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} + void Schedule(TMonotonic /*deadline*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} + void Schedule(TDuration /*delta*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} + + bool Send(TAutoPtr<IEventHandle>& /*ev*/) override { return true; } + bool SpecificSend(TAutoPtr<IEventHandle>& /*ev*/) override { return true; } + void ScheduleActivation(TMailbox* /*activation*/) override {} + void SpecificScheduleActivation(TMailbox* /*activation*/) override {} + void ScheduleActivationEx(TMailbox* /*activation*/, ui64 /*revolvingCounter*/) override {} + TActorId Register(IActor* /*actor*/, TMailboxType::EType /*mailboxType*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); } + TActorId Register(IActor* /*actor*/, TMailboxCache& /*cache*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); } + TActorId Register(IActor* /*actor*/, TMailbox* /*mailbox*/, const TActorId& /*parentId*/) override { return TActorId(); } + + TAffinity* Affinity() const override { return nullptr; } + + ui32 GetThreads() const override { return static_cast<ui32>(ThreadCount); } + float GetThreadCount() const override { return static_cast<float>(ThreadCount); } + + TSharedExecutorThreadCtx* ReleaseSharedThread() override { + UNIT_ASSERT(!SharedThreads.empty()); + TSharedExecutorThreadCtx* thread = SharedThreads.back(); + SharedThreads.pop_back(); + return thread; + } + void AddSharedThread(TSharedExecutorThreadCtx* thread) override { + UNIT_ASSERT(SharedThreads.size() < 2); + SharedThreads.push_back(thread); + } + + void IncreaseThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) { + if (count == -1) { + count = Params.MaxFullThreadCount - start; + } + for (i16 i = start; i < start + count; ++i) { + ThreadCpuConsumptions[i].Increase(consumption); + } + } + + void SetThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) { + if (count == -1) { + count = Params.MaxFullThreadCount - start; + } + for (i16 i = start; i < start + count; ++i) { + ThreadCpuConsumptions[i] = consumption; + } + } + + TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override { + UNIT_ASSERT_GE(threadIdx, 0); + UNIT_ASSERT_LE(static_cast<ui16>(threadIdx), ThreadCpuConsumptions.size()); + return ThreadCpuConsumptions[threadIdx]; + } + }; + + class TMockSharedExecutorPool : public ISharedExecutorPool { + std::unique_ptr<ISharedExecutorPool> OriginalPool; + std::vector<std::vector<TCpuConsumptionModel>> ThreadCpuConsumptions; + + static std::vector<i16> GetPoolIds(const std::vector<IExecutorPool*>& pools) { + std::vector<i16> poolIds; + for (size_t i = 0; i < pools.size(); ++i) { + poolIds.push_back(static_cast<i16>(i)); + } + return poolIds; + } + + public: + TMockSharedExecutorPool(const TSharedExecutorPoolConfig& config, i16 poolCount, std::vector<IExecutorPool*> pools) + : OriginalPool(CreateSharedExecutorPool(config, poolCount, GetPoolIds(pools))) + , ThreadCpuConsumptions(poolCount, std::vector<TCpuConsumptionModel>(config.Threads, TCpuConsumption{0.0, 0.0})) + { + OriginalPool->Init(pools, false); + } + + void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {} + void Start() override {} + void PrepareStop() override {} + void Shutdown() override {} + bool Cleanup() override { return true; } + + void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override { + OriginalPool->GetSharedStatsForHarmonizer(pool, statsCopy); + } + + void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override { + OriginalPool->GetSharedStats(pool, statsCopy); + } + + void Init(const std::vector<IExecutorPool*>& pools, bool /*isShared*/) override { + OriginalPool->Init(pools, false); + } + + TSharedExecutorThreadCtx* GetSharedThread(i16 poolId) override { + return OriginalPool->GetSharedThread(poolId); + } + + i16 ReturnOwnHalfThread(i16 pool) override { + return OriginalPool->ReturnOwnHalfThread(pool); + } + + i16 ReturnBorrowedHalfThread(i16 pool) override { + return OriginalPool->ReturnBorrowedHalfThread(pool); + } + + void GiveHalfThread(i16 from, i16 to) override { + OriginalPool->GiveHalfThread(from, to); + } + + i16 GetSharedThreadCount() const override { + return OriginalPool->GetSharedThreadCount(); + } + + TSharedPoolState GetState() const override { + return OriginalPool->GetState(); + } + + TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override { + return ThreadCpuConsumptions[poolId][threadIdx]; + } + + std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override { + std::vector<TCpuConsumption> poolConsumptions(ThreadCpuConsumptions[poolId].size()); + for (size_t i = 0; i < poolConsumptions.size(); ++i) { + poolConsumptions[i] = ThreadCpuConsumptions[poolId][i]; + } + return poolConsumptions; + } + + void IncreaseThreadCpuConsumption(ui32 threadIdx, ui32 poolId, TCpuConsumption consumption) { + ThreadCpuConsumptions[poolId][threadIdx].Increase(consumption); + } + + void SetThreadCpuConsumption(ui32 threadIdx, ui32 poolId, TCpuConsumption consumption) { + ThreadCpuConsumptions[poolId][threadIdx] = consumption; + } + }; + + Y_UNIT_TEST(TestHarmonizerCreation) { + ui64 currentTs = 1000000; + std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs)); + UNIT_ASSERT(harmonizer != nullptr); + } + + Y_UNIT_TEST(TestAddPool) { + ui64 currentTs = 1000000; + std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs)); + auto mockPool = std::make_unique<TMockExecutorPool>(); + harmonizer->AddPool(mockPool.get()); + + auto stats = harmonizer->GetPoolStats(0); + UNIT_ASSERT_VALUES_EQUAL(stats.PotentialMaxThreadCount, 8); + UNIT_ASSERT_VALUES_EQUAL(stats.IncreasingThreadsByNeedyState, 0); + UNIT_ASSERT_VALUES_EQUAL(stats.DecreasingThreadsByStarvedState, 0); + } + + Y_UNIT_TEST(TestHarmonize) { + ui64 currentTs = 1000000; + auto harmonizer = MakeHarmonizer(currentTs); + auto mockPool = new TMockExecutorPool(); + harmonizer->AddPool(mockPool); + + harmonizer->Harmonize(currentTs + 1000000); // 1 second later + + auto stats = harmonizer->GetPoolStats(0); + Y_UNUSED(stats); + UNIT_ASSERT_VALUES_EQUAL(mockPool->ThreadCount, 4); // Should start with default + + delete harmonizer; + delete mockPool; + } + + Y_UNIT_TEST(TestToNeedyNextToHoggish) { + ui64 currentTs = 1000000; + auto harmonizer = MakeHarmonizer(currentTs); + TMockExecutorPoolParams params; + std::vector<std::unique_ptr<TMockExecutorPool>> mockPools; + mockPools.emplace_back(new TMockExecutorPool(params)); + params.PoolId = 1; + mockPools.emplace_back(new TMockExecutorPool(params)); + for (auto& pool : mockPools) { + harmonizer->AddPool(pool.get()); + pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); + } + + TCpuConsumptionModel cpuConsumptionModel; + + currentTs += Us2Ts(1'000'000); + harmonizer->Harmonize(currentTs); + mockPools[0]->SetThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount); + + currentTs += Us2Ts(59'000'000); + harmonizer->Harmonize(currentTs); + + auto stats = harmonizer->GetPoolStats(0); + + CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0); + CHECK_IS_NEEDY(stats); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->SetThreadCpuConsumption({0.0, 0.0}, 0, params.DefaultFullThreadCount); + harmonizer->Harmonize(currentTs); + + stats = harmonizer->GetPoolStats(0); + + CHECK_CHANGING_THREADS(stats, 1, 0, 1, 0, 0); + CHECK_IS_HOGGISH(stats); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); + } + + Y_UNIT_TEST(TestToNeedyNextToStarved) { + ui64 currentTs = 1000000; + auto harmonizer = MakeHarmonizer(currentTs); + TMockExecutorPoolParams params; + std::vector<std::unique_ptr<TMockExecutorPool>> mockPools; + mockPools.emplace_back(new TMockExecutorPool(params)); + params.PoolId = 1; + mockPools.emplace_back(new TMockExecutorPool(params)); + for (auto& pool : mockPools) { + harmonizer->AddPool(pool.get()); + pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); + } + + TCpuConsumptionModel cpuConsumptionModel; + + currentTs += Us2Ts(1'000'000); + harmonizer->Harmonize(currentTs); + mockPools[0]->IncreaseThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount); + + currentTs += Us2Ts(59'000'000); + harmonizer->Harmonize(currentTs); + + auto stats = harmonizer->GetPoolStats(0); + + CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0); + CHECK_IS_NEEDY(stats); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({40'000'000.0, 60'000'000.0}, 0, 5); + mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 4); + harmonizer->Harmonize(currentTs); + + stats = harmonizer->GetPoolStats(0); + + CHECK_CHANGING_THREADS(stats, 1, 0, 0, 1, 0); + CHECK_IS_STARVED(stats); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); + } + + Y_UNIT_TEST(TestExchangeThreads) { + ui64 currentTs = 1000000; + auto harmonizer = MakeHarmonizer(currentTs); + TMockExecutorPoolParams params { + .DefaultFullThreadCount = 1, + .MinFullThreadCount = 1, + .MaxFullThreadCount = 2, + .DefaultThreadCount = 1.0f, + .MinThreadCount = 1.0f, + .MaxThreadCount = 2.0f, + }; + std::vector<std::unique_ptr<TMockExecutorPool>> mockPools; + mockPools.emplace_back(new TMockExecutorPool(params)); + params.PoolId = 1; + mockPools.emplace_back(new TMockExecutorPool(params)); + params.PoolId = 2; + mockPools.emplace_back(new TMockExecutorPool(params)); + for (auto& pool : mockPools) { + harmonizer->AddPool(pool.get()); + pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); + } + + currentTs += Us2Ts(1'000'000); + harmonizer->Harmonize(currentTs); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1); + mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); + mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); + harmonizer->Harmonize(currentTs); + + auto stats0 = harmonizer->GetPoolStats(0); + auto stats1 = harmonizer->GetPoolStats(1); + auto stats2 = harmonizer->GetPoolStats(2); + + CHECK_CHANGING_THREADS(stats0, 0, 0, 0, 0, 0); + CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 0); + CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0); + CHECK_IS_HOGGISH(stats0); + CHECK_IS_NEEDY(stats1); + CHECK_IS_NEEDY(stats2); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 1); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 2); + UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); + mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2); + mockPools[2]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1); + harmonizer->Harmonize(currentTs); + + stats0 = harmonizer->GetPoolStats(0); + stats1 = harmonizer->GetPoolStats(1); + stats2 = harmonizer->GetPoolStats(2); + + CHECK_CHANGING_THREADS(stats0, 0, 1, 0, 0, 0); + CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 1); + CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0); + CHECK_IS_NEEDY(stats0); + CHECK_IS_NEEDY(stats1); + CHECK_IS_HOGGISH(stats2); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 2); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 1); + UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1); + } + + Y_UNIT_TEST(TestThreadCounts) { + ui64 currentTs = 1000000; + std::vector<TMockExecutorPoolParams> params { + { + .DefaultFullThreadCount = 5, + .MinFullThreadCount = 5, + .MaxFullThreadCount = 15, + .DefaultThreadCount = 5.0f, + .MinThreadCount = 5.0f, + .MaxThreadCount = 15.0f, + .PoolId = 0, + }, + { + .DefaultFullThreadCount = 5, + .MinFullThreadCount = 5, + .MaxFullThreadCount = 15, + .DefaultThreadCount = 5.0f, + .MinThreadCount = 5.0f, + .MaxThreadCount = 15.0f, + .PoolId = 1, + }, + { + .DefaultFullThreadCount = 5, + .MinFullThreadCount = 5, + .MaxFullThreadCount = 15, + .DefaultThreadCount = 5.0f, + .MinThreadCount = 5.0f, + .MaxThreadCount = 15.0f, + .PoolId = 2, + }, + }; + auto harmonizer = MakeHarmonizer(currentTs); + std::vector<std::unique_ptr<TMockExecutorPool>> mockPools; + i16 budget = 0; + for (auto& param : params) { + mockPools.emplace_back(new TMockExecutorPool(param)); + HARMONIZER_DEBUG_PRINT("created pool", mockPools.back()->Params.ToString()); + budget += param.DefaultFullThreadCount; + } + for (ui32 poolIdx = 0; poolIdx < params.size(); ++poolIdx) { + auto &pool = mockPools[poolIdx]; + harmonizer->AddPool(pool.get()); + pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params[poolIdx].MaxFullThreadCount); + } + currentTs += Us2Ts(1'000'000); + harmonizer->Harmonize(currentTs); + + for (i16 i = 0; i < params[0].MaxFullThreadCount; ++i) { + for (i16 ii = 0; ii < params[1].MaxFullThreadCount; ++ii) { + for (i16 iii = 0; iii < params[2].MaxFullThreadCount; ++iii) { + if (i + ii + iii > budget) { + continue; + } + ui32 localBudget = budget - (i + ii + iii); + HARMONIZER_DEBUG_PRINT("first pool", i, "second pool", ii, "third pool", iii, "budget", budget, "local budget", localBudget); + currentTs += Us2Ts(60'000'000); + mockPools[0]->SetFullThreadCount(i); + mockPools[1]->SetFullThreadCount(ii); + mockPools[2]->SetFullThreadCount(iii); + mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min<i16>(i, mockPools[0]->ThreadCount)); + mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min<i16>(ii, mockPools[1]->ThreadCount)); + mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min(iii, mockPools[2]->ThreadCount)); + harmonizer->Harmonize(currentTs); + std::vector<TPoolHarmonizerStats> stats; + for (auto& pool : params) { + stats.emplace_back(harmonizer->GetPoolStats(pool.PoolId)); + } + for (ui32 poolIdx = 0; poolIdx < params.size(); ++poolIdx) { + UNIT_ASSERT_VALUES_EQUAL(stats[poolIdx].PotentialMaxThreadCount, std::min<i16>(mockPools[poolIdx]->ThreadCount + localBudget, params[poolIdx].MaxFullThreadCount)); + } + } + } + } + } + + Y_UNIT_TEST(TestSharedHalfThreads) { + ui64 currentTs = 1000000; + std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs)); + TMockExecutorPoolParams params { + .DefaultFullThreadCount = 2, + .MinFullThreadCount = 1, + .MaxFullThreadCount = 3, + .DefaultThreadCount = 2.0f, + .MinThreadCount = 1.0f, + .MaxThreadCount = 3.0f, + }; + std::vector<std::unique_ptr<TMockExecutorPool>> mockPools; + std::vector<IExecutorPool*> pools; + mockPools.emplace_back(new TMockExecutorPool(params)); + pools.push_back(mockPools.back().get()); + params.PoolId = 1; + mockPools.emplace_back(new TMockExecutorPool(params)); + pools.push_back(mockPools.back().get()); + params.PoolId = 2; + mockPools.emplace_back(new TMockExecutorPool(params)); + pools.push_back(mockPools.back().get()); + + + TSharedExecutorPoolConfig sharedConfig; + sharedConfig.Threads = 3; + std::unique_ptr<ISharedExecutorPool> sharedPool(new TMockSharedExecutorPool(sharedConfig, 3, pools)); + + for (auto& pool : mockPools) { + harmonizer->AddPool(pool.get()); + } + harmonizer->SetSharedPool(sharedPool.get()); + + for (ui32 i = 0; i < mockPools.size(); ++i) { + mockPools[i]->AddSharedThread(sharedPool->GetSharedThread(i)); + } + + currentTs += Us2Ts(1'000'000); + harmonizer->Harmonize(currentTs); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2); + mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2); + mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2); + + harmonizer->Harmonize(currentTs); + + auto stats0 = harmonizer->GetPoolStats(0); + auto stats1 = harmonizer->GetPoolStats(1); + auto stats2 = harmonizer->GetPoolStats(2); + auto sharedState = sharedPool->GetState(); + + CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 0, 0, 0, 0); + CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 0); + CHECK_IS_NEEDY(stats0); + CHECK_IS_HOGGISH(stats1); + CHECK_IS_NEEDY(stats2); + UNIT_ASSERT_VALUES_EQUAL(sharedState.BorrowedThreadByPool[0], 1); + UNIT_ASSERT_VALUES_EQUAL(sharedState.PoolByBorrowedThread[1], 0); + + currentTs += Us2Ts(60'000'000); + + harmonizer->Harmonize(currentTs); + + stats0 = harmonizer->GetPoolStats(0); + stats1 = harmonizer->GetPoolStats(1); + stats2 = harmonizer->GetPoolStats(2); + sharedState = sharedPool->GetState(); + + CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 1, 0, 0, 0); + CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 1); + CHECK_IS_HOGGISH(stats0); + CHECK_IS_HOGGISH(stats1); + CHECK_IS_HOGGISH(stats2); + UNIT_ASSERT_VALUES_EQUAL(sharedState.BorrowedThreadByPool[0], -1); + UNIT_ASSERT_VALUES_EQUAL(sharedState.PoolByBorrowedThread[1], -1); + } + + Y_UNIT_TEST(TestSharedHalfThreadsStarved) { + ui64 currentTs = 1000000; + std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs)); + TMockExecutorPoolParams params { + .DefaultFullThreadCount = 2, + .MinFullThreadCount = 1, + .MaxFullThreadCount = 3, + .DefaultThreadCount = 2.0f, + .MinThreadCount = 1.0f, + .MaxThreadCount = 3.0f, + }; + std::vector<std::unique_ptr<TMockExecutorPool>> mockPools; + std::vector<IExecutorPool*> pools; + mockPools.emplace_back(new TMockExecutorPool(params)); + pools.push_back(mockPools.back().get()); + params.PoolId = 1; + mockPools.emplace_back(new TMockExecutorPool(params)); + pools.push_back(mockPools.back().get()); + + TSharedExecutorPoolConfig sharedConfig; + sharedConfig.Threads = 2; + std::unique_ptr<ISharedExecutorPool> sharedPool(new TMockSharedExecutorPool(sharedConfig, 2, pools)); + + for (auto& pool : mockPools) { + harmonizer->AddPool(pool.get()); + } + harmonizer->SetSharedPool(sharedPool.get()); + + currentTs += Us2Ts(1'000'000); + harmonizer->Harmonize(currentTs); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2); + mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2); + harmonizer->Harmonize(currentTs); + + auto stats0 = harmonizer->GetPoolStats(0); + auto stats1 = harmonizer->GetPoolStats(1); + auto sharedState = sharedPool->GetState(); + CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 0, 0, 0, 0); + CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 0); + CHECK_IS_NEEDY(stats0); + CHECK_IS_HOGGISH(stats1); + UNIT_ASSERT_VALUES_EQUAL_C(sharedState.BorrowedThreadByPool[0], 1, sharedState.ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(sharedState.PoolByBorrowedThread[1], 0, sharedState.ToString()); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({30'000'000.0, 60'000'000.0}, 0, 2); + mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2); + harmonizer->Harmonize(currentTs); + + stats0 = harmonizer->GetPoolStats(0); + stats1 = harmonizer->GetPoolStats(1); + sharedState = sharedPool->GetState(); + UNIT_ASSERT_VALUES_EQUAL_C(sharedState.BorrowedThreadByPool[0], -1, sharedState.ToString()); + UNIT_ASSERT_VALUES_EQUAL_C(sharedState.PoolByBorrowedThread[1], -1, sharedState.ToString()); + CHECK_CHANGING_HALF_THREADS(stats0, 1, 1, 0, 0, 0, 0); + CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 1, 0); + CHECK_IS_STARVED(stats0); + CHECK_IS_HOGGISH(stats1); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2); + mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2); + + harmonizer->Harmonize(currentTs); + + stats0 = harmonizer->GetPoolStats(0); + stats1 = harmonizer->GetPoolStats(1); + + CHECK_CHANGING_HALF_THREADS(stats0, 2, 1, 0, 0, 0, 0); + CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 2, 1, 0); + CHECK_IS_NEEDY(stats0); + CHECK_IS_HOGGISH(stats1); + } + +} diff --git a/ydb/library/actors/core/harmonizer/ut/history_ut.cpp b/ydb/library/actors/core/harmonizer/ut/history_ut.cpp new file mode 100644 index 00000000000..7c9965adf32 --- /dev/null +++ b/ydb/library/actors/core/harmonizer/ut/history_ut.cpp @@ -0,0 +1,345 @@ +#define HARMONIZER_HISTORY_DEBUG +#include "debug.h" +#include "history.h" +#include <library/cpp/testing/unittest/registar.h> + +using namespace NActors; + +constexpr double secondInUs = 1'000'000.0; +ui64 secondInTs = Us2Ts(secondInUs); + +Y_UNIT_TEST_SUITE(ValueHistoryTests) { + + Y_UNIT_TEST(TestConstructorAndInitialization) { + TValueHistory<8> history; + + UNIT_ASSERT_VALUES_EQUAL(history.HistoryIdx, 0); + UNIT_ASSERT_VALUES_EQUAL(history.LastTs, Max<ui64>()); + UNIT_ASSERT_VALUES_EQUAL(history.LastValue, 0.0); + UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedValue, 0.0); + UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedTs, 0); + + for (size_t i = 0; i < 8; ++i) { + UNIT_ASSERT_VALUES_EQUAL(history.History[i], 0.0); + } + } + + Y_UNIT_TEST(TestBufferSizePowerOfTwo) { + UNIT_ASSERT(TValueHistory<1>::CheckBinaryPower(1)); + UNIT_ASSERT(TValueHistory<2>::CheckBinaryPower(2)); + UNIT_ASSERT(TValueHistory<4>::CheckBinaryPower(4)); + UNIT_ASSERT(TValueHistory<8>::CheckBinaryPower(8)); + UNIT_ASSERT(TValueHistory<16>::CheckBinaryPower(16)); + + UNIT_ASSERT(!TValueHistory<1>::CheckBinaryPower(3)); + UNIT_ASSERT(!TValueHistory<1>::CheckBinaryPower(5)); + UNIT_ASSERT(!TValueHistory<1>::CheckBinaryPower(6)); + UNIT_ASSERT(!TValueHistory<1>::CheckBinaryPower(7)); + } + + Y_UNIT_TEST(TestCircularBufferFilling) { + TValueHistory<4> history; + ui64 baseTs = secondInUs; + + // increasing counter by 1 each second + for (size_t i = 0; i < 8; ++i) { + history.Register(baseTs + i * secondInTs, i * 1.0); + } + + // expecting 1.0 for each second + double expectedValues[] = {1.0, 1.0, 1.0, 1.0}; + for (size_t i = 0; i < 4; ++i) { + UNIT_ASSERT_DOUBLES_EQUAL(history.History[i], expectedValues[i], 1e-6); + } + } + + Y_UNIT_TEST(TestBasicRegistration) { + TValueHistory<4> history; + ui64 baseTs = secondInUs; + + history.Register(baseTs, 1.0); + UNIT_ASSERT_VALUES_EQUAL(history.LastTs, baseTs); + UNIT_ASSERT_DOUBLES_EQUAL(history.LastValue, 1.0, 1e-6); + + history.Register(baseTs + secondInTs, 2.0); + UNIT_ASSERT_DOUBLES_EQUAL(history.History[0], 1.0, 1e-6); + UNIT_ASSERT_VALUES_EQUAL(history.HistoryIdx, 1); + } + + Y_UNIT_TEST(TestRegistrationWithBackwardTime) { + TValueHistory<4> history; + ui64 baseTs = secondInUs; + + history.Register(baseTs, 1.0); + + history.Register(baseTs - 1000, 2.0); + + UNIT_ASSERT_VALUES_EQUAL(history.LastTs, baseTs - 1000); + UNIT_ASSERT_DOUBLES_EQUAL(history.LastValue, 2.0, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, 0.0, 1e-6); + UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedTs, 0); + } + + Y_UNIT_TEST(TestRegistrationWithLargeTimeGap) { + TValueHistory<4> history; + ui64 baseTs = secondInUs; + + history.Register(baseTs, 1.0); + + history.Register(baseTs + 10 * secondInTs, 4.0); + + double expectedValue = (4.0 - 1.0) / 10; + for (size_t i = 0; i < 4; ++i) { + UNIT_ASSERT_DOUBLES_EQUAL(history.History[i], expectedValue, 1e-6); + } + } + + Y_UNIT_TEST(TestRegistrationOnSecondBoundary) { + TValueHistory<4> history; + ui64 baseTs = Us2Ts(1'000'000.0); + + history.Register(baseTs - Us2Ts(100'000.0), 1.0); + history.Register(baseTs, 2.0); + history.Register(baseTs + Us2Ts(100'000.0), 3.0); + + UNIT_ASSERT_VALUES_EQUAL(history.HistoryIdx, 0); + UNIT_ASSERT_VALUES_EQUAL(history.LastTs, baseTs + Us2Ts(100'000.0)); + UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedValue, 2.0); + UNIT_ASSERT_DOUBLES_EQUAL(history.History[0], 0.0, 1e-6); + } + + Y_UNIT_TEST(TestAccumulationWithinSecond) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, 1.0); + history.Register(baseTs + Us2Ts(500'000.0), 2.0); + + UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedTs, Us2Ts(500'000.0)); + UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, 1.0, 1e-6); + } + + Y_UNIT_TEST(TestTransitionBetweenSeconds) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, 1.0); + history.Register(baseTs + Us2Ts(1'500'000.0), 2.0); + + UNIT_ASSERT_VALUES_EQUAL(history.HistoryIdx, 1); + UNIT_ASSERT_DOUBLES_EQUAL(history.History[0], 2.0/3, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, 1.0/3, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(Ts2Us(history.AccumulatedTs), 500'000.0, 1e-3); + } + + Y_UNIT_TEST(TestGetAvgPartForLastSeconds) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + float acc = 0; + for (size_t i = 0; i < 4; ++i) { + acc += i * 1.0; + history.Register(baseTs + i * Us2Ts(1'000'000.0), acc); + } + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(1), 3.0, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(2), 2.5, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(3), 2.0, 1e-6); + } + + Y_UNIT_TEST(TestGetAvgPartWithTail) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, 1.0); + history.Register(baseTs + Us2Ts(500'000.0), 2.0); + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<true>(1), 1.0, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<false>(1), 0.0, 1e-6); + } + + Y_UNIT_TEST(TestGetAvgPartWithIncompleteData) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, 1.0); + history.Register(baseTs + Us2Ts(1'000'000.0), 2.0); + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<true>(3), 1.0/3, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<false>(3), 1.0/3, 1e-6); + } + + Y_UNIT_TEST(TestGetAvgPartOnBufferBoundary) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + for (size_t i = 0; i < 5; ++i) { + history.Register(baseTs + i * Us2Ts(1'000'000.0), i * 1.0); + } + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(4), 1.0, 1e-6); + } + + Y_UNIT_TEST(TestGetMaxForLastSeconds) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + double acc = 0; + for (size_t i = 0; i < 4; ++i) { + acc += i * 1.0; + history.Register(baseTs + i * Us2Ts(1'000'000.0), acc); + } + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMaxForLastSeconds(1), 3.0, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMaxForLastSeconds(2), 3.0, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMaxForLastSeconds(3), 3.0, 1e-6); + } + + Y_UNIT_TEST(TestGetMax) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + double acc = 0; + for (size_t i = 0; i < 4; ++i) { + acc += i * 1.0; + history.Register(baseTs + i * Us2Ts(1'000'000.0), acc); + } + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMax(), 3.0, 1e-6); + } + + Y_UNIT_TEST(TestGetMaxWithNegativeValues) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, -1.0); + history.Register(baseTs + Us2Ts(1'000'000.0), -2.0); + history.Register(baseTs + Us2Ts(2'000'000.0), -3.0); + history.Register(baseTs + Us2Ts(3'000'000.0), -4.0); + history.Register(baseTs + Us2Ts(4'000'000.0), -5.0); + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMax(), -1.0, 1e-6); + } + + Y_UNIT_TEST(TestGetMinForLastSeconds) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + double acc = 0; + for (size_t i = 0; i < 4; ++i) { + acc += (4 - i) * 1.0; + history.Register(baseTs + i * Us2Ts(1'000'000.0), acc); + } + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMinForLastSeconds(1), 1.0, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMinForLastSeconds(2), 1.0, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMinForLastSeconds(3), 1.0, 1e-6); + } + + Y_UNIT_TEST(TestGetMin) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + double acc = 0; + for (size_t i = 0; i < 5; ++i) { + acc += (5 - i) * 1.0; + history.Register(baseTs + i * Us2Ts(1'000'000.0), acc); + } + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMin(), 1.0, 1e-6); + } + + Y_UNIT_TEST(TestGetMinWithNegativeValues) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, -1.0); + history.Register(baseTs + Us2Ts(1'000'000.0), -2.0); + history.Register(baseTs + Us2Ts(2'000'000.0), -4.0); + history.Register(baseTs + Us2Ts(3'000'000.0), -7.0); + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMin(), -3.0, 1e-6); + } + + Y_UNIT_TEST(TestBufferOverflow) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + // Fill buffer + double acc = 0; + for (size_t i = 0; i < 5; ++i) { + acc += i * 1.0; + history.Register(baseTs + i * Us2Ts(1'000'000.0), acc); + } + + // Add another value, which should overwrite the oldest + acc += 5 * 1.0; + history.Register(baseTs + 5 * Us2Ts(1'000'000.0), acc); + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMax(), 5.0, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetMin(), 2.0, 1e-6); + } + + Y_UNIT_TEST(TestSmallTimeIntervals) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, 1.0); + history.Register(baseTs + 1, 1.1); // Очень маленький интервал + + UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, 0.1, 1e-6); + UNIT_ASSERT_VALUES_EQUAL(history.AccumulatedTs, 1); + } + + Y_UNIT_TEST(TestLargeTimeIntervals) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, 1.0); + history.Register(baseTs + 100 * secondInTs, 101.0); // Очень большой интервал + + double expectedValue = 1.0; + for (size_t i = 0; i < 4; ++i) { + UNIT_ASSERT_DOUBLES_EQUAL(history.History[i], expectedValue, 1e-6); + } + } + + Y_UNIT_TEST(TestPrecisionOverLongDuration) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + double acc = 0; + int values[4] = {1, 2, 3, 4}; + for (size_t i = 0; i < 100'000; ++i) { + acc += values[i % 4] * 1.0; + history.Register(baseTs + i * Us2Ts(1'000'000.0), acc); + } + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPart(), 2.5, 1e-6); + } + + Y_UNIT_TEST(TestRoundingDuringTimestampConversion) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, 1.0); + history.Register(baseTs + Us2Ts(1'000'000.5), 2.0); + + double expectedValue = 1.0 / (1'000'000.5 / 1'000'000.0); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<true>(1), expectedValue, 1e-6); + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds<false>(1), expectedValue, 1e-6); + double expectedValue2 = 1.0 - expectedValue; + UNIT_ASSERT_DOUBLES_EQUAL(history.AccumulatedValue, expectedValue2, 1e-6); + } + + Y_UNIT_TEST(TestAverageCalculationPrecision) { + TValueHistory<4> history; + ui64 baseTs = 1000000; + + history.Register(baseTs, 1.0); + history.Register(baseTs + Us2Ts(1'000'000.0), 3.0); + history.Register(baseTs + Us2Ts(2'000'000.0), 6.0); + + UNIT_ASSERT_DOUBLES_EQUAL(history.GetAvgPartForLastSeconds(2), 2.5, 1e-6); + } +} diff --git a/ydb/library/actors/core/harmonizer/ut/ya.make b/ydb/library/actors/core/harmonizer/ut/ya.make new file mode 100644 index 00000000000..c9aedbc5e18 --- /dev/null +++ b/ydb/library/actors/core/harmonizer/ut/ya.make @@ -0,0 +1,22 @@ +UNITTEST_FOR(ydb/library/actors/core/harmonizer) + +IF (SANITIZER_TYPE) + SIZE(MEDIUM) + TIMEOUT(600) +ELSE() + SIZE(SMALL) + TIMEOUT(60) +ENDIF() + + +PEERDIR( + ydb/library/actors/interconnect + ydb/library/actors/testlib +) + +SRCS( + harmonizer_ut.cpp + history_ut.cpp +) + +END() diff --git a/ydb/library/actors/core/harmonizer/waiting_stats.cpp b/ydb/library/actors/core/harmonizer/waiting_stats.cpp new file mode 100644 index 00000000000..f855055034a --- /dev/null +++ b/ydb/library/actors/core/harmonizer/waiting_stats.cpp @@ -0,0 +1,48 @@ +#include "waiting_stats.h" + +#include "pool.h" +#include <ydb/library/actors/core/executor_pool_basic.h> +#include <ydb/library/actors/core/probes.h> + +namespace NActors { + +LWTRACE_USING(ACTORLIB_PROVIDER); + +void TWaitingInfo::Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools) { + WakingUpTotalTime = 0; + WakingUpCount = 0; + AwakingTotalTime = 0; + AwakingCount = 0; + + for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx) { + TPoolInfo& pool = *pools[poolIdx]; + if (pool.WaitingStats) { + WakingUpTotalTime += pool.WaitingStats->WakingUpTotalTime; + WakingUpCount += pool.WaitingStats->WakingUpCount; + AwakingTotalTime += pool.WaitingStats->AwakingTotalTime; + AwakingCount += pool.WaitingStats->AwakingCount; + } + } + + constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime; + constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime; + + ui64 realAvgWakingUpTime = (WakingUpCount ? WakingUpTotalTime / WakingUpCount : knownAvgWakingUpTime); + ui64 avgWakingUpTime = realAvgWakingUpTime; + if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) { + avgWakingUpTime = knownAvgWakingUpTime; + } + AvgWakingUpTimeUs.store(Ts2Us(avgWakingUpTime), std::memory_order_relaxed); + + ui64 realAvgAwakeningTime = (AwakingCount ? AwakingTotalTime / AwakingCount : knownAvgAwakeningUpTime); + ui64 avgAwakeningTime = realAvgAwakeningTime; + if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) { + avgAwakeningTime = knownAvgAwakeningUpTime; + } + AvgAwakeningTimeUs.store(Ts2Us(avgAwakeningTime), std::memory_order_relaxed); + + ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime; + LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(avgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption)); +} + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/waiting_stats.h b/ydb/library/actors/core/harmonizer/waiting_stats.h new file mode 100644 index 00000000000..76f661bb37b --- /dev/null +++ b/ydb/library/actors/core/harmonizer/waiting_stats.h @@ -0,0 +1,21 @@ +#pragma once + +#include "defs.h" + +namespace NActors { + +struct TPoolInfo; + +struct TWaitingInfo { + ui64 WakingUpTotalTime = 0; + ui64 WakingUpCount = 0; + ui64 AwakingTotalTime = 0; + ui64 AwakingCount = 0; + std::atomic<float> AvgWakingUpTimeUs = 0; + std::atomic<float> AvgAwakeningTimeUs = 0; + + void Pull(const std::vector<std::unique_ptr<TPoolInfo>> &pools); + +}; // struct TWaitingInfo + +} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/ya.make b/ydb/library/actors/core/harmonizer/ya.make new file mode 100644 index 00000000000..70993baf64a --- /dev/null +++ b/ydb/library/actors/core/harmonizer/ya.make @@ -0,0 +1,44 @@ +LIBRARY() + +NO_WSHADOW() + +IF (PROFILE_MEMORY_ALLOCATIONS) + CFLAGS(-DPROFILE_MEMORY_ALLOCATIONS) +ENDIF() + +IF (ALLOCATOR == "B" OR ALLOCATOR == "BS" OR ALLOCATOR == "C") + CXXFLAGS(-DBALLOC) + PEERDIR( + library/cpp/balloc/optional + ) +ENDIF() + +SRCS( + cpu_consumption.cpp + pool.cpp + shared_info.cpp + waiting_stats.cpp + harmonizer.cpp +) + +PEERDIR( + ydb/library/actors/util + ydb/library/actors/protos + ydb/library/services + library/cpp/logger + library/cpp/lwtrace + library/cpp/monlib/dynamic_counters + library/cpp/time_provider +) + +IF (SANITIZER_TYPE == "thread") + SUPPRESSIONS( + ../tsan.supp + ) +ENDIF() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h index 4bceffa8f37..5862dffa7d7 100644 --- a/ydb/library/actors/core/mon_stats.h +++ b/ydb/library/actors/core/mon_stats.h @@ -41,14 +41,14 @@ namespace NActors { }; struct TExecutorPoolState { - float UsedCpu = 0; + float ElapsedCpu = 0; float CurrentLimit = 0; float PossibleMaxLimit = 0; float MaxLimit = 0; float MinLimit = 0; void Aggregate(const TExecutorPoolState& other) { - UsedCpu += other.UsedCpu; + ElapsedCpu += other.ElapsedCpu; CurrentLimit += other.CurrentLimit; PossibleMaxLimit += other.PossibleMaxLimit; MaxLimit += other.MaxLimit; @@ -63,10 +63,6 @@ namespace NActors { ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; ui64 DecreasingThreadsByExchange = 0; - i64 MaxConsumedCpuUs = 0; - i64 MinConsumedCpuUs = 0; - i64 MaxBookedCpuUs = 0; - i64 MinBookedCpuUs = 0; double SpinningTimeUs = 0; double SpinThresholdUs = 0; i16 WrongWakenedThreadCount = 0; diff --git a/ydb/library/actors/core/probes.h b/ydb/library/actors/core/probes.h index f9394f3273f..ebf34299e27 100644 --- a/ydb/library/actors/core/probes.h +++ b/ydb/library/actors/core/probes.h @@ -174,11 +174,11 @@ NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \ PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \ TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \ - NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", \ + NAMES("poolId", "pool", "elapsed", "cpu", "lastSecondElapsed", "lastSecondCpu", "threadCount", "maxThreadCount", \ "isStarved", "isNeedy", "isHoggish")) \ PROBE(HarmonizeCheckPoolByThread, GROUPS("Harmonizer"), \ TYPES(ui32, TString, i16, double, double, double, double), \ - NAMES("poolId", "pool", "threadIdx", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed")) \ + NAMES("poolId", "pool", "threadIdx", "elapsed", "cpu", "lastSecondElapsed", "lastSecondCpu")) \ PROBE(WakingUpConsumption, GROUPS("Harmonizer"), \ TYPES(double, double, double, double, double), \ NAMES("avgWakingUpUs", "realAvgWakingUpUs", "avgAwakeningUs", "realAvgAwakeningUs", "total")) \ diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make index fbeb0e2b0d9..ca44ab7707b 100644 --- a/ydb/library/actors/core/ya.make +++ b/ydb/library/actors/core/ya.make @@ -56,8 +56,6 @@ SRCS( executor_pool_shared.h executor_thread.cpp executor_thread.h - harmonizer.cpp - harmonizer.h hfunc.h interconnect.cpp interconnect.h @@ -106,6 +104,7 @@ GENERATE_ENUM_SERIALIZATION(log_iface.h) PEERDIR( ydb/library/actors/actor_type + ydb/library/actors/core/harmonizer ydb/library/actors/memory_log ydb/library/actors/prof ydb/library/actors/protos @@ -129,6 +128,10 @@ ENDIF() END() +RECURSE( + harmonizer +) + RECURSE_FOR_TESTS( ut ut_fat diff --git a/ydb/library/actors/helpers/pool_stats_collector.h b/ydb/library/actors/helpers/pool_stats_collector.h index a0790a55c8b..244cc27cefc 100644 --- a/ydb/library/actors/helpers/pool_stats_collector.h +++ b/ydb/library/actors/helpers/pool_stats_collector.h @@ -175,6 +175,7 @@ private: NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount; NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCountPercent; NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCountPercent; + NMonitoring::TDynamicCounters::TCounterPtr PossibleMaxThreadCountPercent; NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCountPercent; NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCountPercent; NMonitoring::TDynamicCounters::TCounterPtr IsNeedy; @@ -189,10 +190,6 @@ private: NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange; NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions; - NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs; NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs; @@ -251,6 +248,7 @@ private: CurrentThreadCountPercent = PoolGroup->GetCounter("CurrentThreadCountPercent", false); PotentialMaxThreadCountPercent = PoolGroup->GetCounter("PotentialMaxThreadCountPercent", false); + PossibleMaxThreadCountPercent = PoolGroup->GetCounter("PossibleMaxThreadCountPercent", false); DefaultThreadCountPercent = PoolGroup->GetCounter("DefaultThreadCountPercent", false); MaxThreadCountPercent = PoolGroup->GetCounter("MaxThreadCountPercent", false); @@ -266,10 +264,6 @@ private: DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true); NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true); - MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false); - MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false); - MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false); - MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false); SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true); SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false); @@ -316,9 +310,9 @@ private: *DefaultThreadCount = poolStats.DefaultThreadCount; *MaxThreadCount = poolStats.MaxThreadCount; - *CurrentThreadCountPercent = poolStats.CurrentThreadCount * 100; *PotentialMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100; + *PossibleMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100; *DefaultThreadCountPercent = poolStats.DefaultThreadCount * 100; *MaxThreadCountPercent = poolStats.MaxThreadCount * 100; @@ -384,35 +378,35 @@ private: struct TActorSystemCounters { TIntrusivePtr<NMonitoring::TDynamicCounters> Group; - NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxUsedCpuPercent; + NMonitoring::TDynamicCounters::TCounterPtr MinUsedCpuPercent; + NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpuPercent; + NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpuPercent; - NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeUs; - NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeUs; + NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeNs; + NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeNs; void Init(NMonitoring::TDynamicCounters* group) { Group = group; - MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false); - MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false); - MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false); - MinBookedCpu = Group->GetCounter("MinBookedCpu", false); - AvgAwakeningTimeUs = Group->GetCounter("AvgAwakeningTimeUs", false); - AvgWakingUpTimeUs = Group->GetCounter("AvgWakingUpTimeUs", false); + MaxUsedCpuPercent = Group->GetCounter("MaxUsedCpuPercent", false); + MinUsedCpuPercent = Group->GetCounter("MinUsedCpuPercent", false); + MaxElapsedCpuPercent = Group->GetCounter("MaxElapsedCpuPercent", false); + MinElapsedCpuPercent = Group->GetCounter("MinElapsedCpuPercent", false); + AvgAwakeningTimeNs = Group->GetCounter("AvgAwakeningTimeNs", false); + AvgWakingUpTimeNs = Group->GetCounter("AvgWakingUpTimeNs", false); } void Set(const THarmonizerStats& harmonizerStats) { #ifdef ACTORSLIB_COLLECT_EXEC_STATS - *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu; - *MinConsumedCpu = harmonizerStats.MinConsumedCpu; - *MaxBookedCpu = harmonizerStats.MaxBookedCpu; - *MinBookedCpu = harmonizerStats.MinBookedCpu; + *MaxUsedCpuPercent = harmonizerStats.MaxUsedCpu; + *MinUsedCpuPercent = harmonizerStats.MinUsedCpu; + *MaxElapsedCpuPercent = harmonizerStats.MaxElapsedCpu; + *MinElapsedCpuPercent = harmonizerStats.MinElapsedCpu; - *AvgAwakeningTimeUs = harmonizerStats.AvgAwakeningTimeUs; - *AvgWakingUpTimeUs = harmonizerStats.AvgWakingUpTimeUs; + *AvgAwakeningTimeNs = harmonizerStats.AvgAwakeningTimeUs * 1000; + *AvgWakingUpTimeNs = harmonizerStats.AvgWakingUpTimeUs * 1000; #else Y_UNUSED(harmonizerStats); #endif |
