1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
#pragma once
#include <sys/epoll.h>
namespace NActors {
class TEpollThread : public TPollerThreadBase<TEpollThread> {
// epoll file descriptor
int EpollDescriptor;
public:
TEpollThread(TActorSystem *actorSystem)
: TPollerThreadBase(actorSystem)
{
EpollDescriptor = epoll_create1(EPOLL_CLOEXEC);
Y_ABORT_UNLESS(EpollDescriptor != -1, "epoll_create1() failed with %s", strerror(errno));
epoll_event event;
event.data.ptr = nullptr;
event.events = EPOLLIN;
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, ReadEnd, &event) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
}
ISimpleThread::Start(); // start poller thread
}
~TEpollThread() {
Stop();
close(EpollDescriptor);
}
bool ProcessEventsInLoop() {
// preallocated array for events
std::array<epoll_event, 256> events;
// wait indefinitely for event to arrive
LWPROBE(EpollStartWaitIn);
int numReady = epoll_wait(EpollDescriptor, events.data(), events.size(), -1);
LWPROBE(EpollFinishWaitIn, numReady);
// check return status for any errors
if (numReady == -1) {
if (errno == EINTR) {
return false; // restart the call a bit later
} else {
Y_ABORT("epoll_wait() failed with %s", strerror(errno));
}
}
bool res = false;
for (int i = 0; i < numReady; ++i) {
const epoll_event& ev = events[i];
if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) {
const bool read = ev.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR);
const bool write = ev.events & (EPOLLOUT | EPOLLERR);
// remove hit flags from the bit set
ui32 flags = record->Flags;
const ui32 remove = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0);
while (!record->Flags.compare_exchange_weak(flags, flags & ~remove))
{}
flags &= ~remove;
// rearm poller if some flags remain
if (flags) {
epoll_event event;
event.events = EPOLLONESHOT | EPOLLRDHUP | flags;
event.data.ptr = record;
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_MOD, record->Socket->GetDescriptor(), &event) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_MOD) failed with %s", strerror(errno));
}
}
// issue notifications
Notify(record, read, write);
} else {
res = true;
}
}
return res;
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_DEL, socket->GetDescriptor(), nullptr) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_DEL) failed with %s", strerror(errno));
}
}
void RegisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
epoll_event event;
event.events = EPOLLONESHOT | EPOLLRDHUP;
event.data.ptr = record.Get();
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, record->Socket->GetDescriptor(), &event) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
}
}
void Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write) {
const ui32 add = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0);
ui32 flags = record->Flags;
while (!record->Flags.compare_exchange_weak(flags, flags | add))
{}
flags |= add;
if (flags) {
epoll_event event;
event.events = EPOLLONESHOT | EPOLLRDHUP | flags;
event.data.ptr = record.Get();
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_MOD, record->Socket->GetDescriptor(), &event) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_MOD) failed with %s", strerror(errno));
}
}
}
};
using TPollerThread = TEpollThread;
} // namespace NActors
|