summaryrefslogtreecommitdiffstats
path: root/util/network/pollerimpl.h
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/network/pollerimpl.h
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/network/pollerimpl.h')
-rw-r--r--util/network/pollerimpl.h706
1 files changed, 706 insertions, 0 deletions
diff --git a/util/network/pollerimpl.h b/util/network/pollerimpl.h
new file mode 100644
index 00000000000..e8c7e40fbaf
--- /dev/null
+++ b/util/network/pollerimpl.h
@@ -0,0 +1,706 @@
+#pragma once
+
+#include "socket.h"
+
+#include <util/system/error.h>
+#include <util/system/mutex.h>
+#include <util/system/defaults.h>
+#include <util/generic/ylimits.h>
+#include <util/generic/utility.h>
+#include <util/generic/vector.h>
+#include <util/generic/yexception.h>
+#include <util/datetime/base.h>
+
+// Select which multiplexing backends are compiled in for this platform.
+// kqueue backend: enabled when building for FreeBSD or Darwin.
+#if defined(_freebsd_) || defined(_darwin_)
+ #define HAVE_KQUEUE_POLLER
+#endif
+
+// epoll backend: enabled for Linux builds that are not bionic, or when
+// __ANDROID_API__ is at least 21.
+#if (defined(_linux_) && !defined(_bionic_)) || (__ANDROID_API__ >= 21)
+ #define HAVE_EPOLL_POLLER
+#endif
+
+// The select() backend is always available.
+#define HAVE_SELECT_POLLER
+
+// System headers required by the backends enabled above.
+#if defined(HAVE_KQUEUE_POLLER)
+ #include <sys/event.h>
+#endif
+
+#if defined(HAVE_EPOLL_POLLER)
+ #include <sys/epoll.h>
+#endif
+
+// Bit flags combined into the `what` mask passed to the pollers and returned by ExtractFilter().
+enum EContPoll {
+    CONT_POLL_READ = 1 << 0,
+    CONT_POLL_WRITE = 1 << 1,
+    CONT_POLL_RDHUP = 1 << 2,
+    CONT_POLL_ONE_SHOT = 1 << 3,       // Disable after first event
+    CONT_POLL_MODIFY = 1 << 4,         // Modify already added event
+    CONT_POLL_EDGE_TRIGGERED = 1 << 5, // Notify only about new events
+    CONT_POLL_BACKLOG_EMPTY = 1 << 6,  // Backlog is empty (seen end of request, EAGAIN or truncated read)
+};
+
+// Returns true unless querying SO_TYPE on `fd` fails with ENOTSOCK.
+// Any other getsockopt() failure is still reported as "is a socket".
+static inline bool IsSocket(SOCKET fd) noexcept {
+    int sockType = 0;
+    socklen_t optLen = sizeof(sockType);
+
+    const bool queried = getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&sockType, &optLen) == 0;
+
+    // The error code is consulted only when the query itself failed.
+    return queried || LastSystemError() != ENOTSOCK;
+}
+
+// Converts a timeout from microseconds to milliseconds.
+// Zero stays zero; every non-zero input yields at least 1.
+static inline int MicroToMilli(int timeout) noexcept {
+    if (timeout == 0) {
+        return 0;
+    }
+
+    /*
+     * Why the result is rounded up to at least one millisecond:
+     *
+     * 1. The epoll syscall API only accepts timeouts with millisecond
+     *    accuracy.
+     * 2. It is quite complicated to guarantee a blocking syscall time
+     *    resolution finer than the kernel 1/HZ.
+     *
+     * Without this rounding we would just waste cpu time on a lot of
+     * fast epoll_wait(..., 0) syscalls.
+     */
+    const int millis = timeout / 1000;
+
+    return Max(millis, 1);
+}
+
+// Lock policy for the pollers: exposes TFakeMutex as the mutex type to use.
+struct TWithoutLocking {
+    typedef TFakeMutex TMyMutex;
+};
+
+#if defined(HAVE_KQUEUE_POLLER)
+// Thin wrapper over kevent() that repeats the call while it fails with EINTR.
+// Returns whatever the last kevent() call returned.
+static inline int Kevent(int kq, struct kevent* changelist, int nchanges,
+                         struct kevent* eventlist, int nevents, const struct timespec* timeout) noexcept {
+    for (;;) {
+        const int rc = kevent(kq, changelist, nchanges, eventlist, nevents, timeout);
+
+        if (rc != -1 || errno != EINTR) {
+            return rc;
+        }
+    }
+}
+
+// Poller backend built on kqueue()/kevent().
+// TLockPolicy is accepted for interface parity with the other backends; it is not used here.
+template <class TLockPolicy>
+class TKqueuePoller {
+public:
+    typedef struct ::kevent TEvent;
+
+    // Creates the kqueue descriptor; throws TSystemError when kqueue() fails.
+    inline TKqueuePoller()
+        : Fd_(kqueue())
+    {
+        if (Fd_ == -1) {
+            ythrow TSystemError() << "kqueue failed";
+        }
+    }
+
+    inline ~TKqueuePoller() {
+        close(Fd_);
+    }
+
+    // Returns the underlying kqueue descriptor.
+    inline int Fd() const noexcept {
+        return Fd_;
+    }
+
+    // Registers (or updates) the read and write filters for `fd`.
+    //   data - opaque pointer stored with the events and returned by ExtractEvent()
+    //   what - mask of EContPoll flags
+    // Throws TSystemError when kevent() fails.
+    inline void SetImpl(void* data, int fd, int what) {
+        TEvent e[2];
+        int flags = EV_ADD;
+
+        if (what & CONT_POLL_EDGE_TRIGGERED) {
+            if (what & CONT_POLL_BACKLOG_EMPTY) {
+                // When backlog is empty, edge-triggered does not need restart.
+                return;
+            }
+            flags |= EV_CLEAR;
+        }
+
+        if (what & CONT_POLL_ONE_SHOT) {
+            flags |= EV_ONESHOT;
+        }
+
+        Zero(e);
+
+        // Both filters are always submitted; the one that was not requested is added disabled.
+        EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
+        EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
+
+        if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) {
+            ythrow TSystemError() << "kevent add failed";
+        }
+    }
+
+    // Unregisters the read and write filters of `fd`.
+    // A filter that is not registered (ENOENT) is ignored; any other failure aborts via Y_VERIFY.
+    inline void Remove(int fd) noexcept {
+        // Each filter is deleted with its own kevent() call. With an empty event list,
+        // kevent() stops at the first changelist entry that fails, so submitting both
+        // deletions together would skip the write filter whenever the read filter is
+        // already gone.
+        RemoveFilter(fd, EVFILT_READ);
+        RemoveFilter(fd, EVFILT_WRITE);
+    }
+
+    // Waits for up to `len` events; `timeout` is in microseconds.
+    // Returns the number of events stored in `events`; aborts via Y_VERIFY on failure.
+    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
+        struct timespec ts;
+
+        ts.tv_sec = timeout / 1000000;
+        ts.tv_nsec = (timeout % 1000000) * 1000;
+
+        const int ret = Kevent(Fd_, nullptr, 0, events, len, &ts);
+
+        Y_VERIFY(ret >= 0, "kevent failed: %s", LastSystemErrorText());
+
+        return (size_t)ret;
+    }
+
+    // Returns the opaque pointer that was passed to SetImpl().
+    static inline void* ExtractEvent(const TEvent* event) noexcept {
+        return event->udata;
+    }
+
+    // Returns EIO when the event carries EV_ERROR, otherwise the event's fflags.
+    static inline int ExtractStatus(const TEvent* event) noexcept {
+        if (event->flags & EV_ERROR) {
+            return EIO;
+        }
+
+        return event->fflags;
+    }
+
+    // Maps the kqueue filter of the event to EContPoll flags.
+    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
+        if (event->filter == EVFILT_READ) {
+            return CONT_POLL_READ;
+        }
+
+        if (event->filter == EVFILT_WRITE) {
+            return CONT_POLL_WRITE;
+        }
+
+        if (event->flags & EV_EOF) {
+            return CONT_POLL_READ | CONT_POLL_WRITE;
+        }
+
+        return 0;
+    }
+
+private:
+    // Deletes a single filter of `fd`, tolerating ENOENT.
+    inline void RemoveFilter(int fd, short filter) noexcept {
+        TEvent e;
+
+        Zero(e);
+
+        EV_SET(&e, fd, filter, EV_DELETE, 0, 0, 0);
+
+        Y_VERIFY(!(Kevent(Fd_, &e, 1, nullptr, 0, nullptr) == -1 && errno != ENOENT), "kevent remove failed: %s", LastSystemErrorText());
+    }
+
+private:
+    int Fd_;
+};
+#endif
+
+#if defined(HAVE_EPOLL_POLLER)
+// Calls epoll_wait() with the timeout capped at 35 minutes (in milliseconds),
+// repeating the call while it fails with EINTR.
+static inline int ContEpollWait(int epfd, struct epoll_event* events, int maxevents, int timeout) noexcept {
+    const int cappedTimeout = Min<int>(timeout, 35 * 60 * 1000);
+
+    for (;;) {
+        const int rc = epoll_wait(epfd, events, maxevents, cappedTimeout);
+
+        if (rc != -1 || errno != EINTR) {
+            return rc;
+        }
+    }
+}
+
+// Poller backend built on epoll.
+// TLockPolicy is accepted for interface parity with the other backends; it is not used here.
+template <class TLockPolicy>
+class TEpollPoller {
+public:
+    using TEvent = struct ::epoll_event;
+
+    // Creates the epoll descriptor (with EPOLL_CLOEXEC when closeOnExec is set);
+    // throws TSystemError when epoll_create1() fails.
+    inline TEpollPoller(bool closeOnExec = false)
+        : Fd_(epoll_create1(closeOnExec ? EPOLL_CLOEXEC : 0))
+    {
+        if (Fd_ == -1) {
+            ythrow TSystemError() << "epoll_create failed";
+        }
+    }
+
+    inline ~TEpollPoller() {
+        close(Fd_);
+    }
+
+    // Returns the underlying epoll descriptor.
+    inline int Fd() const noexcept {
+        return Fd_;
+    }
+
+    // Registers `fd` with the event mask derived from `what` (EContPoll flags).
+    // `data` is stored with the registration and returned by ExtractEvent().
+    // Throws TSystemError when the registration cannot be added or modified.
+    inline void SetImpl(void* data, int fd, int what) {
+        const bool edgeTriggered = (what & CONT_POLL_EDGE_TRIGGERED) != 0;
+
+        if (edgeTriggered && (what & CONT_POLL_BACKLOG_EMPTY)) {
+            // When backlog is empty, edge-triggered does not need restart.
+            return;
+        }
+
+        TEvent ev;
+
+        Zero(ev);
+        ev.data.ptr = data;
+
+        if (edgeTriggered) {
+            ev.events |= EPOLLET;
+        }
+
+        if (what & CONT_POLL_ONE_SHOT) {
+            ev.events |= EPOLLONESHOT;
+        }
+
+        if (what & CONT_POLL_READ) {
+            ev.events |= EPOLLIN;
+        }
+
+        if (what & CONT_POLL_WRITE) {
+            ev.events |= EPOLLOUT;
+        }
+
+        if (what & CONT_POLL_RDHUP) {
+            ev.events |= EPOLLRDHUP;
+        }
+
+        // Go straight to MOD when asked to; otherwise try ADD first and fall back to MOD if it fails.
+        const bool needModify = (what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &ev) == -1;
+
+        if (needModify && epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &ev) == -1) {
+            ythrow TSystemError() << "epoll add failed";
+        }
+    }
+
+    // Unregisters `fd`; the result of epoll_ctl() is ignored.
+    inline void Remove(int fd) noexcept {
+        TEvent unused;
+
+        Zero(unused);
+
+        epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &unused);
+    }
+
+    // Waits for up to `len` events. `timeout` is given in microseconds and converted with MicroToMilli().
+    // Returns the number of events stored in `events`; aborts via Y_VERIFY on failure.
+    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
+        const int ready = ContEpollWait(Fd_, events, len, MicroToMilli(timeout));
+
+        Y_VERIFY(ready >= 0, "epoll wait error: %s", LastSystemErrorText());
+
+        return (size_t)ready;
+    }
+
+    // Returns the opaque pointer that was passed to SetImpl().
+    static inline void* ExtractEvent(const TEvent* event) noexcept {
+        return event->data.ptr;
+    }
+
+    // Returns EIO when the event reports EPOLLERR or EPOLLHUP, otherwise 0.
+    static inline int ExtractStatus(const TEvent* event) noexcept {
+        return (event->events & (EPOLLERR | EPOLLHUP)) ? EIO : 0;
+    }
+
+    // Translates the epoll event mask back to EContPoll flags.
+    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
+        const auto mask = event->events;
+
+        const int readBit = (mask & EPOLLIN) ? CONT_POLL_READ : 0;
+        const int writeBit = (mask & EPOLLOUT) ? CONT_POLL_WRITE : 0;
+        const int rdhupBit = (mask & EPOLLRDHUP) ? CONT_POLL_RDHUP : 0;
+
+        return readBit | writeBit | rdhupBit;
+    }
+
+private:
+    int Fd_;
+};
+#endif
+
+#if defined(HAVE_SELECT_POLLER)
+ #include <util/memory/tempbuf.h>
+ #include <util/generic/hash.h>
+
+ #include "pair.h"
+
+// Thin wrapper over select() that repeats the call while it fails with EINTR.
+// Returns whatever the last select() call returned.
+static inline int ContSelect(int n, fd_set* r, fd_set* w, fd_set* e, struct timeval* t) noexcept {
+    for (;;) {
+        const int rc = select(n, r, w, e, t);
+
+        if (rc != -1 || errno != EINTR) {
+            return rc;
+        }
+    }
+}
+
+// Template-independent helper types shared by all TSelectPoller instantiations.
+struct TSelectPollerNoTemplate {
+    // An opaque user pointer paired with an integer filter value.
+    struct THandle {
+        void* Data_;
+        int Filter_;
+
+        inline THandle()
+            : Data_(nullptr)
+            , Filter_(0)
+        {
+        }
+
+        inline void* Data() const noexcept {
+            return Data_;
+        }
+
+        // Replaces both the user pointer and the filter.
+        inline void Set(void* d, int s) noexcept {
+            Data_ = d;
+            Filter_ = s;
+        }
+
+        // Drops the bits of `c` from the filter.
+        inline void Clear(int c) noexcept {
+            Filter_ &= ~c;
+        }
+
+        inline int Filter() const noexcept {
+            return Filter_;
+        }
+    };
+
+    // Map from socket to its handle, with helpers to fill fd_sets for select().
+    class TFds: public THashMap<SOCKET, THandle> {
+    public:
+        inline void Set(SOCKET fd, void* data, int filter) {
+            (*this)[fd].Set(data, filter);
+        }
+
+        inline void Remove(SOCKET fd) {
+            erase(fd);
+        }
+
+        // Adds every stored socket to `e`, and to `r`/`w` according to its filter.
+        // Returns the largest socket value seen (0 when the map is empty).
+        inline SOCKET Build(fd_set* r, fd_set* w, fd_set* e) const noexcept {
+            SOCKET highest = 0;
+
+            for (auto it = begin(); it != end(); ++it) {
+                const SOCKET sock = it->first;
+                const int filter = it->second.Filter();
+
+                FD_SET(sock, e);
+
+                if (filter & CONT_POLL_READ) {
+                    FD_SET(sock, r);
+                }
+
+                if (filter & CONT_POLL_WRITE) {
+                    FD_SET(sock, w);
+                }
+
+                highest = (sock > highest) ? sock : highest;
+            }
+
+            return highest;
+        }
+    };
+
+    // Event record: a non-negative filter is a set of ready flags,
+    // a negative filter is a negated error code.
+    struct TEvent: public THandle {
+        // Returns the stored error code, or 0 when the event is not an error.
+        inline int Status() const noexcept {
+            const int value = Filter();
+
+            return value < 0 ? -value : 0;
+        }
+
+        inline void Error(void* d, int err) noexcept {
+            Set(d, -err);
+        }
+
+        inline void Success(void* d, int what) noexcept {
+            Set(d, what);
+        }
+    };
+};
+
+// Poller backend built on select().
+// Registrations are queued as commands and applied at the start of the next wait;
+// a socket pair is used to wake up a select() call that is already in progress.
+// TLockPolicy::TMyMutex is the mutex type guarding the wait path and the command queue.
+template <class TLockPolicy>
+class TSelectPoller: public TSelectPollerNoTemplate {
+    using TMyMutex = typename TLockPolicy::TMyMutex;
+
+public:
+    // Creates the wake-up socket pair and switches both ends to non-blocking mode.
+    inline TSelectPoller()
+        : Begin_(nullptr)
+        , End_(nullptr)
+    {
+        SocketPair(Signal_);
+        SetNonBlock(WaitSock());
+        SetNonBlock(SigSock());
+    }
+
+    inline ~TSelectPoller() {
+        closesocket(Signal_[0]);
+        closesocket(Signal_[1]);
+    }
+
+    // Queues a registration of `fd` with filter `what` and user pointer `data`,
+    // then wakes up the waiter.
+    inline void SetImpl(void* data, SOCKET fd, int what) {
+        with_lock (CommandLock_) {
+            Commands_.push_back(TCommand(fd, what, data));
+        }
+
+        Signal();
+    }
+
+    // Queues removal of `fd` (a command with filter 0), then wakes up the waiter.
+    inline void Remove(SOCKET fd) noexcept {
+        with_lock (CommandLock_) {
+            Commands_.push_back(TCommand(fd, 0));
+        }
+
+        Signal();
+    }
+
+    // Stores up to `len` events in `events`; `timeout` is in microseconds.
+    // Events left over from a previous call are handed out first. When `len` is
+    // smaller than EventNumberHint(), events are collected into an internal buffer
+    // and handed out in portions.
+    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
+        auto guard = Guard(Lock_);
+
+        do {
+            if (Begin_ != End_) {
+                const size_t ret = Min<size_t>(End_ - Begin_, len);
+
+                memcpy(events, Begin_, sizeof(*events) * ret);
+                Begin_ += ret;
+
+                return ret;
+            }
+
+            if (len >= EventNumberHint()) {
+                return WaitBase(events, len, timeout);
+            }
+
+            Begin_ = SavedEvents();
+            End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout);
+        } while (Begin_ != End_);
+
+        return 0;
+    }
+
+    // Returns the internal event buffer, allocating EventNumberHint() entries on first use.
+    inline TEvent* SavedEvents() {
+        if (!SavedEvents_) {
+            SavedEvents_.Reset(new TEvent[EventNumberHint()]);
+        }
+
+        return SavedEvents_.Get();
+    }
+
+    // Applies the queued commands, runs one select() over all registered sockets
+    // plus the wake-up socket, and converts the result into events.
+    // Returns the number of events written to `events`.
+    inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept {
+        with_lock (CommandLock_) {
+            for (auto command = Commands_.begin(); command != Commands_.end(); ++command) {
+                if (command->Filter_ != 0) {
+                    Fds_.Set(command->Fd_, command->Cookie_, command->Filter_);
+                } else {
+                    Fds_.Remove(command->Fd_);
+                }
+            }
+
+            Commands_.clear();
+        }
+
+        // One buffer holds the three fd_sets followed by a scratch array of sockets to erase.
+        TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET));
+
+        fd_set* in = (fd_set*)tmpBuf.Data();
+        fd_set* out = &in[1];
+        fd_set* errFds = &in[2];
+
+        SOCKET* keysToDeleteBegin = (SOCKET*)&in[3];
+        SOCKET* keysToDeleteEnd = keysToDeleteBegin;
+
+    #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347
+        memset(in, 0, sizeof(*in));
+        memset(out, 0, sizeof(*out));
+        memset(errFds, 0, sizeof(*errFds));
+    #endif
+
+        FD_ZERO(in);
+        FD_ZERO(out);
+        FD_ZERO(errFds);
+
+        FD_SET(WaitSock(), in);
+
+        const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock());
+        struct timeval tout;
+
+        tout.tv_sec = timeout / 1000000;
+        tout.tv_usec = timeout % 1000000;
+
+        int ret = ContSelect(int(maxFdNum + 1), in, out, errFds, &tout);
+
+        // The wake-up socket is not a user event: drain it and do not count it.
+        if (ret > 0 && FD_ISSET(WaitSock(), in)) {
+            --ret;
+            TryWait();
+        }
+
+        Y_VERIFY(ret >= 0 && (size_t)ret <= len, "select error: %s", LastSystemErrorText());
+
+        TEvent* eventsStart = events;
+
+        for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) {
+            const SOCKET fd = it->first;
+            THandle& handle = it->second;
+
+            if (FD_ISSET(fd, errFds)) {
+                (events++)->Error(handle.Data(), EIO);
+
+                if (handle.Filter() & CONT_POLL_ONE_SHOT) {
+                    *keysToDeleteEnd = fd;
+                    ++keysToDeleteEnd;
+                }
+
+            } else {
+                int what = 0;
+
+                if (FD_ISSET(fd, in)) {
+                    what |= CONT_POLL_READ;
+                }
+
+                if (FD_ISSET(fd, out)) {
+                    what |= CONT_POLL_WRITE;
+                }
+
+                if (what) {
+                    (events++)->Success(handle.Data(), what);
+
+                    if (handle.Filter() & CONT_POLL_ONE_SHOT) {
+                        *keysToDeleteEnd = fd;
+                        ++keysToDeleteEnd;
+                    }
+
+                    if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) {
+                        // Emulate edge-triggered for level-triggered select().
+                        // User must restart waiting this event when needed.
+                        handle.Clear(what);
+                    }
+                }
+            }
+        }
+
+        // One-shot entries are erased only after the iteration over Fds_ has finished.
+        while (keysToDeleteBegin != keysToDeleteEnd) {
+            Fds_.erase(*keysToDeleteBegin);
+            ++keysToDeleteBegin;
+        }
+
+        return events - eventsStart;
+    }
+
+    // Size of the internal event buffer, and the threshold below which Wait() uses it.
+    inline size_t EventNumberHint() const noexcept {
+        return sizeof(fd_set) * 8 * 2;
+    }
+
+    // Returns the opaque pointer that was passed to SetImpl().
+    static inline void* ExtractEvent(const TEvent* event) noexcept {
+        return event->Data();
+    }
+
+    static inline int ExtractStatus(const TEvent* event) noexcept {
+        return event->Status();
+    }
+
+    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
+        return event->Filter();
+    }
+
+private:
+    // Writes one byte to the signalling end of the socket pair; the result of send() is ignored.
+    inline void Signal() noexcept {
+        char ch = 13;
+
+        send(SigSock(), &ch, 1, 0);
+    }
+
+    // Reads from the waiting end of the socket pair until recv() stops returning data.
+    inline void TryWait() {
+        char ch[32];
+
+        while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) {
+            Y_ASSERT(ch[0] == 13);
+        }
+    }
+
+    inline SOCKET WaitSock() const noexcept {
+        return Signal_[1];
+    }
+
+    inline SOCKET SigSock() const noexcept {
+        return Signal_[0];
+    }
+
+private:
+    // A queued registration change, applied by WaitBase().
+    struct TCommand {
+        SOCKET Fd_;
+        int Filter_; // 0 to remove
+        void* Cookie_;
+
+        // `cookie` defaults to nullptr so that removal commands never carry an
+        // uninitialized pointer when they are copied into the command queue.
+        TCommand(SOCKET fd, int filter, void* cookie = nullptr)
+            : Fd_(fd)
+            , Filter_(filter)
+            , Cookie_(cookie)
+        {
+        }
+    };
+
+    TFds Fds_;
+
+    TMyMutex Lock_;
+    TArrayHolder<TEvent> SavedEvents_;
+    TEvent* Begin_;
+    TEvent* End_;
+
+    TMyMutex CommandLock_;
+    TVector<TCommand> Commands_;
+
+    SOCKET Signal_[2];
+};
+#endif
+
+// Returns how long a single wait step may last: zero when the deadline has
+// already passed, otherwise the time left until the deadline capped at 1000 seconds.
+static inline TDuration PollStep(const TInstant& deadLine, const TInstant& now) noexcept {
+    return (deadLine < now)
+               ? TDuration::Zero()
+               : Min(deadLine - now, TDuration::Seconds(1000));
+}
+
+// Adds the backend-independent part of the poller interface on top of a backend TBase.
+template <class TBase>
+class TGenericPoller: public TBase {
+public:
+    using TBase::TBase;
+
+    using TEvent = typename TBase::TEvent;
+
+    // Registers `fd` with the given flags, or removes it when `what` is zero.
+    inline void Set(void* data, SOCKET fd, int what) {
+        if (!what) {
+            this->Remove(fd);
+            return;
+        }
+
+        this->SetImpl(data, fd, what);
+    }
+
+    // Returns the ready flags of the event; an event with a non-zero status
+    // is reported as read, write and rdhup all at once.
+    static inline int ExtractFilter(const TEvent* event) noexcept {
+        return TBase::ExtractStatus(event)
+                   ? (CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP)
+                   : TBase::ExtractFilterImpl(event);
+    }
+
+    // Waits in steps bounded by PollStep() until at least one event arrives or
+    // the deadline is reached. Returns the number of events stored in `events`,
+    // which is 0 when `len` is 0 or the deadline passed without events.
+    inline size_t WaitD(TEvent* events, size_t len, TInstant deadLine, TInstant now = TInstant::Now()) noexcept {
+        if (!len) {
+            return 0;
+        }
+
+        for (;;) {
+            const size_t ready = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds());
+
+            if (ready) {
+                return ready;
+            }
+
+            // Nothing arrived: refresh the clock and stop once the deadline is no longer ahead.
+            now = TInstant::Now();
+
+            if (!(now < deadLine)) {
+                return ready;
+            }
+        }
+    }
+};
+
+// Choose the default backend in order of preference: kqueue, then epoll, then select.
+// TPollerImplBase is a temporary macro that exists only to define the alias below.
+#if defined(HAVE_KQUEUE_POLLER)
+ #define TPollerImplBase TKqueuePoller
+#elif defined(HAVE_EPOLL_POLLER)
+ #define TPollerImplBase TEpollPoller
+#elif defined(HAVE_SELECT_POLLER)
+ #define TPollerImplBase TSelectPoller
+#else
+ #error "unsupported platform"
+#endif
+
+// Default poller for this platform: the selected backend wrapped in TGenericPoller.
+template <class TLockPolicy>
+using TPollerImpl = TGenericPoller<TPollerImplBase<TLockPolicy>>;
+
+// The helper macro is not needed past this point.
+#undef TPollerImplBase