1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
#include "cpu_manager.h"
#include "probes.h"
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
void TCpuManager::Setup() {
TAffinity available;
available.Current();
TCpuAllocationConfig allocation(available, Config);
if (allocation) {
if (!Balancer) {
Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, GetCycleCountFast()));
}
UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get()));
}
ui64 ts = GetCycleCountFast();
Harmonizer.Reset(MakeHarmonizer(ts));
Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx].Reset(CreateExecutorPool(excIdx));
if (excIdx < Config.PingInfoByPool.size()) {
Harmonizer->AddPool(Executors[excIdx].Get(), &Config.PingInfoByPool[excIdx]);
} else {
Harmonizer->AddPool(Executors[excIdx].Get());
}
}
}
void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) {
if (UnitedWorkers) {
UnitedWorkers->Prepare(actorSystem, scheduleReaders);
}
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
NSchedulerQueue::TReader* readers;
ui32 readersCount = 0;
Executors[excIdx]->Prepare(actorSystem, &readers, &readersCount);
for (ui32 i = 0; i != readersCount; ++i, ++readers) {
scheduleReaders.push_back(readers);
}
}
}
void TCpuManager::Start() {
if (UnitedWorkers) {
UnitedWorkers->Start();
}
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->Start();
}
}
void TCpuManager::PrepareStop() {
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->PrepareStop();
}
if (UnitedWorkers) {
UnitedWorkers->PrepareStop();
}
}
void TCpuManager::Shutdown() {
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx]->Shutdown();
}
if (UnitedWorkers) {
UnitedWorkers->Shutdown();
}
for (ui32 round = 0, done = 0; done < ExecutorPoolCount && round < 3; ++round) {
done = 0;
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
if (Executors[excIdx]->Cleanup()) {
++done;
}
}
}
}
void TCpuManager::Cleanup() {
for (ui32 round = 0, done = 0; done < ExecutorPoolCount; ++round) {
Y_VERIFY(round < 10, "actorsystem cleanup could not be completed in 10 rounds");
done = 0;
for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
if (Executors[excIdx]->Cleanup()) {
++done;
}
}
}
Executors.Destroy();
UnitedWorkers.Destroy();
}
IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
return new TBasicExecutorPool(cfg, Harmonizer.Get());
}
}
for (TIOExecutorPoolConfig& cfg : Config.IO) {
if (cfg.PoolId == poolId) {
return new TIOExecutorPool(cfg);
}
}
for (TUnitedExecutorPoolConfig& cfg : Config.United) {
if (cfg.PoolId == poolId) {
IExecutorPool* result = new TUnitedExecutorPool(cfg, UnitedWorkers.Get());
return result;
}
}
Y_FAIL("missing PoolId: %d", int(poolId));
}
}
|