1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
#pragma once
#include <sys/event.h>
namespace NActors {
class TKqueueThread : public TPollerThreadBase<TKqueueThread> {
// KQueue file descriptor
int KqDescriptor;
void SafeKevent(const struct kevent* ev, int size) {
int rc;
do {
rc = kevent(KqDescriptor, ev, size, nullptr, 0, nullptr);
} while (rc == -1 && errno == EINTR);
Y_ABORT_UNLESS(rc != -1, "kevent() failed with %s", strerror(errno));
}
public:
TKqueueThread(TActorSystem *actorSystem)
: TPollerThreadBase(actorSystem)
{
// create kqueue
KqDescriptor = kqueue();
Y_ABORT_UNLESS(KqDescriptor != -1, "kqueue() failed with %s", strerror(errno));
// set close-on-exit flag
{
int flags = fcntl(KqDescriptor, F_GETFD);
Y_ABORT_UNLESS(flags >= 0, "fcntl(F_GETFD) failed with %s", strerror(errno));
int rc = fcntl(KqDescriptor, F_SETFD, flags | FD_CLOEXEC);
Y_ABORT_UNLESS(rc != -1, "fcntl(F_SETFD, +FD_CLOEXEC) failed with %s", strerror(errno));
}
// register pipe's read end in poller
struct kevent ev;
EV_SET(&ev, (int)ReadEnd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr);
SafeKevent(&ev, 1);
ISimpleThread::Start(); // start poller thread
}
~TKqueueThread() {
Stop();
close(KqDescriptor);
}
bool ProcessEventsInLoop() {
std::array<struct kevent, 256> events;
int numReady = kevent(KqDescriptor, nullptr, 0, events.data(), events.size(), nullptr);
if (numReady == -1) {
if (errno == EINTR) {
return false;
} else {
Y_ABORT("kevent() failed with %s", strerror(errno));
}
}
bool res = false;
for (int i = 0; i < numReady; ++i) {
const struct kevent& ev = events[i];
if (ev.udata) {
TSocketRecord *it = static_cast<TSocketRecord*>(ev.udata);
const bool error = ev.flags & (EV_EOF | EV_ERROR);
const bool read = error || ev.filter == EVFILT_READ;
const bool write = error || ev.filter == EVFILT_WRITE;
Notify(it, read, write);
} else {
res = true;
}
}
return res;
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
struct kevent ev[2];
const int fd = socket->GetDescriptor();
EV_SET(&ev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&ev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
SafeKevent(ev, 2);
}
void RegisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
int flags = EV_ADD | EV_CLEAR | EV_ENABLE;
struct kevent ev[2];
const int fd = record->Socket->GetDescriptor();
EV_SET(&ev[0], fd, EVFILT_READ, flags, 0, 0, record.Get());
EV_SET(&ev[1], fd, EVFILT_WRITE, flags, 0, 0, record.Get());
SafeKevent(ev, 2);
}
bool Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/, bool /*suppressNotify*/,
bool /*afterWouldBlock*/) {
return false; // no special processing here as we use kqueue in edge-triggered mode
}
};
using TPollerThread = TKqueueThread;
}
|