1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
|
#include "balancer.h"
#include "probes.h"
#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/intrinsics.h>
#include <util/system/spinlock.h>
#include <algorithm>
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
// Describes balancing-related state of pool, the most notable is `Importance` to add new cpu
struct TLevel {
// Balancer will try to give more cpu to overloaded pools
enum ELoadClass {
Underloaded = 0,
Moderate = 1,
Overloaded = 2,
};
double ScaleFactor;
ELoadClass LoadClass;
ui64 Importance; // pool with lower importance is allowed to pass cpu to pool with higher, but the opposite is forbidden
TLevel() {}
TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) {
ScaleFactor = double(currentCpus) / cfg.Cpus;
if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // Uderload criterion, based on estimated latency w/o 1 cpu
LoadClass = Underloaded;
} else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // Overload criterion, based on latency
LoadClass = Overloaded;
} else {
LoadClass = Moderate;
}
Importance = MakeImportance(LoadClass, cfg.Priority, ScaleFactor, cpuIdle, poolId);
}
private:
// Importance is simple ui64 value (from highest to lowest):
// 2 Bits: LoadClass
// 8 Bits: Priority
// 10 Bits: -ScaleFactor (for max-min fairness with weights equal to TBalancingConfig::Cpus)
// 10 Bits: -CpuIdle
// 6 Bits: PoolId
static ui64 MakeImportance(ELoadClass load, ui8 priority, double scaleFactor, double cpuIdle, TPoolId poolId) {
ui64 idle = std::clamp<i64>(1024 - cpuIdle * 512, 0, 1023);
ui64 scale = std::clamp<i64>(1024 - scaleFactor * 32, 0, 1023);
Y_ABORT_UNLESS(ui64(load) < (1ull << 2ull));
Y_ABORT_UNLESS(ui64(priority) < (1ull << 8ull));
Y_ABORT_UNLESS(ui64(scale) < (1ull << 10ull));
Y_ABORT_UNLESS(ui64(idle) < (1ull << 10ull));
Y_ABORT_UNLESS(ui64(poolId) < (1ull << 6ull));
static_assert(ui64(MaxPools) <= (1ull << 6ull));
ui64 importance =
(ui64(load) << ui64(6 + 10 + 10 + 8)) |
(ui64(priority) << ui64(6 + 10 + 10)) |
(ui64(scale) << ui64(6 + 10)) |
(ui64(idle) << ui64(6)) |
ui64(poolId);
return importance;
}
};
// Main balancer implemenation
class TBalancer: public IBalancer {
private:
struct TCpu;
struct TPool;
bool Disabled = true;
TSpinLock Lock;
ui64 NextBalanceTs;
TVector<TCpu> Cpus; // Indexed by CpuId, can have gaps
TVector<TPool> Pools; // Indexed by PoolId, can have gaps
TBalancerConfig Config;
public:
ui64 GetPeriodUs() override;
// Setup
TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override;
~TBalancer();
// Balancing
bool TryLock(ui64 ts) override;
void SetPoolStats(TPoolId pool, const TBalancerStats& stats) override;
void Balance() override;
void Unlock() override;
private:
void MoveCpu(TPool& from, TPool& to);
};
struct TBalancer::TPool {
TBalancingConfig Config;
TPoolId PoolId;
TString PoolName;
// Input data for balancing
TBalancerStats Prev;
TBalancerStats Next;
// Derived stats
double CpuLoad;
double CpuIdle;
// Classification
// NOTE: We want to avoid passing cpu back and forth, so we must consider not only current level,
// NOTE: but expected levels after movements also
TLevel CurLevel; // Level with current amount of cpu
TLevel AddLevel; // Level after one cpu acception
TLevel SubLevel; // Level after one cpu donation
// Balancing state
ui64 CurrentCpus = 0; // Total number of cpus assigned for this pool (zero means pools is not balanced)
ui64 PrevCpus = 0; // Cpus in last period
explicit TPool(const TBalancingConfig& cfg = {})
: Config(cfg)
{}
void Configure(const TBalancingConfig& cfg, const TString& poolName) {
Config = cfg;
// Enforce constraints
if (Config.Cpus > 0) {
Config.MinCpus = std::clamp<ui32>(Config.MinCpus, 1, Config.Cpus);
Config.MaxCpus = Max<ui32>(Config.MaxCpus, Config.Cpus);
} else {
Y_ABORT_UNLESS(Config.Cpus == 0,
"Unexpected negative Config.Cpus# %" PRIi64,
(i64)Config.Cpus);
Config.MinCpus = 0;
Config.MaxCpus = 0;
}
PoolName = poolName;
}
};
struct TBalancer::TCpu {
TCpuState* State = nullptr; // Cpu state, nullptr means cpu is not used (gap)
TCpuAllocation Alloc;
TPoolId Current;
TPoolId Assigned;
};
TBalancer::TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts)
: NextBalanceTs(ts)
, Config(config)
{
for (TPoolId pool = 0; pool < MaxPools; pool++) {
Pools.emplace_back();
Pools.back().PoolId = pool;
}
for (const TUnitedExecutorPoolConfig& united : unitedPools) {
Pools[united.PoolId].Configure(united.Balancing, united.PoolName);
}
}
TBalancer::~TBalancer() {
}
bool TBalancer::AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* state) {
// Setup
TCpuId cpuId = cpuAlloc.CpuId;
if (Cpus.size() <= cpuId) {
Cpus.resize(cpuId + 1);
}
TCpu& cpu = Cpus[cpuId];
cpu.State = state;
cpu.Alloc = cpuAlloc;
// Fill every pool with cpus up to TBalancingConfig::Cpus
TPoolId pool = 0;
for (TPool& p : Pools) {
if (p.CurrentCpus < p.Config.Cpus) {
p.CurrentCpus++;
break;
}
pool++;
}
if (pool != MaxPools) { // cpu under balancer control
state->SwitchPool(pool);
state->AssignPool(pool);
Disabled = false;
return true;
}
return false; // non-balanced cpu
}
bool TBalancer::TryLock(ui64 ts) {
if (!Disabled && NextBalanceTs < ts && Lock.TryAcquire()) {
NextBalanceTs = ts + Us2Ts(Config.PeriodUs);
return true;
}
return false;
}
void TBalancer::SetPoolStats(TPoolId pool, const TBalancerStats& stats) {
Y_ABORT_UNLESS(pool < MaxPools);
TPool& p = Pools[pool];
p.Prev = p.Next;
p.Next = stats;
}
void TBalancer::Balance() {
// Update every cpu state
for (TCpu& cpu : Cpus) {
if (cpu.State) {
cpu.State->Load(cpu.Assigned, cpu.Current);
if (cpu.Current < MaxPools && cpu.Current != cpu.Assigned) {
return; // previous movement has not been applied yet, wait
}
}
}
// Process stats, classify and compute pool importance
TStackVec<TPool*, MaxPools> order;
for (TPool& pool : Pools) {
if (pool.Config.Cpus == 0) {
continue; // skip gaps (non-existent or non-united pools)
}
if (pool.Prev.Ts == 0 || pool.Prev.Ts >= pool.Next.Ts) {
return; // invalid stats
}
// Compute derived stats
pool.CpuLoad = (pool.Next.CpuUs - pool.Prev.CpuUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
if (pool.Prev.IdleUs == ui64(-1) || pool.Next.IdleUs == ui64(-1)) {
pool.CpuIdle = pool.CurrentCpus - pool.CpuLoad; // for tests
} else {
pool.CpuIdle = (pool.Next.IdleUs - pool.Prev.IdleUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
}
// Compute levels
pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle,
pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle,
0, pool.Next.WorstActivationTimeUs); // we expect taken cpu to became utilized
pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1,
pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
// Prepare for balancing
pool.PrevCpus = pool.CurrentCpus;
order.push_back(&pool);
}
// Sort pools by importance
std::sort(order.begin(), order.end(), [] (TPool* l, TPool* r) {return l->CurLevel.Importance < r->CurLevel.Importance; });
for (TPool* pool : order) {
LWPROBE(PoolStats, pool->PoolId, pool->PoolName, pool->CurrentCpus, pool->CurLevel.LoadClass, pool->Config.Priority, pool->CurLevel.ScaleFactor, pool->CpuIdle, pool->CpuLoad, pool->CurLevel.Importance, pool->AddLevel.Importance, pool->SubLevel.Importance);
}
// Move cpus from lower importance to higher importance pools
for (auto toIter = order.rbegin(); toIter != order.rend(); ++toIter) {
TPool& to = **toIter;
if (to.CurLevel.LoadClass == TLevel::Overloaded && // if pool is overloaded
to.CurrentCpus < to.Config.MaxCpus) // and constraints would not be violated
{
for (auto fromIter = order.begin(); (*fromIter)->CurLevel.Importance < to.CurLevel.Importance; ++fromIter) {
TPool& from = **fromIter;
if (from.CurrentCpus == from.PrevCpus && // if not balanced yet
from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated
from.SubLevel.Importance <= to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
{
MoveCpu(from, to);
from.CurrentCpus--;
to.CurrentCpus++;
break;
}
}
}
}
}
void TBalancer::MoveCpu(TBalancer::TPool& from, TBalancer::TPool& to) {
for (auto ci = Cpus.rbegin(), ce = Cpus.rend(); ci != ce; ci++) {
TCpu& cpu = *ci;
if (!cpu.State) {
continue;
}
if (cpu.Assigned == from.PoolId) {
cpu.State->AssignPool(to.PoolId);
cpu.Assigned = to.PoolId;
LWPROBE(MoveCpu, from.PoolId, to.PoolId, from.PoolName, to.PoolName, cpu.Alloc.CpuId);
return;
}
}
Y_FAIL();
}
void TBalancer::Unlock() {
Lock.Release();
}
ui64 TBalancer::GetPeriodUs() {
return Config.PeriodUs;
}
IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) {
return new TBalancer(config, unitedPools, ts);
}
}
|