1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
|
#pragma once
#include "defs.h"
#include <library/cpp/actors/util/cpumask.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <util/datetime/base.h>
#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
namespace NActors {
struct TBalancingConfig {
// Default cpu count (used during overload). Zero value disables this pool balancing
// 1) Sum of `Cpus` on all pools cannot be changed without restart
// (changing cpu mode between Shared and Assigned is not implemented yet)
// 2) This sum must be equal to TUnitedWorkersConfig::CpuCount,
// otherwise `CpuCount - SUM(Cpus)` cpus will be in Shared mode (i.e. actorsystem 2.0)
ui32 Cpus = 0;
ui32 MinCpus = 0; // Lower balancing bound, should be at least 1, and not greater than `Cpus`
ui32 MaxCpus = 0; // Higher balancing bound, should be not lower than `Cpus`
ui8 Priority = 0; // Priority of pool to obtain cpu due to balancing (higher is better)
ui64 ToleratedLatencyUs = 0; // p100-latency threshold indicating that more cpus are required by pool
};
struct TBalancerConfig {
ui64 PeriodUs = 15000000; // Time between balancer steps
};
enum class EASProfile {
Default,
LowCpuConsumption,
LowLatency,
};
struct TBasicExecutorPoolConfig {
static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10);
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100;
ui32 PoolId = 0;
TString PoolName;
ui32 Threads = 1;
ui64 SpinThreshold = 100;
TCpuMask Affinity; // Executor thread affinity
TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX;
ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX;
int RealtimePriority = 0;
i16 MinThreadCount = 0;
i16 MaxThreadCount = 0;
i16 DefaultThreadCount = 0;
i16 Priority = 0;
i16 SharedExecutorsCount = 0;
i16 SoftProcessingDurationTs = 0;
EASProfile ActorSystemProfile = EASProfile::Default;
};
struct TIOExecutorPoolConfig {
ui32 PoolId = 0;
TString PoolName;
ui32 Threads = 1;
TCpuMask Affinity; // Executor thread affinity
};
struct TUnitedExecutorPoolConfig {
static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10);
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100;
ui32 PoolId = 0;
TString PoolName;
// Resource sharing
ui32 Concurrency = 0; // Limits simultaneously running mailboxes count if set to non-zero value (do not set if Balancing.Cpus != 0)
TPoolWeight Weight = 0; // Weight in fair cpu-local pool scheduler
TCpuMask Allowed; // Allowed CPUs for workers to run this pool on (ignored if balancer works, i.e. actorsystem 1.5)
// Single mailbox execution limits
TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX;
ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX;
// Long-term balancing
TBalancingConfig Balancing;
};
struct TUnitedWorkersConfig {
ui32 CpuCount = 0; // Total CPUs running united workers (i.e. TBasicExecutorPoolConfig::Threads analog); set to zero to disable united workers
ui64 SpinThresholdUs = 100; // Limit for active spinning in case all pools became idle
ui64 PoolLimitUs = 500; // Soft limit on pool execution
ui64 EventLimitUs = 100; // Hard limit on last event execution exceeding pool limit
ui64 LimitPrecisionUs = 100; // Maximum delay of timer on limit excess (delay needed to avoid settimer syscall on every pool switch)
ui64 FastWorkerPriority = 10; // Real-time priority of workers not exceeding hard limits
ui64 IdleWorkerPriority = 20; // Real-time priority of standby workers waiting for hard preemption on timers (should be greater than FastWorkerPriority)
TCpuMask Allowed; // Allowed CPUs for workers to run on (every worker has affinity for exactly one cpu)
bool NoRealtime = false; // For environments w/o permissions for RT-threads
bool NoAffinity = false; // For environments w/o permissions for cpu affinity
TBalancerConfig Balancer;
};
struct TSelfPingInfo {
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
ui32 MaxAvgPingUs;
};
struct TCpuManagerConfig {
TUnitedWorkersConfig UnitedWorkers;
TVector<TBasicExecutorPoolConfig> Basic;
TVector<TIOExecutorPoolConfig> IO;
TVector<TUnitedExecutorPoolConfig> United;
TVector<TSelfPingInfo> PingInfoByPool;
ui32 GetExecutorsCount() const {
return Basic.size() + IO.size() + United.size();
}
TString GetPoolName(ui32 poolId) const {
for (const auto& p : Basic) {
if (p.PoolId == poolId) {
return p.PoolName;
}
}
for (const auto& p : IO) {
if (p.PoolId == poolId) {
return p.PoolName;
}
}
for (const auto& p : United) {
if (p.PoolId == poolId) {
return p.PoolName;
}
}
Y_ABORT("undefined pool id: %" PRIu32, (ui32)poolId);
}
std::optional<ui32> GetThreadsOptional(ui32 poolId) const {
for (const auto& p : Basic) {
if (p.PoolId == poolId) {
return p.DefaultThreadCount;
}
}
for (const auto& p : IO) {
if (p.PoolId == poolId) {
return p.Threads;
}
}
for (const auto& p : United) {
if (p.PoolId == poolId) {
return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount;
}
}
return {};
}
ui32 GetThreads(ui32 poolId) const {
auto result = GetThreadsOptional(poolId);
Y_ABORT_UNLESS(result, "undefined pool id: %" PRIu32, (ui32)poolId);
return *result;
}
};
struct TSchedulerConfig {
TSchedulerConfig(
ui64 resolution = 1024,
ui64 spinThreshold = 100,
ui64 progress = 10000,
bool useSchedulerActor = false)
: ResolutionMicroseconds(resolution)
, SpinThreshold(spinThreshold)
, ProgressThreshold(progress)
, UseSchedulerActor(useSchedulerActor)
{}
ui64 ResolutionMicroseconds = 1024;
ui64 SpinThreshold = 100;
ui64 ProgressThreshold = 10000;
bool UseSchedulerActor = false; // False is default because tests use scheduler thread
ui64 RelaxedSendPaceEventsPerSecond = 200000;
ui64 RelaxedSendPaceEventsPerCycle = RelaxedSendPaceEventsPerSecond * ResolutionMicroseconds / 1000000;
// For resolution >= 250000 microseconds threshold is SendPace
// For resolution <= 250 microseconds threshold is 20 * SendPace
ui64 RelaxedSendThresholdEventsPerSecond = RelaxedSendPaceEventsPerSecond *
(20 - ((20 - 1) * ClampVal(ResolutionMicroseconds, ui64(250), ui64(250000)) - 250) / (250000 - 250));
ui64 RelaxedSendThresholdEventsPerCycle = RelaxedSendThresholdEventsPerSecond * ResolutionMicroseconds / 1000000;
// Optional subsection for scheduler counters (usually subsystem=utils)
NMonitoring::TDynamicCounterPtr MonCounters = nullptr;
};
struct TCpuAllocation {
struct TPoolAllocation {
TPoolId PoolId;
TPoolWeight Weight;
TPoolAllocation(TPoolId poolId = 0, TPoolWeight weight = 0)
: PoolId(poolId)
, Weight(weight)
{}
};
TCpuId CpuId;
TVector<TPoolAllocation> AllowedPools;
TPoolsMask GetPoolsMask() const {
TPoolsMask mask = 0;
for (const auto& pa : AllowedPools) {
if (pa.PoolId < MaxPools) {
mask &= (1ull << pa.PoolId);
}
}
return mask;
}
bool HasPool(TPoolId pool) const {
for (const auto& pa : AllowedPools) {
if (pa.PoolId == pool) {
return true;
}
}
return false;
}
};
struct TCpuAllocationConfig {
TVector<TCpuAllocation> Items;
TCpuAllocationConfig(const TCpuMask& available, const TCpuManagerConfig& cfg) {
for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
Y_ABORT_UNLESS(pool.PoolId < MaxPools, "wrong PoolId of united executor pool: %s(%d)",
pool.PoolName.c_str(), (pool.PoolId));
}
ui32 allocated[MaxPools] = {0};
for (TCpuId cpu = 0; cpu < available.Size() && Items.size() < cfg.UnitedWorkers.CpuCount; cpu++) {
if (available.IsSet(cpu)) {
TCpuAllocation item;
item.CpuId = cpu;
for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
if (cfg.UnitedWorkers.Allowed.IsEmpty() || cfg.UnitedWorkers.Allowed.IsSet(cpu)) {
if (pool.Allowed.IsEmpty() || pool.Allowed.IsSet(cpu)) {
item.AllowedPools.emplace_back(pool.PoolId, pool.Weight);
allocated[pool.PoolId]++;
}
}
}
if (!item.AllowedPools.empty()) {
Items.push_back(item);
}
}
}
for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
Y_ABORT_UNLESS(allocated[pool.PoolId] > 0, "unable to allocate cpu for united executor pool: %s(%d)",
pool.PoolName.c_str(), (pool.PoolId));
}
}
operator bool() const {
return !Items.empty();
}
};
}
|