1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
#pragma once
#include <sys/epoll.h>
namespace NActors {
enum {
ReadExpected = 1,
ReadHit = 2,
WriteExpected = 4,
WriteHit = 8,
};
class TEpollThread : public TPollerThreadBase<TEpollThread> {
// epoll file descriptor
int EpollDescriptor;
public:
TEpollThread(TActorSystem *actorSystem)
: TPollerThreadBase(actorSystem)
{
EpollDescriptor = epoll_create1(EPOLL_CLOEXEC);
Y_ABORT_UNLESS(EpollDescriptor != -1, "epoll_create1() failed with %s", strerror(errno));
epoll_event event;
event.data.ptr = nullptr;
event.events = EPOLLIN;
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, ReadEnd, &event) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
}
ISimpleThread::Start(); // start poller thread
}
~TEpollThread() {
Stop();
close(EpollDescriptor);
}
bool ProcessEventsInLoop() {
// preallocated array for events
std::array<epoll_event, 256> events;
// wait indefinitely for event to arrive
LWPROBE(EpollStartWaitIn);
int numReady = epoll_wait(EpollDescriptor, events.data(), events.size(), -1);
LWPROBE(EpollFinishWaitIn, numReady);
// check return status for any errors
if (numReady == -1) {
if (errno == EINTR) {
return false; // restart the call a bit later
} else {
Y_ABORT("epoll_wait() failed with %s", strerror(errno));
}
}
bool res = false;
for (int i = 0; i < numReady; ++i) {
const epoll_event& ev = events[i];
if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) {
const bool read = ev.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR);
const bool write = ev.events & (EPOLLOUT | EPOLLERR);
UpdateFlags(record, (read ? ReadHit : 0) | (write ? WriteHit : 0), false);
} else {
res = true;
}
}
return res;
}
bool UpdateFlags(TSocketRecord *record, ui32 addMask, bool suppressNotify) {
ui32 flags = record->Flags.load(std::memory_order_acquire);
for (;;) {
ui32 updated = flags | addMask;
static constexpr ui32 fullRead = ReadExpected | ReadHit;
static constexpr ui32 fullWrite = WriteExpected | WriteHit;
const bool read = (updated & fullRead) == fullRead;
const bool write = (updated & fullWrite) == fullWrite;
updated &= ~((read ? fullRead : 0) | (write ? fullWrite : 0));
if (record->Flags.compare_exchange_weak(flags, updated, std::memory_order_acq_rel)) {
if (suppressNotify) {
return read || write;
} else {
Notify(record, read, write);
return false;
}
}
}
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_DEL, socket->GetDescriptor(), nullptr) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_DEL) failed with %s", strerror(errno));
}
}
void RegisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
epoll_event event;
event.events = EPOLLET | EPOLLRDHUP | EPOLLIN | EPOLLOUT;
event.data.ptr = record.Get();
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, record->Socket->GetDescriptor(), &event) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
}
}
bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool suppressNotify) {
return UpdateFlags(record.Get(), (read ? ReadExpected : 0) | (write ? WriteExpected : 0), suppressNotify);
}
};
using TPollerThread = TEpollThread;
} // namespace NActors
|