aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/poller_actor_linux.h
blob: 8831d2062b8b36baca815fb84d5857cbf8d3672a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#pragma once

#include <sys/epoll.h>

namespace NActors {

    class TEpollThread : public TPollerThreadBase<TEpollThread> {
        // epoll file descriptor
        int EpollDescriptor;

    public:
        TEpollThread(TActorSystem *actorSystem)
            : TPollerThreadBase(actorSystem)
        {
            EpollDescriptor = epoll_create1(EPOLL_CLOEXEC);
            Y_ABORT_UNLESS(EpollDescriptor != -1, "epoll_create1() failed with %s", strerror(errno));

            epoll_event event;
            event.data.ptr = nullptr;
            event.events = EPOLLIN;
            if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, ReadEnd, &event) == -1) {
                Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
            }

            ISimpleThread::Start(); // start poller thread
        }

        ~TEpollThread() {
            Stop();
            close(EpollDescriptor);
        }

        bool ProcessEventsInLoop() {
            // preallocated array for events
            std::array<epoll_event, 256> events;

            // wait indefinitely for event to arrive
            LWPROBE(EpollStartWaitIn);
            int numReady = epoll_wait(EpollDescriptor, events.data(), events.size(), -1);
            LWPROBE(EpollFinishWaitIn, numReady);

            // check return status for any errors
            if (numReady == -1) {
                if (errno == EINTR) {
                    return false; // restart the call a bit later
                } else {
                    Y_ABORT("epoll_wait() failed with %s", strerror(errno));
                }
            }

            bool res = false;

            for (int i = 0; i < numReady; ++i) {
                const epoll_event& ev = events[i];
                if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) {
                    const bool read = ev.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR);
                    const bool write = ev.events & (EPOLLOUT | EPOLLERR);

                    // remove hit flags from the bit set
                    ui32 flags = record->Flags;
                    const ui32 remove = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0);
                    while (!record->Flags.compare_exchange_weak(flags, flags & ~remove))
                    {}
                    flags &= ~remove;

                    // rearm poller if some flags remain
                    if (flags) {
                        epoll_event event;
                        event.events = EPOLLONESHOT | EPOLLRDHUP | flags;
                        event.data.ptr = record;
                        if (epoll_ctl(EpollDescriptor, EPOLL_CTL_MOD, record->Socket->GetDescriptor(), &event) == -1) {
                            Y_ABORT("epoll_ctl(EPOLL_CTL_MOD) failed with %s", strerror(errno));
                        }
                    }

                    // issue notifications
                    Notify(record, read, write);
                } else {
                    res = true;
                }
            }

            return res;
        }

        void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
            if (epoll_ctl(EpollDescriptor, EPOLL_CTL_DEL, socket->GetDescriptor(), nullptr) == -1) {
                Y_ABORT("epoll_ctl(EPOLL_CTL_DEL) failed with %s", strerror(errno));
            }
        }

        void RegisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
            epoll_event event;
            event.events = EPOLLONESHOT | EPOLLRDHUP;
            event.data.ptr = record.Get();
            if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, record->Socket->GetDescriptor(), &event) == -1) {
                Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
            }
        }

        void Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write) {
            const ui32 add = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0);
            ui32 flags = record->Flags;
            while (!record->Flags.compare_exchange_weak(flags, flags | add))
            {}
            flags |= add;
            if (flags) {
                epoll_event event;
                event.events = EPOLLONESHOT | EPOLLRDHUP | flags;
                event.data.ptr = record.Get();
                if (epoll_ctl(EpollDescriptor, EPOLL_CTL_MOD, record->Socket->GetDescriptor(), &event) == -1) {
                    Y_ABORT("epoll_ctl(EPOLL_CTL_MOD) failed with %s", strerror(errno));
                }
            }
        }
    };

    using TPollerThread = TEpollThread;

} // namespace NActors