diff options
| author | kruall <[email protected]> | 2022-12-08 18:14:12 +0300 |
|---|---|---|
| committer | kruall <[email protected]> | 2022-12-08 18:14:12 +0300 |
| commit | 88bfc0de583a421fc9582fc016b543cad6b30e84 (patch) | |
| tree | fb0bdca7804cead088934831cc9432262f32c354 /library/cpp/actors/core/harmonizer.cpp | |
| parent | f82e9035da59f4625a16b5c3c03a9818f5be1398 (diff) | |
Improve AS14,
Diffstat (limited to 'library/cpp/actors/core/harmonizer.cpp')
| -rw-r--r-- | library/cpp/actors/core/harmonizer.cpp | 159 |
1 files changed, 127 insertions, 32 deletions
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index 3a0f4109bb1..75f3179a270 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -15,24 +15,48 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); +constexpr bool CheckBinaryPower(ui64 value) { + return !(value & (value - 1)); +} + struct TValueHistory { - double History[8] = {0.0}; + static constexpr ui64 HistoryBufferSize = 8; + static_assert(CheckBinaryPower(HistoryBufferSize)); + + double History[HistoryBufferSize] = {0.0}; ui64 HistoryIdx = 0; ui64 LastTs = Max<ui64>(); double LastUs = 0.0; double AccumulatedUs = 0.0; ui64 AccumulatedTs = 0; - double GetAvgPart() { + template <bool WithTail=false> + double GetAvgPartForLastSeconds(ui8 seconds) { double sum = AccumulatedUs; - for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) { - sum += History[idx]; - } - double duration = 1'000'000.0 * (sizeof(History) / sizeof(History[0])) + Ts2Us(AccumulatedTs); + size_t idx = HistoryIdx; + ui8 leftSeconds = seconds; + do { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + if (WithTail || leftSeconds) { + sum += History[idx]; + } else { + ui64 tsInSecond = Us2Ts(1'000'000.0); + sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond; + } + } while (leftSeconds); + double duration = 1'000'000.0 * seconds + (WithTail ? Ts2Us(AccumulatedTs): 0.0); double avg = sum / duration; return avg; } + double GetAvgPart() { + return GetAvgPartForLastSeconds<true>(HistoryBufferSize); + } + void Register(ui64 ts, double valueUs) { if (ts < LastTs) { LastTs = ts; @@ -49,7 +73,7 @@ struct TValueHistory { if (dTs > Us2Ts(8'000'000.0)) { dUs = dUs * 1'000'000.0 / Ts2Us(dTs); - for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) { + for (size_t idx = 0; idx < HistoryBufferSize; ++idx) { History[idx] = dUs; } AccumulatedUs = 0.0; @@ -68,7 +92,7 @@ struct TValueHistory { dTs -= addTs; dUs -= addUs; History[HistoryIdx] = AccumulatedUs + addUs; - HistoryIdx = (HistoryIdx + 1) % (sizeof(History) / sizeof(History[0])); + HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize; AccumulatedUs = 0.0; AccumulatedTs = 0; } @@ -87,13 +111,21 @@ struct TPoolInfo { i16 DefaultThreadCount = 0; i16 MinThreadCount = 0; i16 MaxThreadCount = 0; + i16 Priority = 0; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; + ui32 MaxAvgPingUs = 0; + ui64 LastUpdateTs = 0; bool IsBeingStopped(i16 threadIdx); double GetBooked(i16 threadIdx); + double GetlastSecondPoolBooked(i16 threadIdx); double GetConsumed(i16 threadIdx); + double GetlastSecondPoolConsumed(i16 threadIdx); void PullStats(ui64 ts); i16 GetThreadCount(); void SetThreadCount(i16 threadCount); + bool IsAvgPingGood(); }; bool TPoolInfo::IsBeingStopped(i16 threadIdx) { @@ -105,7 +137,13 @@ double TPoolInfo::GetBooked(i16 threadIdx) { return ThreadInfo[threadIdx].Booked.GetAvgPart(); } return 0.0; - //return Pool->GetThreadBooked(threadIdx); +} + +double TPoolInfo::GetlastSecondPoolBooked(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); + } + return 0.0; } double TPoolInfo::GetConsumed(i16 threadIdx) { @@ -113,7 +151,13 @@ double TPoolInfo::GetConsumed(i16 threadIdx) { return ThreadInfo[threadIdx].Consumed.GetAvgPart(); } return 0.0; - //return Pool->GetThreadConsumed(threadIdx); +} + +double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); + } + return 0.0; } #define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] @@ -136,22 +180,35 @@ void TPoolInfo::SetThreadCount(i16 threadCount) { Pool->SetThreadCount(threadCount); } +bool TPoolInfo::IsAvgPingGood() { + bool res = true; + if (AvgPingCounter) { + res &= *AvgPingCounter > MaxAvgPingUs; + } + if (AvgPingCounterWithSmallWindow) { + res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs; + } + return res; +} + class THarmonizer: public IHarmonizer { private: std::atomic<bool> IsDisabled = false; TSpinLock Lock; std::atomic<ui64> NextHarmonizeTs = 0; std::vector<TPoolInfo> Pools; + std::vector<ui16> PriorityOrder; void PullStats(ui64 ts); - void HarmonizeImpl(); + void HarmonizeImpl(ui64 ts); + void CalculatePriorityOrder(); public: THarmonizer(ui64 ts); virtual ~THarmonizer(); double Rescale(double value) const; void Harmonize(ui64 ts) override; void DeclareEmergency(ui64 ts) override; - void AddPool(IExecutorPool* pool) override; + void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; void Enable(bool enable) override; }; @@ -172,10 +229,19 @@ void THarmonizer::PullStats(ui64 ts) { } } -void THarmonizer::HarmonizeImpl() { +Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { + return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; +} + +Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) { + return booked < currentThreadCount - 1; +} + +void THarmonizer::HarmonizeImpl(ui64 ts) { bool isStarvedPresent = false; double booked = 0.0; double consumed = 0.0; + double lastSecondBooked = 0.0; i64 beingStopped = 0; i64 total = 0; TStackVec<size_t, 8> needyPools; @@ -185,32 +251,44 @@ void THarmonizer::HarmonizeImpl() { total += pool.DefaultThreadCount; double poolBooked = 0.0; double poolConsumed = 0.0; + double lastSecondPoolBooked = 0.0; + double lastSecondPoolConsumed = 0.0; beingStopped += pool.Pool->GetBlockingThreadCount(); for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { poolBooked += Rescale(pool.GetBooked(threadIdx)); + lastSecondPoolBooked += Rescale(pool.GetlastSecondPoolBooked(threadIdx)); poolConsumed += Rescale(pool.GetConsumed(threadIdx)); + lastSecondPoolConsumed += Rescale(pool.GetlastSecondPoolConsumed(threadIdx)); } - bool isStarved = false; - if (Max(consumed, booked) > 0.1 && consumed < booked * 0.7) { + bool isStarved = IsStarved(consumed, booked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked); + if (isStarved) { isStarvedPresent = true; - isStarved = true; } ui32 currentThreadCount = pool.GetThreadCount(); - bool isNeedy = false; - if (poolBooked >= currentThreadCount) { + bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount; + if (pool.AvgPingCounter) { + if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { + isNeedy = false; + } else { + pool.LastUpdateTs = ts; + } + } + if (isNeedy) { needyPools.push_back(poolIdx); - isNeedy = true; } - bool isHoggish = false; - if (poolBooked < currentThreadCount - 1) { + bool isHoggish = IsHoggish(poolBooked, currentThreadCount) + || IsHoggish(lastSecondPoolBooked, currentThreadCount); + if (isHoggish) { hoggishPools.push_back(poolIdx); - isHoggish = true; } booked += poolBooked; consumed += poolConsumed; - LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); + LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); + } + double budget = total - Max(booked, lastSecondBooked); + if (budget < -0.1) { + isStarvedPresent = true; } - double budget = total - booked; double overbooked = consumed - booked; if (isStarvedPresent) { // last_starved_at_consumed_value = сумма по всем пулам consumed; @@ -223,12 +301,7 @@ void THarmonizer::HarmonizeImpl() { for (size_t i = 0; i < Pools.size(); ++i) { reorder.push_back(i); } - while (!reorder.empty()) { - size_t rndIdx = rand() % reorder.size(); - size_t poolIdx = reorder[rndIdx]; - reorder[rndIdx] = reorder.back(); - reorder.pop_back(); - + for (ui16 poolIdx : PriorityOrder) { TPoolInfo &pool = Pools[poolIdx]; i64 threadCount = pool.GetThreadCount(); if (threadCount > pool.DefaultThreadCount) { @@ -264,6 +337,17 @@ void THarmonizer::HarmonizeImpl() { } } +void THarmonizer::CalculatePriorityOrder() { + PriorityOrder.resize(Pools.size()); + Iota(PriorityOrder.begin(), PriorityOrder.end(), 0); + Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) { + if (Pools[lhs].Priority != Pools[rhs].Priority) { + return Pools[lhs].Priority < Pools[rhs].Priority; + } + return Pools[lhs].Pool->PoolId > Pools[rhs].Pool->PoolId; + }); +} + void THarmonizer::Harmonize(ui64 ts) { if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) { LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false); @@ -279,8 +363,12 @@ void THarmonizer::Harmonize(ui64 ts) { ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull)); LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs); + if (PriorityOrder.empty()) { + CalculatePriorityOrder(); + } + PullStats(ts); - HarmonizeImpl(); + HarmonizeImpl(ts); Lock.Release(); } @@ -289,7 +377,7 @@ void THarmonizer::DeclareEmergency(ui64 ts) { NextHarmonizeTs = ts; } -void THarmonizer::AddPool(IExecutorPool* pool) { +void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { TGuard<TSpinLock> guard(Lock); TPoolInfo poolInfo; poolInfo.Pool = pool; @@ -297,8 +385,15 @@ void THarmonizer::AddPool(IExecutorPool* pool) { poolInfo.MinThreadCount = pool->GetMinThreadCount(); poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount); + poolInfo.Priority = pool->GetPriority(); pool->SetThreadCount(poolInfo.DefaultThreadCount); + if (pingInfo) { + poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; + poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow; + poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs; + } Pools.push_back(poolInfo); + PriorityOrder.clear(); }; void THarmonizer::Enable(bool enable) { |
