aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp
blob: c78538b95bd193a9da6f372b48a0d19b3cf00494 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include "poller_tcp_unit_epoll.h"
#if !defined(_win_) && !defined(_darwin_)
#include <unistd.h>
#include <sys/epoll.h>

#include <csignal>
#include <cerrno>
#include <cstring>

namespace NInterconnect {
    namespace {
        void
        DeleteEpoll(int epoll, SOCKET stream) {
            ::epoll_event event = {0, {.fd = stream}};
            if (::epoll_ctl(epoll, EPOLL_CTL_DEL, stream, &event)) {
                Cerr << "epoll_ctl errno: " << errno << Endl;
                Y_FAIL("epoll delete error!");
            }
        }

        template <ui32 Events>
        void
        AddEpoll(int epoll, SOCKET stream) {
            ::epoll_event event = {.events = Events};
            event.data.fd = stream;
            if (::epoll_ctl(epoll, EPOLL_CTL_ADD, stream, &event)) {
                Cerr << "epoll_ctl errno: " << errno << Endl;
                Y_FAIL("epoll add error!");
            }
        }

        // Creates a new epoll instance and returns its file descriptor.
        // The value is returned unchanged even when creation failed (-1); the
        // check below is presumably compiled in only for debug builds
        // (Y_VERIFY_DEBUG) -- TODO confirm.
        int
        Initialize() {
            // The size argument is only a hint to the kernel; it must be positive.
            const auto epoll = ::epoll_create(10000);
            // epoll_create reports failure with -1; every non-negative value,
            // including 0, is a valid descriptor.
            Y_VERIFY_DEBUG(epoll >= 0);
            return epoll;
        }

    }

    // Creates two separate epoll instances, one for read operations and one for
    // write operations, and prepares the signal mask used while waiting on them.
    TPollerUnitEpoll::TPollerUnitEpoll()
        : ReadDescriptor(Initialize())
        , WriteDescriptor(Initialize())
    {
        // Build the mask handed to epoll_pwait in Process(): SIGPIPE and SIGTERM
        // are blocked for the duration of each wait.
        ::sigemptyset(&sigmask);
        ::sigaddset(&sigmask, SIGPIPE);
        ::sigaddset(&sigmask, SIGTERM);
    }

    // Closes both epoll descriptors opened by the constructor.
    // The return values of close() are not checked.
    TPollerUnitEpoll::~TPollerUnitEpoll() {
        ::close(ReadDescriptor);
        ::close(WriteDescriptor);
    }

    // Specialization for the read side (WriteOp == false): returns the epoll
    // descriptor that read operations are registered with.
    template <>
    int TPollerUnitEpoll::GetDescriptor<false>() const {
        return ReadDescriptor;
    }

    // Specialization for the write side (WriteOp == true): returns the epoll
    // descriptor that write operations are registered with.
    template <>
    int TPollerUnitEpoll::GetDescriptor<true>() const {
        return WriteDescriptor;
    }

    // Registers a read operation for descriptor `s`: the operation is first
    // handed to the base class, then the underlying socket is subscribed to
    // EPOLLIN | EPOLLRDHUP on the read epoll instance.
    void
    TPollerUnitEpoll::StartReadOperation(
        const TIntrusivePtr<TSharedDescriptor>& s,
        TFDDelegate&& operation) {
        TPollerUnit::StartReadOperation(s, std::move(operation));

        // Event mask is a compile-time constant because AddEpoll takes it as a
        // template argument.
        constexpr auto readEvents = EPOLLRDHUP | EPOLLIN;
        const auto fd = s->GetDescriptor();
        AddEpoll<readEvents>(ReadDescriptor, fd);
    }

    // Registers a write operation for descriptor `s`: the operation is first
    // handed to the base class, then the underlying socket is subscribed to
    // EPOLLOUT | EPOLLRDHUP on the write epoll instance.
    void
    TPollerUnitEpoll::StartWriteOperation(
        const TIntrusivePtr<TSharedDescriptor>& s,
        TFDDelegate&& operation) {
        TPollerUnit::StartWriteOperation(s, std::move(operation));

        // Event mask is a compile-time constant because AddEpoll takes it as a
        // template argument.
        constexpr auto writeEvents = EPOLLRDHUP | EPOLLOUT;
        const auto fd = s->GetDescriptor();
        AddEpoll<writeEvents>(WriteDescriptor, fd);
    }

    // Maximum number of ready events fetched by a single epoll_pwait call.
    constexpr int EVENTS_BUF_SIZE = 128;

    // Runs one polling iteration for the read side (WriteOp == false) or the
    // write side (WriteOp == true):
    //   1. waits up to 200 ms for ready descriptors on the matching epoll instance;
    //   2. calls ProcessInput() on the matching side (presumably this picks up
    //      newly started operations -- confirm against TPollerUnit);
    //   3. for every ready descriptor that has a registered operation, invokes
    //      the operation's delegate; if the delegate returns a truthy finalizer,
    //      the descriptor is removed from epoll and from the operation table,
    //      and the finalizer is then called.
    // Any epoll_pwait failure other than EINTR terminates the process via Y_FAIL.
    template <bool WriteOp>
    void
    TPollerUnitEpoll::Process() {
        const int epollFd = GetDescriptor<WriteOp>();

        ::epoll_event ready[EVENTS_BUF_SIZE];

        // The finite timeout exists only so the caller can re-check its stop
        // flag from time to time.
        const int readyCount =
            ::epoll_pwait(epollFd, ready, EVENTS_BUF_SIZE, 200, &sigmask);

        if (readyCount == -1) {
            // An interrupted wait is not an error; the loop below simply does
            // not execute because readyCount is negative.
            if (errno != EINTR) {
                Y_FAIL("epoll wait error!");
            }
        }

        auto& side = GetSide<WriteOp>();
        side.ProcessInput();

        for (int idx = 0; idx < readyCount; ++idx) {
            const auto opIt = side.Operations.find(ready[idx].data.fd);
            if (opIt != side.Operations.end()) {
                auto& entry = opIt->second;
                // The finalizer is a temporary bound to a const reference, so it
                // stays alive after the map entry has been erased.
                if (const auto& finalizer = entry.second(entry.first)) {
                    DeleteEpoll(epollFd, opIt->first);
                    side.Operations.erase(opIt);
                    finalizer();
                }
            }
        }
    }

    // Runs one polling iteration over the read-side epoll instance.
    void
    TPollerUnitEpoll::ProcessRead() {
        Process<false>();
    }

    // Runs one polling iteration over the write-side epoll instance.
    void
    TPollerUnitEpoll::ProcessWrite() {
        Process<true>();
    }

}

#endif