aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/cpu_manager.cpp
blob: 24b3161e3c8b6913830b612b2dbc829744631304 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#include "cpu_manager.h"
#include "probes.h"

#include "executor_pool_basic.h"
#include "executor_pool_io.h"
#include "executor_pool_united.h"

namespace NActors {
    LWTRACE_USING(ACTORLIB_PROVIDER);

    TCpuManager::TCpuManager(THolder<TActorSystemSetup>& setup)
        : ExecutorPoolCount(setup->GetExecutorsCount())
        , Balancer(setup->Balancer)
        , Config(setup->CpuManager)
    {
        if (setup->Executors) { // Explicit mode w/o united pools
            Executors.Reset(setup->Executors.Release());
            for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
                IExecutorPool* pool = Executors[excIdx].Get();
                Y_ABORT_UNLESS(dynamic_cast<TUnitedExecutorPool*>(pool) == nullptr,
                    "united executor pool is prohibited in explicit mode of NActors::TCpuManager");
            }
        } else {
            Setup();
        }
    }

    void TCpuManager::Setup() {
        TAffinity available;
        available.Current();
        TCpuAllocationConfig allocation(available, Config);

        if (allocation) {
            if (!Balancer) {
                Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, GetCycleCountFast()));
            }
            UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get()));
        }

        ui64 ts = GetCycleCountFast();
        Harmonizer.Reset(MakeHarmonizer(ts));

        Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);

        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
            Executors[excIdx].Reset(CreateExecutorPool(excIdx));
            if (excIdx < Config.PingInfoByPool.size()) {
                Harmonizer->AddPool(Executors[excIdx].Get(), &Config.PingInfoByPool[excIdx]);
            } else {
                Harmonizer->AddPool(Executors[excIdx].Get());
            }
        }
    }

    void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) {
        if (UnitedWorkers) {
            UnitedWorkers->Prepare(actorSystem, scheduleReaders);
        }
        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
            NSchedulerQueue::TReader* readers;
            ui32 readersCount = 0;
            Executors[excIdx]->Prepare(actorSystem, &readers, &readersCount);
            for (ui32 i = 0; i != readersCount; ++i, ++readers) {
                scheduleReaders.push_back(readers);
            }
        }
    }

    void TCpuManager::Start() {
        if (UnitedWorkers) {
            UnitedWorkers->Start();
        }
        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
            Executors[excIdx]->Start();
        }
    }

    void TCpuManager::PrepareStop() {
        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
            Executors[excIdx]->PrepareStop();
        }
        if (UnitedWorkers) {
            UnitedWorkers->PrepareStop();
        }
    }

    void TCpuManager::Shutdown() {
        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
            Executors[excIdx]->Shutdown();
        }
        if (UnitedWorkers) {
            UnitedWorkers->Shutdown();
        }
        for (ui32 round = 0, done = 0; done < ExecutorPoolCount && round < 3; ++round) {
            done = 0;
            for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
                if (Executors[excIdx]->Cleanup()) {
                    ++done;
                }
            }
        }
    }

    void TCpuManager::Cleanup() {
        for (ui32 round = 0, done = 0; done < ExecutorPoolCount; ++round) {
            Y_ABORT_UNLESS(round < 10, "actorsystem cleanup could not be completed in 10 rounds");
            done = 0;
            for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
                if (Executors[excIdx]->Cleanup()) {
                    ++done;
                }
            }
        }
        Executors.Destroy();
        UnitedWorkers.Destroy();
    }

    IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
        for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
            if (cfg.PoolId == poolId) {
                return new TBasicExecutorPool(cfg, Harmonizer.Get());
            }
        }
        for (TIOExecutorPoolConfig& cfg : Config.IO) {
            if (cfg.PoolId == poolId) {
                return new TIOExecutorPool(cfg);
            }
        }
        for (TUnitedExecutorPoolConfig& cfg : Config.United) {
            if (cfg.PoolId == poolId) {
                IExecutorPool* result = new TUnitedExecutorPool(cfg, UnitedWorkers.Get());
                return result;
            }
        }
        Y_ABORT("missing PoolId: %d", int(poolId));
    }

    TVector<IExecutorPool*> TCpuManager::GetBasicExecutorPools() const {
        TVector<IExecutorPool*> pools;
        for (ui32 idx = 0; idx < ExecutorPoolCount; ++idx) {
            if (auto basicPool = dynamic_cast<TBasicExecutorPool*>(Executors[idx].Get()); basicPool != nullptr) {
                pools.push_back(basicPool);
            }
        }
        return pools;
    }

}