aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/poller_actor_linux.h
blob: 183135d6b07fe8af6d474bcef9b30f73b1d5b6d1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#pragma once

#include <sys/epoll.h>

namespace NActors {

    enum {
        ReadExpected = 1,
        ReadHit = 2,
        WriteExpected = 4,
        WriteHit = 8,
    };

    class TEpollThread : public TPollerThreadBase<TEpollThread> {
        // epoll file descriptor
        int EpollDescriptor;

    public:
        TEpollThread(TActorSystem *actorSystem)
            : TPollerThreadBase(actorSystem)
        {
            EpollDescriptor = epoll_create1(EPOLL_CLOEXEC);
            Y_ABORT_UNLESS(EpollDescriptor != -1, "epoll_create1() failed with %s", strerror(errno));

            epoll_event event;
            event.data.ptr = nullptr;
            event.events = EPOLLIN;
            if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, ReadEnd, &event) == -1) {
                Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
            }

            ISimpleThread::Start(); // start poller thread
        }

        ~TEpollThread() {
            Stop();
            close(EpollDescriptor);
        }

        bool ProcessEventsInLoop() {
            // preallocated array for events
            std::array<epoll_event, 256> events;

            // wait indefinitely for event to arrive
            LWPROBE(EpollStartWaitIn);
            int numReady = epoll_wait(EpollDescriptor, events.data(), events.size(), -1);
            LWPROBE(EpollFinishWaitIn, numReady);

            // check return status for any errors
            if (numReady == -1) {
                if (errno == EINTR) {
                    return false; // restart the call a bit later
                } else {
                    Y_ABORT("epoll_wait() failed with %s", strerror(errno));
                }
            }

            bool res = false;

            for (int i = 0; i < numReady; ++i) {
                const epoll_event& ev = events[i];
                if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) {
                    const bool read = ev.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR);
                    const bool write = ev.events & (EPOLLOUT | EPOLLERR);
                    UpdateFlags(record, (read ? ReadHit : 0) | (write ? WriteHit : 0), false);
                } else {
                    res = true;
                }
            }

            return res;
        }

        bool UpdateFlags(TSocketRecord *record, ui32 addMask, bool suppressNotify) {
            ui32 flags = record->Flags.load(std::memory_order_acquire);
            for (;;) {
                ui32 updated = flags | addMask;
                static constexpr ui32 fullRead = ReadExpected | ReadHit;
                static constexpr ui32 fullWrite = WriteExpected | WriteHit;
                const bool read = (updated & fullRead) == fullRead;
                const bool write = (updated & fullWrite) == fullWrite;
                updated &= ~((read ? fullRead : 0) | (write ? fullWrite : 0));
                if (record->Flags.compare_exchange_weak(flags, updated, std::memory_order_acq_rel)) {
                    if (suppressNotify) {
                        return read || write;
                    } else {
                        Notify(record, read, write);
                        return false;
                    }
                }
            }
        }

        void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
            if (epoll_ctl(EpollDescriptor, EPOLL_CTL_DEL, socket->GetDescriptor(), nullptr) == -1) {
                Y_ABORT("epoll_ctl(EPOLL_CTL_DEL) failed with %s", strerror(errno));
            }
        }

        void RegisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
            epoll_event event;
            event.events = EPOLLET | EPOLLRDHUP | EPOLLIN | EPOLLOUT;
            event.data.ptr = record.Get();
            if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, record->Socket->GetDescriptor(), &event) == -1) {
                Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
            }
        }

        bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool suppressNotify) {
            return UpdateFlags(record.Get(), (read ? ReadExpected : 0) | (write ? WriteExpected : 0), suppressNotify);
        }
    };

    using TPollerThread = TEpollThread;

} // namespace NActors