aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/balancer.cpp
blob: cc5417b0b5023ecf04725625978dba03053d6d47 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
#include "balancer.h"

#include "probes.h"

#include <library/cpp/actors/util/intrinsics.h>
#include <library/cpp/actors/util/datetime.h>

#include <util/system/spinlock.h>

#include <algorithm>

namespace NActors {
    LWTRACE_USING(ACTORLIB_PROVIDER);

    // Describes balancing-related state of pool, the most notable is `Importance` to add new cpu
    struct TLevel {
        // Balancer will try to give more cpu to overloaded pools
        enum ELoadClass {
            Underloaded = 0,
            Moderate = 1,
            Overloaded = 2,
        };

        double ScaleFactor;
        ELoadClass LoadClass;
        ui64 Importance; // pool with lower importance is allowed to pass cpu to pool with higher, but the opposite is forbidden

        TLevel() {}

        TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle) {
            ScaleFactor = double(currentCpus) / cfg.Cpus;
            if (cpuIdle > 1.3) { // TODO: add a better underload criterion, based on estimated latency w/o 1 cpu
                LoadClass = Underloaded;
            } else if (cpuIdle < 0.2) { // TODO: add a better overload criterion, based on latency
                LoadClass = Overloaded;
            } else {
                LoadClass = Moderate;
            }
            Importance = MakeImportance(LoadClass, cfg.Priority, ScaleFactor, cpuIdle, poolId);
        }

    private:
        // Importance is simple ui64 value (from highest to lowest):
        //   2 Bits: LoadClass
        //   8 Bits: Priority
        //  10 Bits: -ScaleFactor (for max-min fairness with weights equal to TBalancingConfig::Cpus)
        //  10 Bits: -CpuIdle
        //   6 Bits: PoolId
        static ui64 MakeImportance(ELoadClass load, ui8 priority, double scaleFactor, double cpuIdle, TPoolId poolId) {
            ui64 idle = std::clamp<i64>(1024 - cpuIdle * 512, 0, 1023);
            ui64 scale = std::clamp<i64>(1024 - scaleFactor * 32, 0, 1023);

            Y_VERIFY(ui64(load)     < (1ull << 2ull));
            Y_VERIFY(ui64(priority) < (1ull << 8ull));
            Y_VERIFY(ui64(scale)    < (1ull << 10ull));
            Y_VERIFY(ui64(idle)     < (1ull << 10ull));
            Y_VERIFY(ui64(poolId)   < (1ull << 6ull));

            static_assert(ui64(MaxPools) <= (1ull << 6ull));

            ui64 importance =
                (ui64(load)     << ui64(6 + 10 + 10 + 8)) |
                (ui64(priority) << ui64(6 + 10 + 10)) |
                (ui64(scale)    << ui64(6 + 10)) |
                (ui64(idle)     << ui64(6)) |
                ui64(poolId);
            return importance;
        }
    };

    // Main balancer implemenation
    class TBalancer: public IBalancer {
    private:
        struct TCpu;
        struct TPool;

        bool Disabled = true;
        TSpinLock Lock;
        ui64 NextBalanceTs;
        TVector<TCpu> Cpus; // Indexed by CpuId, can have gaps
        TVector<TPool> Pools; // Indexed by PoolId, can have gaps
        TBalancerConfig Config;

    public:
        // Setup
        TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
        bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override;
        ~TBalancer();

        // Balancing
        bool TryLock(ui64 ts) override;
        void SetPoolStats(TPoolId pool, const TBalancerStats& stats) override;
        void Balance() override;
        void Unlock() override;

    private:
        void MoveCpu(TPool& from, TPool& to);
    };

    struct TBalancer::TPool {
        TBalancingConfig Config;
        TPoolId PoolId;
        TString PoolName;

        // Input data for balancing
        TBalancerStats Prev;
        TBalancerStats Next;

        // Derived stats
        double CpuLoad;
        double CpuIdle;

        // Classification
        // NOTE: We want to avoid passing cpu back and forth, so we must consider not only current level,
        // NOTE: but expected levels after movements also
        TLevel CurLevel; // Level with current amount of cpu
        TLevel AddLevel; // Level after one cpu acception
        TLevel SubLevel; // Level after one cpu donation

        // Balancing state
        ui64 CurrentCpus = 0; // Total number of cpus assigned for this pool (zero means pools is not balanced)
        ui64 PrevCpus = 0; // Cpus in last period

        explicit TPool(const TBalancingConfig& cfg = {})
            : Config(cfg)
        {}

        void Configure(const TBalancingConfig& cfg, const TString& poolName) {
            Config = cfg;
            // Enforce constraints
            Config.MinCpus = std::clamp<ui32>(Config.MinCpus, 1, Config.Cpus);
            Config.MaxCpus = Max<ui32>(Config.MaxCpus, Config.Cpus);
            PoolName = poolName;
        }
    };

    struct TBalancer::TCpu {
        TCpuState* State = nullptr; // Cpu state, nullptr means cpu is not used (gap)
        TCpuAllocation Alloc;
        TPoolId Current;
        TPoolId Assigned;
    };

    TBalancer::TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts)
        : NextBalanceTs(ts)
        , Config(config)
    {
        for (TPoolId pool = 0; pool < MaxPools; pool++) {
            Pools.emplace_back();
            Pools.back().PoolId = pool;
        }
        for (const TUnitedExecutorPoolConfig& united : unitedPools) {
            Pools[united.PoolId].Configure(united.Balancing, united.PoolName);
        }
    }

    TBalancer::~TBalancer() {
    }

    bool TBalancer::AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* state) {
        // Setup
        TCpuId cpuId = cpuAlloc.CpuId;
        if (Cpus.size() <= cpuId) {
            Cpus.resize(cpuId + 1);
        }
        TCpu& cpu = Cpus[cpuId];
        cpu.State = state;
        cpu.Alloc = cpuAlloc;

        // Fill every pool with cpus up to TBalancingConfig::Cpus
        TPoolId pool = 0;
        for (TPool& p : Pools) {
            if (p.CurrentCpus < p.Config.Cpus) {
                p.CurrentCpus++;
                break;
            }
            pool++;
        }
        if (pool != MaxPools) { // cpu under balancer control
            state->SwitchPool(pool);
            state->AssignPool(pool);
            Disabled = false;
            return true;
        }
        return false; // non-balanced cpu
    }

    bool TBalancer::TryLock(ui64 ts) {
        if (!Disabled && NextBalanceTs < ts && Lock.TryAcquire()) {
            NextBalanceTs = ts + Us2Ts(Config.PeriodUs);
            return true;
        }
        return false;
    }

    void TBalancer::SetPoolStats(TPoolId pool, const TBalancerStats& stats) {
        Y_VERIFY(pool < MaxPools);
        TPool& p = Pools[pool];
        p.Prev = p.Next;
        p.Next = stats;
    }

    void TBalancer::Balance() {
        // Update every cpu state
        for (TCpu& cpu : Cpus) {
            if (cpu.State) {
                cpu.State->Load(cpu.Assigned, cpu.Current);
                if (cpu.Current < MaxPools && cpu.Current != cpu.Assigned) {
                    return; // previous movement has not been applied yet, wait
                }
            }
        }

        // Process stats, classify and compute pool importance
        TStackVec<TPool*, MaxPools> order;
        for (TPool& pool : Pools) {
            if (pool.Config.Cpus == 0) {
                continue; // skip gaps (non-existent or non-united pools)
            }
            if (pool.Prev.Ts == 0 || pool.Prev.Ts >= pool.Next.Ts) {
                return; // invalid stats
            }

            // Compute derived stats
            pool.CpuLoad = (pool.Next.CpuUs - pool.Prev.CpuUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
            if (pool.Prev.IdleUs == ui64(-1) || pool.Next.IdleUs == ui64(-1)) {
                pool.CpuIdle = pool.CurrentCpus - pool.CpuLoad; // for tests
            } else {
                pool.CpuIdle = (pool.Next.IdleUs - pool.Prev.IdleUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
            }

            // Compute levels
            pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle);
            pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle); // we expect taken cpu to became utilized
            pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1);

            // Prepare for balancing
            pool.PrevCpus = pool.CurrentCpus;
            order.push_back(&pool);
        }

        // Sort pools by importance
        std::sort(order.begin(), order.end(), [] (TPool* l, TPool* r) {return l->CurLevel.Importance < r->CurLevel.Importance; });
        for (TPool* pool : order) {
            LWPROBE(PoolStats, pool->PoolId, pool->PoolName, pool->CurrentCpus, pool->CurLevel.LoadClass, pool->Config.Priority, pool->CurLevel.ScaleFactor, pool->CpuIdle, pool->CpuLoad, pool->CurLevel.Importance, pool->AddLevel.Importance, pool->SubLevel.Importance);
        }

        // Move cpus from lower importance to higher importance pools
        for (auto toIter = order.rbegin(); toIter != order.rend(); ++toIter) {
            TPool& to = **toIter;
            if (to.CurLevel.LoadClass == TLevel::Overloaded && // if pool is overloaded
                to.CurrentCpus < to.Config.MaxCpus) // and constraints would not be violated
            {
                for (auto fromIter = order.begin(); (*fromIter)->CurLevel.Importance < to.CurLevel.Importance; ++fromIter) {
                    TPool& from = **fromIter;
                    if (from.CurrentCpus == from.PrevCpus && // if not balanced yet
                        from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated
                        from.SubLevel.Importance < to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
                    {
                        MoveCpu(from, to);
                        from.CurrentCpus--;
                        to.CurrentCpus++;
                        break;
                    }
                }
            }
        }
    }

    void TBalancer::MoveCpu(TBalancer::TPool& from, TBalancer::TPool& to) {
        for (auto ci = Cpus.rbegin(), ce = Cpus.rend(); ci != ce; ci++) {
            TCpu& cpu = *ci;
            if (!cpu.State) {
                continue;
            }
            if (cpu.Assigned == from.PoolId) {
                cpu.State->AssignPool(to.PoolId);
                cpu.Assigned = to.PoolId;
                LWPROBE(MoveCpu, from.PoolId, to.PoolId, from.PoolName, to.PoolName, cpu.Alloc.CpuId);
                return;
            }
        }
        Y_FAIL();
    }

    void TBalancer::Unlock() {
        Lock.Release();
    }

    IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) {
        return new TBalancer(config, unitedPools, ts);
    }
}