summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/harmonizer.cpp
diff options
context:
space:
mode:
authorkruall <[email protected]>2022-12-08 18:14:12 +0300
committerkruall <[email protected]>2022-12-08 18:14:12 +0300
commit88bfc0de583a421fc9582fc016b543cad6b30e84 (patch)
treefb0bdca7804cead088934831cc9432262f32c354 /library/cpp/actors/core/harmonizer.cpp
parentf82e9035da59f4625a16b5c3c03a9818f5be1398 (diff)
Improve AS14,
Diffstat (limited to 'library/cpp/actors/core/harmonizer.cpp')
-rw-r--r--library/cpp/actors/core/harmonizer.cpp159
1 files changed, 127 insertions, 32 deletions
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index 3a0f4109bb1..75f3179a270 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -15,24 +15,48 @@ namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
+constexpr bool CheckBinaryPower(ui64 value) {
+ return !(value & (value - 1));
+}
+
struct TValueHistory {
- double History[8] = {0.0};
+ static constexpr ui64 HistoryBufferSize = 8;
+ static_assert(CheckBinaryPower(HistoryBufferSize));
+
+ double History[HistoryBufferSize] = {0.0};
ui64 HistoryIdx = 0;
ui64 LastTs = Max<ui64>();
double LastUs = 0.0;
double AccumulatedUs = 0.0;
ui64 AccumulatedTs = 0;
- double GetAvgPart() {
+ template <bool WithTail=false>
+ double GetAvgPartForLastSeconds(ui8 seconds) {
double sum = AccumulatedUs;
- for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) {
- sum += History[idx];
- }
- double duration = 1'000'000.0 * (sizeof(History) / sizeof(History[0])) + Ts2Us(AccumulatedTs);
+ size_t idx = HistoryIdx;
+ ui8 leftSeconds = seconds;
+ do {
+ idx--;
+ leftSeconds--;
+ if (idx >= HistoryBufferSize) {
+ idx = HistoryBufferSize - 1;
+ }
+ if (WithTail || leftSeconds) {
+ sum += History[idx];
+ } else {
+ ui64 tsInSecond = Us2Ts(1'000'000.0);
+ sum += History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond;
+ }
+ } while (leftSeconds);
+ double duration = 1'000'000.0 * seconds + (WithTail ? Ts2Us(AccumulatedTs): 0.0);
double avg = sum / duration;
return avg;
}
+ double GetAvgPart() {
+ return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
+ }
+
void Register(ui64 ts, double valueUs) {
if (ts < LastTs) {
LastTs = ts;
@@ -49,7 +73,7 @@ struct TValueHistory {
if (dTs > Us2Ts(8'000'000.0)) {
dUs = dUs * 1'000'000.0 / Ts2Us(dTs);
- for (size_t idx = 0; idx < (sizeof(History) / sizeof(History[0])); ++idx) {
+ for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
History[idx] = dUs;
}
AccumulatedUs = 0.0;
@@ -68,7 +92,7 @@ struct TValueHistory {
dTs -= addTs;
dUs -= addUs;
History[HistoryIdx] = AccumulatedUs + addUs;
- HistoryIdx = (HistoryIdx + 1) % (sizeof(History) / sizeof(History[0]));
+ HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize;
AccumulatedUs = 0.0;
AccumulatedTs = 0;
}
@@ -87,13 +111,21 @@ struct TPoolInfo {
i16 DefaultThreadCount = 0;
i16 MinThreadCount = 0;
i16 MaxThreadCount = 0;
+ i16 Priority = 0;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
+ NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
+ ui32 MaxAvgPingUs = 0;
+ ui64 LastUpdateTs = 0;
bool IsBeingStopped(i16 threadIdx);
double GetBooked(i16 threadIdx);
+ double GetlastSecondPoolBooked(i16 threadIdx);
double GetConsumed(i16 threadIdx);
+ double GetlastSecondPoolConsumed(i16 threadIdx);
void PullStats(ui64 ts);
i16 GetThreadCount();
void SetThreadCount(i16 threadCount);
+ bool IsAvgPingGood();
};
bool TPoolInfo::IsBeingStopped(i16 threadIdx) {
@@ -105,7 +137,13 @@ double TPoolInfo::GetBooked(i16 threadIdx) {
return ThreadInfo[threadIdx].Booked.GetAvgPart();
}
return 0.0;
- //return Pool->GetThreadBooked(threadIdx);
+}
+
+double TPoolInfo::GetlastSecondPoolBooked(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
}
double TPoolInfo::GetConsumed(i16 threadIdx) {
@@ -113,7 +151,13 @@ double TPoolInfo::GetConsumed(i16 threadIdx) {
return ThreadInfo[threadIdx].Consumed.GetAvgPart();
}
return 0.0;
- //return Pool->GetThreadConsumed(threadIdx);
+}
+
+double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
+ if ((size_t)threadIdx < ThreadInfo.size()) {
+ return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
+ }
+ return 0.0;
}
#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
@@ -136,22 +180,35 @@ void TPoolInfo::SetThreadCount(i16 threadCount) {
Pool->SetThreadCount(threadCount);
}
+bool TPoolInfo::IsAvgPingGood() {
+ bool res = true;
+ if (AvgPingCounter) {
+ res &= *AvgPingCounter > MaxAvgPingUs;
+ }
+ if (AvgPingCounterWithSmallWindow) {
+ res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs;
+ }
+ return res;
+}
+
class THarmonizer: public IHarmonizer {
private:
std::atomic<bool> IsDisabled = false;
TSpinLock Lock;
std::atomic<ui64> NextHarmonizeTs = 0;
std::vector<TPoolInfo> Pools;
+ std::vector<ui16> PriorityOrder;
void PullStats(ui64 ts);
- void HarmonizeImpl();
+ void HarmonizeImpl(ui64 ts);
+ void CalculatePriorityOrder();
public:
THarmonizer(ui64 ts);
virtual ~THarmonizer();
double Rescale(double value) const;
void Harmonize(ui64 ts) override;
void DeclareEmergency(ui64 ts) override;
- void AddPool(IExecutorPool* pool) override;
+ void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
void Enable(bool enable) override;
};
@@ -172,10 +229,19 @@ void THarmonizer::PullStats(ui64 ts) {
}
}
-void THarmonizer::HarmonizeImpl() {
+Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
+ return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
+}
+
+Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) {
+ return booked < currentThreadCount - 1;
+}
+
+void THarmonizer::HarmonizeImpl(ui64 ts) {
bool isStarvedPresent = false;
double booked = 0.0;
double consumed = 0.0;
+ double lastSecondBooked = 0.0;
i64 beingStopped = 0;
i64 total = 0;
TStackVec<size_t, 8> needyPools;
@@ -185,32 +251,44 @@ void THarmonizer::HarmonizeImpl() {
total += pool.DefaultThreadCount;
double poolBooked = 0.0;
double poolConsumed = 0.0;
+ double lastSecondPoolBooked = 0.0;
+ double lastSecondPoolConsumed = 0.0;
beingStopped += pool.Pool->GetBlockingThreadCount();
for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
poolBooked += Rescale(pool.GetBooked(threadIdx));
+ lastSecondPoolBooked += Rescale(pool.GetlastSecondPoolBooked(threadIdx));
poolConsumed += Rescale(pool.GetConsumed(threadIdx));
+ lastSecondPoolConsumed += Rescale(pool.GetlastSecondPoolConsumed(threadIdx));
}
- bool isStarved = false;
- if (Max(consumed, booked) > 0.1 && consumed < booked * 0.7) {
+ bool isStarved = IsStarved(consumed, booked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked);
+ if (isStarved) {
isStarvedPresent = true;
- isStarved = true;
}
ui32 currentThreadCount = pool.GetThreadCount();
- bool isNeedy = false;
- if (poolBooked >= currentThreadCount) {
+ bool isNeedy = pool.IsAvgPingGood() && poolBooked >= currentThreadCount;
+ if (pool.AvgPingCounter) {
+ if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
+ isNeedy = false;
+ } else {
+ pool.LastUpdateTs = ts;
+ }
+ }
+ if (isNeedy) {
needyPools.push_back(poolIdx);
- isNeedy = true;
}
- bool isHoggish = false;
- if (poolBooked < currentThreadCount - 1) {
+ bool isHoggish = IsHoggish(poolBooked, currentThreadCount)
+ || IsHoggish(lastSecondPoolBooked, currentThreadCount);
+ if (isHoggish) {
hoggishPools.push_back(poolIdx);
- isHoggish = true;
}
booked += poolBooked;
consumed += poolConsumed;
- LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
+ LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
+ }
+ double budget = total - Max(booked, lastSecondBooked);
+ if (budget < -0.1) {
+ isStarvedPresent = true;
}
- double budget = total - booked;
double overbooked = consumed - booked;
if (isStarvedPresent) {
// last_starved_at_consumed_value = сумма по всем пулам consumed;
@@ -223,12 +301,7 @@ void THarmonizer::HarmonizeImpl() {
for (size_t i = 0; i < Pools.size(); ++i) {
reorder.push_back(i);
}
- while (!reorder.empty()) {
- size_t rndIdx = rand() % reorder.size();
- size_t poolIdx = reorder[rndIdx];
- reorder[rndIdx] = reorder.back();
- reorder.pop_back();
-
+ for (ui16 poolIdx : PriorityOrder) {
TPoolInfo &pool = Pools[poolIdx];
i64 threadCount = pool.GetThreadCount();
if (threadCount > pool.DefaultThreadCount) {
@@ -264,6 +337,17 @@ void THarmonizer::HarmonizeImpl() {
}
}
+void THarmonizer::CalculatePriorityOrder() {
+ PriorityOrder.resize(Pools.size());
+ Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
+ Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) {
+ if (Pools[lhs].Priority != Pools[rhs].Priority) {
+ return Pools[lhs].Priority < Pools[rhs].Priority;
+ }
+ return Pools[lhs].Pool->PoolId > Pools[rhs].Pool->PoolId;
+ });
+}
+
void THarmonizer::Harmonize(ui64 ts) {
if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) {
LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false);
@@ -279,8 +363,12 @@ void THarmonizer::Harmonize(ui64 ts) {
ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);
+ if (PriorityOrder.empty()) {
+ CalculatePriorityOrder();
+ }
+
PullStats(ts);
- HarmonizeImpl();
+ HarmonizeImpl(ts);
Lock.Release();
}
@@ -289,7 +377,7 @@ void THarmonizer::DeclareEmergency(ui64 ts) {
NextHarmonizeTs = ts;
}
-void THarmonizer::AddPool(IExecutorPool* pool) {
+void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
TGuard<TSpinLock> guard(Lock);
TPoolInfo poolInfo;
poolInfo.Pool = pool;
@@ -297,8 +385,15 @@ void THarmonizer::AddPool(IExecutorPool* pool) {
poolInfo.MinThreadCount = pool->GetMinThreadCount();
poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount);
+ poolInfo.Priority = pool->GetPriority();
pool->SetThreadCount(poolInfo.DefaultThreadCount);
+ if (pingInfo) {
+ poolInfo.AvgPingCounter = pingInfo->AvgPingCounter;
+ poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
+ poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
+ }
Pools.push_back(poolInfo);
+ PriorityOrder.clear();
};
void THarmonizer::Enable(bool enable) {