blob: 4f6549044d82f04e1e175ca4f5fc6a98bb951d6d (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
#pragma once
#include "asio.h"
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/thread/factory.h>
#include <util/system/thread.h>
namespace NAsio {
/// Runs a privately owned TIOService on a thread obtained from SystemThreadFactory().
///
/// The constructor starts the thread; SyncShutdown() stops the service and joins
/// the thread. The destructor calls SyncShutdown() too, so an explicit call is
/// optional.
class TIOServiceExecutor: public IThreadFactory::IThreadAble {
public:
    /// Creates the TWork object bound to the service, then launches the thread
    /// that executes DoExecute() on this object.
    TIOServiceExecutor()
        : Work_(new TIOService::TWork(Srv_))
    {
        // Members are initialized in declaration order, so Srv_ and Work_ are
        // already constructed when the thread is started here.
        T_ = SystemThreadFactory()->Run(this);
    }

    /// Stops the service and joins the thread if that has not been done yet.
    ~TIOServiceExecutor() override {
        SyncShutdown();
    }

    /// Thread body: names the current thread, then runs the service.
    void DoExecute() override {
        TThread::SetCurrentThreadName("NehAsioExecutor");
        Srv_.Run();
    }

    /// Returns the service driven by this executor.
    inline TIOService& GetIOService() noexcept {
        return Srv_;
    }

    /// Destroys the work object, aborts the service and joins the thread.
    ///
    /// A non-null Work_ doubles as the "not shut down yet" marker, so repeated
    /// calls after the first one return immediately.
    void SyncShutdown() {
        if (!Work_) {
            return;
        }

        Work_.Destroy();
        Srv_.Abort(); // cancel all async operations, break Run() execution
        T_->Join();
    }

private:
    using IThreadRef = TAutoPtr<IThreadFactory::IThread>;

    // Declaration order matters: Work_ is constructed from Srv_.
    TIOService Srv_;
    TAutoPtr<TIOService::TWork> Work_;
    IThreadRef T_;
};
/// Fixed-size collection of TIOServiceExecutor objects handed out in rotation.
///
/// Every executor is created in the constructor. GetExecutor() advances a
/// counter with AtomicIncrement() and selects an executor by the counter value
/// modulo the pool size.
class TExecutorsPool {
public:
    /// Creates `executors` executors.
    ///
    /// @param executors number of TIOServiceExecutor objects to create.
    TExecutorsPool(size_t executors)
        : C_(0)
    {
        // The final size is known up front: allocate once instead of letting
        // the vector reallocate (and shuffle TAutoPtr ownership) while growing.
        E_.reserve(executors);
        for (size_t i = 0; i < executors; ++i) {
            E_.push_back(new TIOServiceExecutor());
        }
    }

    /// Returns the number of executors in the pool.
    inline size_t Size() const noexcept {
        return E_.size();
    }

    /// Returns the next executor in rotation.
    ///
    /// Precondition: the pool is not empty. With Size() == 0 the modulo below
    /// divides by zero, so a pool built with 0 executors must never be asked
    /// for an executor.
    inline TIOServiceExecutor& GetExecutor() noexcept {
        const TAtomicBase next = AtomicIncrement(C_);
        return *E_[next % E_.size()];
    }

    /// Calls SyncShutdown() on every executor, in creation order.
    void SyncShutdown() {
        // Iterate by reference: copying a TAutoPtr would take ownership away
        // from the element stored in the vector.
        for (const auto& executor : E_) {
            executor->SyncShutdown();
        }
    }

private:
    TAtomic C_; // rotation counter used by GetExecutor()
    TVector<TAutoPtr<TIOServiceExecutor>> E_;
};
}
|