path: root/library/cpp/actors/core/config.h
#pragma once 
 
#include "defs.h" 
#include <library/cpp/actors/util/cpumask.h> 
#include <library/cpp/monlib/dynamic_counters/counters.h> 
#include <util/datetime/base.h> 
#include <util/generic/ptr.h> 
#include <util/generic/string.h> 
#include <util/generic/vector.h> 
 
namespace NActors { 
 
    struct TBalancingConfig { 
        // Default cpu count (used during overload). A zero value disables balancing for this pool.
        // 1) The sum of `Cpus` over all pools cannot be changed without a restart
        //    (switching the cpu mode between Shared and Assigned is not implemented yet)
        // 2) This sum must be equal to TUnitedWorkersConfig::CpuCount,
        //    otherwise `CpuCount - SUM(Cpus)` cpus will be in Shared mode (i.e. actorsystem 2.0)
        ui32 Cpus = 0; 
 
        ui32 MinCpus = 0; // Lower balancing bound; should be at least 1 and not greater than `Cpus`
        ui32 MaxCpus = 0; // Upper balancing bound; should not be lower than `Cpus`
        ui8 Priority = 0; // Priority of the pool when obtaining cpus during balancing (higher is better)
        ui64 ToleratedLatencyUs = 0; // p100-latency threshold indicating that the pool requires more cpus
    }; 
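
    // Usage sketch (illustrative, not part of this header): a balancing section for
    // a pool that normally owns 4 cpus and may be shrunk to 2 or grown to 6 during
    // balancing. The concrete numbers are arbitrary example values.
    //
    //     TBalancingConfig balancing;
    //     balancing.Cpus = 4;
    //     balancing.MinCpus = 2;
    //     balancing.MaxCpus = 6;
    //     balancing.Priority = 5;               // higher priority wins cpus during balancing
    //     balancing.ToleratedLatencyUs = 1000;  // request more cpus above 1ms p100-latency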
 
    struct TBalancerConfig { 
        ui64 PeriodUs = 15000000; // Time between balancer steps 
    }; 
 
    struct TBasicExecutorPoolConfig { 
        static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10); 
        static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100; 
 
        ui32 PoolId = 0; 
        TString PoolName; 
        ui32 Threads = 1; 
        ui64 SpinThreshold = 100; 
        TCpuMask Affinity; // Executor thread affinity 
        TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX; 
        ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX; 
        int RealtimePriority = 0; 
        ui32 MaxActivityType = 1; 
    }; 
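
    // Usage sketch (illustrative, not part of this header): a basic executor pool.
    // The pool id, name and thread count are arbitrary example values; Affinity is
    // left empty, i.e. the executor threads are not pinned to particular cpus.
    //
    //     TBasicExecutorPoolConfig basic;
    //     basic.PoolId = 0;
    //     basic.PoolName = "System";
    //     basic.Threads = 4;
    //     basic.SpinThreshold = 100;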
 
    struct TIOExecutorPoolConfig { 
        ui32 PoolId = 0; 
        TString PoolName; 
        ui32 Threads = 1; 
        TCpuMask Affinity; // Executor thread affinity 
        ui32 MaxActivityType = 1; 
    }; 
 
    struct TUnitedExecutorPoolConfig { 
        static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10); 
        static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100; 
 
        ui32 PoolId = 0; 
        TString PoolName; 
 
        // Resource sharing 
        ui32 Concurrency = 0; // Limits the number of simultaneously running mailboxes if set to a non-zero value (do not set if Balancing.Cpus != 0)
        TPoolWeight Weight = 0; // Weight in the fair cpu-local pool scheduler
        TCpuMask Allowed; // CPUs on which workers may run this pool (ignored if the balancer works, i.e. actorsystem 1.5)
 
        // Single mailbox execution limits 
        TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX; 
        ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX; 
 
        // Introspection 
        ui32 MaxActivityType = 1; 
 
        // Long-term balancing 
        TBalancingConfig Balancing; 
    }; 
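
    // Usage sketch (illustrative, not part of this header): a united pool that relies
    // on long-term balancing instead of a fixed Concurrency limit. The names and
    // numbers are arbitrary example values.
    //
    //     TUnitedExecutorPoolConfig united;
    //     united.PoolId = 2;
    //     united.PoolName = "User";
    //     united.Concurrency = 0;        // not limited; Balancing drives the cpu share
    //     united.Weight = 10;
    //     united.Balancing.Cpus = 4;
    //     united.Balancing.MinCpus = 1;
    //     united.Balancing.MaxCpus = 8;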
 
    struct TUnitedWorkersConfig { 
        ui32 CpuCount = 0; // Total number of cpus running united workers (the analog of TBasicExecutorPoolConfig::Threads); set to zero to disable united workers
        ui64 SpinThresholdUs = 100; // Limit on active spinning when all pools have become idle
        ui64 PoolLimitUs = 500; // Soft limit on pool execution
        ui64 EventLimitUs = 100; // Hard limit on execution of the last event once the pool limit has been exceeded
        ui64 LimitPrecisionUs = 100; // Maximum timer delay on limit excess (the delay avoids a settimer syscall on every pool switch)
        ui64 FastWorkerPriority = 10; // Real-time priority of workers not exceeding hard limits 
        ui64 IdleWorkerPriority = 20; // Real-time priority of standby workers waiting for hard preemption on timers (should be greater than FastWorkerPriority) 
        TCpuMask Allowed; // Allowed CPUs for workers to run on (every worker has affinity for exactly one cpu) 
        bool NoRealtime = false; // For environments w/o permissions for RT-threads 
        bool NoAffinity = false; // For environments w/o permissions for cpu affinity 
        TBalancerConfig Balancer; 
    }; 
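
    // Usage sketch (illustrative, not part of this header): 8 cpus running united
    // workers in an environment without realtime permissions (e.g. an unprivileged
    // container). The numbers are arbitrary example values.
    //
    //     TUnitedWorkersConfig workers;
    //     workers.CpuCount = 8;
    //     workers.NoRealtime = true;
    //     workers.Balancer.PeriodUs = 15000000;  // keep the default 15s balancer step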
 
    struct TCpuManagerConfig { 
        TUnitedWorkersConfig UnitedWorkers; 
        TVector<TBasicExecutorPoolConfig> Basic; 
        TVector<TIOExecutorPoolConfig> IO; 
        TVector<TUnitedExecutorPoolConfig> United; 
 
        ui32 GetExecutorsCount() const { 
            return Basic.size() + IO.size() + United.size(); 
        } 
 
        TString GetPoolName(ui32 poolId) const { 
            for (const auto& p : Basic) { 
                if (p.PoolId == poolId) { 
                    return p.PoolName; 
                } 
            } 
            for (const auto& p : IO) { 
                if (p.PoolId == poolId) { 
                    return p.PoolName; 
                } 
            } 
            for (const auto& p : United) { 
                if (p.PoolId == poolId) { 
                    return p.PoolName; 
                } 
            } 
            Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); 
        } 
 
        ui32 GetThreads(ui32 poolId) const { 
            for (const auto& p : Basic) { 
                if (p.PoolId == poolId) { 
                    return p.Threads; 
                } 
            } 
            for (const auto& p : IO) { 
                if (p.PoolId == poolId) { 
                    return p.Threads; 
                } 
            } 
            for (const auto& p : United) { 
                if (p.PoolId == poolId) { 
                    return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount; 
                } 
            } 
            Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); 
        } 
    }; 
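
    // Usage sketch (illustrative, not part of this header): registering pools and
    // resolving them by id, reusing `basic` and `united` from the sketches above.
    // Pool ids are looked up in Basic, then IO, then United; the first match wins.
    //
    //     TCpuManagerConfig cpuManager;
    //     cpuManager.Basic.push_back(basic);
    //     cpuManager.United.push_back(united);
    //     cpuManager.UnitedWorkers.CpuCount = 8;
    //     ui32 executors = cpuManager.GetExecutorsCount(); // 2
    //     TString name = cpuManager.GetPoolName(0);        // "System"
    //     ui32 threads = cpuManager.GetThreads(2);         // 8: Concurrency == 0, so CpuCount is used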
 
    struct TSchedulerConfig { 
        TSchedulerConfig( 
                ui64 resolution = 1024, 
                ui64 spinThreshold = 100, 
                ui64 progress = 10000, 
                bool useSchedulerActor = false) 
            : ResolutionMicroseconds(resolution) 
            , SpinThreshold(spinThreshold) 
            , ProgressThreshold(progress) 
            , UseSchedulerActor(useSchedulerActor) 
        {} 
 
        ui64 ResolutionMicroseconds = 1024; 
        ui64 SpinThreshold = 100; 
        ui64 ProgressThreshold = 10000; 
        bool UseSchedulerActor = false; // False by default because tests use the scheduler thread
        ui64 RelaxedSendPaceEventsPerSecond = 200000; 
        ui64 RelaxedSendPaceEventsPerCycle = RelaxedSendPaceEventsPerSecond * ResolutionMicroseconds / 1000000; 
        // For resolution >= 250000 microseconds the threshold is SendPace
        // For resolution <= 250 microseconds the threshold is 20 * SendPace (linear in between)
        ui64 RelaxedSendThresholdEventsPerSecond = RelaxedSendPaceEventsPerSecond * 
            (20 - ((20 - 1) * ClampVal(ResolutionMicroseconds, ui64(250), ui64(250000)) - 250) / (250000 - 250)); 
        ui64 RelaxedSendThresholdEventsPerCycle = RelaxedSendThresholdEventsPerSecond * ResolutionMicroseconds / 1000000; 
 
        // Optional subsection for scheduler counters (usually subsystem=utils) 
        NMonitoring::TDynamicCounterPtr MonCounters = nullptr; 
    }; 
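
    // Worked example (illustrative): with the default ResolutionMicroseconds = 1024
    // the relaxed-send members above evaluate, in integer arithmetic, to:
    //
    //     RelaxedSendPaceEventsPerCycle       = 200000 * 1024 / 1000000   = 204
    //     RelaxedSendThresholdEventsPerSecond = 200000 * (20 - 0)         = 4000000
    //     RelaxedSendThresholdEventsPerCycle  = 4000000 * 1024 / 1000000  = 4096
    //
    // (at this resolution the interpolation term rounds down to 0, so the threshold is 20 * pace)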
 
    struct TCpuAllocation { 
        struct TPoolAllocation { 
            TPoolId PoolId; 
            TPoolWeight Weight; 
 
            TPoolAllocation(TPoolId poolId = 0, TPoolWeight weight = 0) 
                : PoolId(poolId) 
                , Weight(weight) 
            {} 
        }; 
 
        TCpuId CpuId; 
        TVector<TPoolAllocation> AllowedPools; 
 
        TPoolsMask GetPoolsMask() const { 
            TPoolsMask mask = 0; 
            for (const auto& pa : AllowedPools) { 
                if (pa.PoolId < MaxPools) { 
                    mask |= (1ull << pa.PoolId);
                } 
            } 
            return mask; 
        } 
 
        bool HasPool(TPoolId pool) const { 
            for (const auto& pa : AllowedPools) { 
                if (pa.PoolId == pool) { 
                    return true; 
                } 
            } 
            return false; 
        } 
    }; 
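
    // Usage sketch (illustrative, not part of this header): cpu 3 shared by pools 1 and 2.
    // The pool ids and weights are arbitrary example values.
    //
    //     TCpuAllocation cpuAlloc;
    //     cpuAlloc.CpuId = 3;
    //     cpuAlloc.AllowedPools.emplace_back(1, 10);  // pool 1, weight 10
    //     cpuAlloc.AllowedPools.emplace_back(2, 20);  // pool 2, weight 20
    //     TPoolsMask mask = cpuAlloc.GetPoolsMask();  // 0b110
    //     bool hasPool = cpuAlloc.HasPool(2);         // true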
 
    struct TCpuAllocationConfig { 
        TVector<TCpuAllocation> Items; 
 
        TCpuAllocationConfig(const TCpuMask& available, const TCpuManagerConfig& cfg) { 
            for (const TUnitedExecutorPoolConfig& pool : cfg.United) { 
                Y_VERIFY(pool.PoolId < MaxPools, "wrong PoolId of united executor pool: %s(%d)", 
                    pool.PoolName.c_str(), (pool.PoolId)); 
            } 
            ui32 allocated[MaxPools] = {0}; 
            for (TCpuId cpu = 0; cpu < available.Size() && Items.size() < cfg.UnitedWorkers.CpuCount; cpu++) { 
                if (available.IsSet(cpu)) { 
                    TCpuAllocation item; 
                    item.CpuId = cpu; 
                    for (const TUnitedExecutorPoolConfig& pool : cfg.United) { 
                        if (cfg.UnitedWorkers.Allowed.IsEmpty() || cfg.UnitedWorkers.Allowed.IsSet(cpu)) { 
                            if (pool.Allowed.IsEmpty() || pool.Allowed.IsSet(cpu)) { 
                                item.AllowedPools.emplace_back(pool.PoolId, pool.Weight); 
                                allocated[pool.PoolId]++; 
                            } 
                        } 
                    } 
                    if (!item.AllowedPools.empty()) { 
                        Items.push_back(item); 
                    } 
                } 
            } 
            for (const TUnitedExecutorPoolConfig& pool : cfg.United) { 
                Y_VERIFY(allocated[pool.PoolId] > 0, "unable to allocate cpu for united executor pool: %s(%d)", 
                    pool.PoolName.c_str(), (pool.PoolId)); 
            } 
        } 
 
        operator bool() const { 
            return !Items.empty(); 
        } 
    }; 
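
    // Usage sketch (illustrative, not part of this header): distributing the available
    // cpus among the united pools and checking that anything was allocated at all
    // (the constructor Y_VERIFYs that every united pool got at least one cpu).
    // `availableCpus` stands for a TCpuMask of the cpus usable by the process,
    // obtained elsewhere; `cpuManager` is the TCpuManagerConfig sketched above.
    //
    //     TCpuAllocationConfig allocation(availableCpus, cpuManager);
    //     if (allocation) {
    //         // allocation.Items maps each united-worker cpu to its allowed pools
    //     }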
 
}