#include "poller_tcp_unit_epoll.h" #if !defined(_win_) && !defined(_darwin_) #include <unistd.h> #include <sys/epoll.h> #include <csignal> #include <cerrno> namespace NInterconnect { namespace { void DeleteEpoll(int epoll, SOCKET stream) { ::epoll_event event = {0, {.fd = stream}}; if (::epoll_ctl(epoll, EPOLL_CTL_DEL, stream, &event)) { Cerr << "epoll_ctl errno: " << errno << Endl; Y_FAIL("epoll delete error!"); } } template <ui32 Events> void AddEpoll(int epoll, SOCKET stream) { ::epoll_event event = {.events = Events}; event.data.fd = stream; if (::epoll_ctl(epoll, EPOLL_CTL_ADD, stream, &event)) { Cerr << "epoll_ctl errno: " << errno << Endl; Y_FAIL("epoll add error!"); } } int Initialize() { const auto epoll = ::epoll_create(10000); Y_VERIFY_DEBUG(epoll > 0); return epoll; } } TPollerUnitEpoll::TPollerUnitEpoll() : ReadDescriptor(Initialize()) , WriteDescriptor(Initialize()) { // Block on the epoll descriptor. ::sigemptyset(&sigmask); ::sigaddset(&sigmask, SIGPIPE); ::sigaddset(&sigmask, SIGTERM); } TPollerUnitEpoll::~TPollerUnitEpoll() { ::close(ReadDescriptor); ::close(WriteDescriptor); } template <> int TPollerUnitEpoll::GetDescriptor<false>() const { return ReadDescriptor; } template <> int TPollerUnitEpoll::GetDescriptor<true>() const { return WriteDescriptor; } void TPollerUnitEpoll::StartReadOperation( const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) { TPollerUnit::StartReadOperation(s, std::move(operation)); AddEpoll<EPOLLRDHUP | EPOLLIN>(ReadDescriptor, s->GetDescriptor()); } void TPollerUnitEpoll::StartWriteOperation( const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) { TPollerUnit::StartWriteOperation(s, std::move(operation)); AddEpoll<EPOLLRDHUP | EPOLLOUT>(WriteDescriptor, s->GetDescriptor()); } constexpr int EVENTS_BUF_SIZE = 128; template <bool WriteOp> void TPollerUnitEpoll::Process() { ::epoll_event events[EVENTS_BUF_SIZE]; const int epoll = GetDescriptor<WriteOp>(); /* Timeout just to check StopFlag sometimes */ const int result = ::epoll_pwait(epoll, events, EVENTS_BUF_SIZE, 200, &sigmask); if (result == -1 && errno != EINTR) Y_FAIL("epoll wait error!"); auto& side = GetSide<WriteOp>(); side.ProcessInput(); for (int i = 0; i < result; ++i) { const auto it = side.Operations.find(events[i].data.fd); if (side.Operations.end() == it) continue; if (const auto& finalizer = it->second.second(it->second.first)) { DeleteEpoll(epoll, it->first); side.Operations.erase(it); finalizer(); } } } void TPollerUnitEpoll::ProcessRead() { Process<false>(); } void TPollerUnitEpoll::ProcessWrite() { Process<true>(); } } #endif