aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp
blob: c0c4524f1ece99a6fc49f9b20f4806c3bbeab999 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include "poller_tcp_unit_epoll.h"
#if !defined(_win_) && !defined(_darwin_)
#include <unistd.h>
#include <sys/epoll.h>

#include <csignal>
#include <cerrno>
#include <cstring>

namespace NInterconnect {
    namespace { 
        void 
        DeleteEpoll(int epoll, SOCKET stream) { 
            ::epoll_event event = {0, {.fd = stream}}; 
            if (::epoll_ctl(epoll, EPOLL_CTL_DEL, stream, &event)) { 
                Cerr << "epoll_ctl errno: " << errno << Endl; 
                Y_FAIL("epoll delete error!"); 
            } 
        } 

        template <ui32 Events> 
        void 
        AddEpoll(int epoll, SOCKET stream) { 
            ::epoll_event event = {.events = Events};
            event.data.fd = stream; 
            if (::epoll_ctl(epoll, EPOLL_CTL_ADD, stream, &event)) { 
                Cerr << "epoll_ctl errno: " << errno << Endl; 
                Y_FAIL("epoll add error!"); 
            } 
        } 

        int 
        Initialize() { 
            const auto epoll = ::epoll_create(10000); 
            Y_VERIFY_DEBUG(epoll > 0); 
            return epoll; 
        } 
 
    }

    // Creates one epoll instance per direction (read and write) and prepares
    // the signal mask that Process() later hands to epoll_pwait.
    TPollerUnitEpoll::TPollerUnitEpoll()
        : ReadDescriptor(Initialize())
        , WriteDescriptor(Initialize())
    {
        // Start from an empty set, then list the signals that stay blocked
        // for the duration of each epoll_pwait call.
        ::sigemptyset(&sigmask);
        ::sigaddset(&sigmask, SIGTERM);
        ::sigaddset(&sigmask, SIGPIPE);
    }

    // Releases both epoll instances created by the constructor.
    // The results of close() are not inspected.
    TPollerUnitEpoll::~TPollerUnitEpoll() {
        const auto readFd = ReadDescriptor;
        const auto writeFd = WriteDescriptor;
        ::close(readFd);
        ::close(writeFd);
    }

    // Specialization for the non-write (read) side: returns the epoll
    // descriptor used for read operations.
    template <>
    int TPollerUnitEpoll::GetDescriptor<false>() const {
        return this->ReadDescriptor;
    }

    // Specialization for the write side: returns the epoll descriptor used
    // for write operations.
    template <>
    int TPollerUnitEpoll::GetDescriptor<true>() const {
        return this->WriteDescriptor;
    }

    // Starts a read operation on the descriptor wrapped by `s`.
    // The delegate is first forwarded to the base class, then the descriptor
    // is added to the read epoll instance for EPOLLIN and EPOLLRDHUP.
    void TPollerUnitEpoll::StartReadOperation(
        const TIntrusivePtr<TSharedDescriptor>& s,
        TFDDelegate&& operation)
    {
        TPollerUnit::StartReadOperation(s, std::move(operation));

        const auto fd = s->GetDescriptor();
        AddEpoll<EPOLLRDHUP | EPOLLIN>(ReadDescriptor, fd);
    }

    // Starts a write operation on the descriptor wrapped by `s`.
    // The delegate is first forwarded to the base class, then the descriptor
    // is added to the write epoll instance for EPOLLOUT and EPOLLRDHUP.
    void TPollerUnitEpoll::StartWriteOperation(
        const TIntrusivePtr<TSharedDescriptor>& s,
        TFDDelegate&& operation)
    {
        TPollerUnit::StartWriteOperation(s, std::move(operation));

        const auto fd = s->GetDescriptor();
        AddEpoll<EPOLLRDHUP | EPOLLOUT>(WriteDescriptor, fd);
    }

    // Upper bound on the number of events fetched by one epoll_pwait call.
    constexpr int EVENTS_BUF_SIZE = 128;

    // Runs one polling iteration for the read side (WriteOp == false) or the
    // write side (WriteOp == true):
    //   1. waits on the matching epoll instance;
    //   2. lets the side handle its pending input via ProcessInput();
    //   3. invokes the delegate registered for every reported descriptor.
    // A wait failure other than EINTR is fatal (Y_FAIL).
    template <bool WriteOp>
    void TPollerUnitEpoll::Process() {
        const int pollFd = GetDescriptor<WriteOp>();
        ::epoll_event ready[EVENTS_BUF_SIZE];

        // The 200 ms timeout exists only so that the stop flag gets checked
        // from time to time.
        const int readyCount =
            ::epoll_pwait(pollFd, ready, EVENTS_BUF_SIZE, 200, &sigmask);
        if (readyCount == -1 && errno != EINTR) {
            Y_FAIL("epoll wait error!");
        }

        auto& side = GetSide<WriteOp>();
        side.ProcessInput();

        // When the wait was interrupted readyCount is -1 and the loop is skipped.
        for (int idx = 0; idx < readyCount; ++idx) {
            const auto found = side.Operations.find(ready[idx].data.fd);
            if (found != side.Operations.end()) {
                // A truthy delegate result is a finalizer: the operation is
                // over, so the descriptor is unregistered and forgotten before
                // the finalizer itself is run.
                if (const auto& finalizer = found->second.second(found->second.first)) {
                    DeleteEpoll(pollFd, found->first);
                    side.Operations.erase(found);
                    finalizer();
                }
            }
        }
    }

    // Runs one polling iteration for the read side.
    void TPollerUnitEpoll::ProcessRead() {
        Process</* WriteOp = */ false>();
    }

    // Runs one polling iteration for the write side.
    void TPollerUnitEpoll::ProcessWrite() {
        Process</* WriteOp = */ true>();
    }

}

#endif