diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/network/pollerimpl.h |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/network/pollerimpl.h')
-rw-r--r-- | util/network/pollerimpl.h | 706 |
1 files changed, 706 insertions, 0 deletions
diff --git a/util/network/pollerimpl.h b/util/network/pollerimpl.h new file mode 100644 index 00000000000..e8c7e40fbaf --- /dev/null +++ b/util/network/pollerimpl.h @@ -0,0 +1,706 @@ +#pragma once + +#include "socket.h" + +#include <util/system/error.h> +#include <util/system/mutex.h> +#include <util/system/defaults.h> +#include <util/generic/ylimits.h> +#include <util/generic/utility.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/datetime/base.h> + +#if defined(_freebsd_) || defined(_darwin_) + #define HAVE_KQUEUE_POLLER +#endif + +#if (defined(_linux_) && !defined(_bionic_)) || (__ANDROID_API__ >= 21) + #define HAVE_EPOLL_POLLER +#endif + +//now we always have it +#define HAVE_SELECT_POLLER + +#if defined(HAVE_KQUEUE_POLLER) + #include <sys/event.h> +#endif + +#if defined(HAVE_EPOLL_POLLER) + #include <sys/epoll.h> +#endif + +enum EContPoll { + CONT_POLL_READ = 1, + CONT_POLL_WRITE = 2, + CONT_POLL_RDHUP = 4, + CONT_POLL_ONE_SHOT = 8, // Disable after first event + CONT_POLL_MODIFY = 16, // Modify already added event + CONT_POLL_EDGE_TRIGGERED = 32, // Notify only about new events + CONT_POLL_BACKLOG_EMPTY = 64, // Backlog is empty (seen end of request, EAGAIN or truncated read) +}; + +static inline bool IsSocket(SOCKET fd) noexcept { + int val = 0; + socklen_t len = sizeof(val); + + if (getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&val, &len) == 0) { + return true; + } + + return LastSystemError() != ENOTSOCK; +} + +static inline int MicroToMilli(int timeout) noexcept { + if (timeout) { + /* + * 1. API of epoll syscall allows to specify timeout with millisecond + * accuracy only + * 2. It is quite complicated to guarantee time resolution of blocking + * syscall less than kernel 1/HZ + * + * Without this rounding we just waste cpu time and do a lot of + * fast epoll_wait(..., 0) syscalls. + */ + return Max(timeout / 1000, 1); + } + + return 0; +} + +struct TWithoutLocking { + using TMyMutex = TFakeMutex; +}; + +#if defined(HAVE_KQUEUE_POLLER) +static inline int Kevent(int kq, struct kevent* changelist, int nchanges, + struct kevent* eventlist, int nevents, const struct timespec* timeout) noexcept { + int ret; + + do { + ret = kevent(kq, changelist, nchanges, eventlist, nevents, timeout); + } while (ret == -1 && errno == EINTR); + + return ret; +} + +template <class TLockPolicy> +class TKqueuePoller { +public: + typedef struct ::kevent TEvent; + + inline TKqueuePoller() + : Fd_(kqueue()) + { + if (Fd_ == -1) { + ythrow TSystemError() << "kqueue failed"; + } + } + + inline ~TKqueuePoller() { + close(Fd_); + } + + inline int Fd() const noexcept { + return Fd_; + } + + inline void SetImpl(void* data, int fd, int what) { + TEvent e[2]; + int flags = EV_ADD; + + if (what & CONT_POLL_EDGE_TRIGGERED) { + if (what & CONT_POLL_BACKLOG_EMPTY) { + // When backlog is empty, edge-triggered does not need restart. + return; + } + flags |= EV_CLEAR; + } + + if (what & CONT_POLL_ONE_SHOT) { + flags |= EV_ONESHOT; + } + + Zero(e); + + EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data); + EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data); + + if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) { + ythrow TSystemError() << "kevent add failed"; + } + } + + inline void Remove(int fd) noexcept { + TEvent e[2]; + + Zero(e); + + EV_SET(e + 0, fd, EVFILT_READ, EV_DELETE, 0, 0, 0); + EV_SET(e + 1, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); + + Y_VERIFY(!(Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1 && errno != ENOENT), "kevent remove failed: %s", LastSystemErrorText()); + } + + inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept { + struct timespec ts; + + ts.tv_sec = timeout / 1000000; + ts.tv_nsec = (timeout % 1000000) * 1000; + + const int ret = Kevent(Fd_, nullptr, 0, events, len, &ts); + + Y_VERIFY(ret >= 0, "kevent failed: %s", LastSystemErrorText()); + + return (size_t)ret; + } + + static inline void* ExtractEvent(const TEvent* event) noexcept { + return event->udata; + } + + static inline int ExtractStatus(const TEvent* event) noexcept { + if (event->flags & EV_ERROR) { + return EIO; + } + + return event->fflags; + } + + static inline int ExtractFilterImpl(const TEvent* event) noexcept { + if (event->filter == EVFILT_READ) { + return CONT_POLL_READ; + } + + if (event->filter == EVFILT_WRITE) { + return CONT_POLL_WRITE; + } + + if (event->flags & EV_EOF) { + return CONT_POLL_READ | CONT_POLL_WRITE; + } + + return 0; + } + +private: + int Fd_; +}; +#endif + +#if defined(HAVE_EPOLL_POLLER) +static inline int ContEpollWait(int epfd, struct epoll_event* events, int maxevents, int timeout) noexcept { + int ret; + + do { + ret = epoll_wait(epfd, events, maxevents, Min<int>(timeout, 35 * 60 * 1000)); + } while (ret == -1 && errno == EINTR); + + return ret; +} + +template <class TLockPolicy> +class TEpollPoller { +public: + typedef struct ::epoll_event TEvent; + + inline TEpollPoller(bool closeOnExec = false) + : Fd_(epoll_create1(closeOnExec ? EPOLL_CLOEXEC : 0)) + { + if (Fd_ == -1) { + ythrow TSystemError() << "epoll_create failed"; + } + } + + inline ~TEpollPoller() { + close(Fd_); + } + + inline int Fd() const noexcept { + return Fd_; + } + + inline void SetImpl(void* data, int fd, int what) { + TEvent e; + + Zero(e); + + if (what & CONT_POLL_EDGE_TRIGGERED) { + if (what & CONT_POLL_BACKLOG_EMPTY) { + // When backlog is empty, edge-triggered does not need restart. + return; + } + e.events |= EPOLLET; + } + + if (what & CONT_POLL_ONE_SHOT) { + e.events |= EPOLLONESHOT; + } + + if (what & CONT_POLL_READ) { + e.events |= EPOLLIN; + } + + if (what & CONT_POLL_WRITE) { + e.events |= EPOLLOUT; + } + + if (what & CONT_POLL_RDHUP) { + e.events |= EPOLLRDHUP; + } + + e.data.ptr = data; + + if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) { + if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) { + ythrow TSystemError() << "epoll add failed"; + } + } + } + + inline void Remove(int fd) noexcept { + TEvent e; + + Zero(e); + + epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &e); + } + + inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept { + const int ret = ContEpollWait(Fd_, events, len, MicroToMilli(timeout)); + + Y_VERIFY(ret >= 0, "epoll wait error: %s", LastSystemErrorText()); + + return (size_t)ret; + } + + static inline void* ExtractEvent(const TEvent* event) noexcept { + return event->data.ptr; + } + + static inline int ExtractStatus(const TEvent* event) noexcept { + if (event->events & (EPOLLERR | EPOLLHUP)) { + return EIO; + } + + return 0; + } + + static inline int ExtractFilterImpl(const TEvent* event) noexcept { + int ret = 0; + + if (event->events & EPOLLIN) { + ret |= CONT_POLL_READ; + } + + if (event->events & EPOLLOUT) { + ret |= CONT_POLL_WRITE; + } + + if (event->events & EPOLLRDHUP) { + ret |= CONT_POLL_RDHUP; + } + + return ret; + } + +private: + int Fd_; +}; +#endif + +#if defined(HAVE_SELECT_POLLER) + #include <util/memory/tempbuf.h> + #include <util/generic/hash.h> + + #include "pair.h" + +static inline int ContSelect(int n, fd_set* r, fd_set* w, fd_set* e, struct timeval* t) noexcept { + int ret; + + do { + ret = select(n, r, w, e, t); + } while (ret == -1 && errno == EINTR); + + return ret; +} + +struct TSelectPollerNoTemplate { + struct THandle { + void* Data_; + int Filter_; + + inline THandle() + : Data_(nullptr) + , Filter_(0) + { + } + + inline void* Data() const noexcept { + return Data_; + } + + inline void Set(void* d, int s) noexcept { + Data_ = d; + Filter_ = s; + } + + inline void Clear(int c) noexcept { + Filter_ &= ~c; + } + + inline int Filter() const noexcept { + return Filter_; + } + }; + + class TFds: public THashMap<SOCKET, THandle> { + public: + inline void Set(SOCKET fd, void* data, int filter) { + (*this)[fd].Set(data, filter); + } + + inline void Remove(SOCKET fd) { + erase(fd); + } + + inline SOCKET Build(fd_set* r, fd_set* w, fd_set* e) const noexcept { + SOCKET ret = 0; + + for (const auto& it : *this) { + const SOCKET fd = it.first; + const THandle& handle = it.second; + + FD_SET(fd, e); + + if (handle.Filter() & CONT_POLL_READ) { + FD_SET(fd, r); + } + + if (handle.Filter() & CONT_POLL_WRITE) { + FD_SET(fd, w); + } + + if (fd > ret) { + ret = fd; + } + } + + return ret; + } + }; + + struct TEvent: public THandle { + inline int Status() const noexcept { + return -Min(Filter(), 0); + } + + inline void Error(void* d, int err) noexcept { + Set(d, -err); + } + + inline void Success(void* d, int what) noexcept { + Set(d, what); + } + }; +}; + +template <class TLockPolicy> +class TSelectPoller: public TSelectPollerNoTemplate { + using TMyMutex = typename TLockPolicy::TMyMutex; + +public: + inline TSelectPoller() + : Begin_(nullptr) + , End_(nullptr) + { + SocketPair(Signal_); + SetNonBlock(WaitSock()); + SetNonBlock(SigSock()); + } + + inline ~TSelectPoller() { + closesocket(Signal_[0]); + closesocket(Signal_[1]); + } + + inline void SetImpl(void* data, SOCKET fd, int what) { + with_lock (CommandLock_) { + Commands_.push_back(TCommand(fd, what, data)); + } + + Signal(); + } + + inline void Remove(SOCKET fd) noexcept { + with_lock (CommandLock_) { + Commands_.push_back(TCommand(fd, 0)); + } + + Signal(); + } + + inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept { + auto guard = Guard(Lock_); + + do { + if (Begin_ != End_) { + const size_t ret = Min<size_t>(End_ - Begin_, len); + + memcpy(events, Begin_, sizeof(*events) * ret); + Begin_ += ret; + + return ret; + } + + if (len >= EventNumberHint()) { + return WaitBase(events, len, timeout); + } + + Begin_ = SavedEvents(); + End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout); + } while (Begin_ != End_); + + return 0; + } + + inline TEvent* SavedEvents() { + if (!SavedEvents_) { + SavedEvents_.Reset(new TEvent[EventNumberHint()]); + } + + return SavedEvents_.Get(); + } + + inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept { + with_lock (CommandLock_) { + for (auto command = Commands_.begin(); command != Commands_.end(); ++command) { + if (command->Filter_ != 0) { + Fds_.Set(command->Fd_, command->Cookie_, command->Filter_); + } else { + Fds_.Remove(command->Fd_); + } + } + + Commands_.clear(); + } + + TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET)); + + fd_set* in = (fd_set*)tmpBuf.Data(); + fd_set* out = &in[1]; + fd_set* errFds = &in[2]; + + SOCKET* keysToDeleteBegin = (SOCKET*)&in[3]; + SOCKET* keysToDeleteEnd = keysToDeleteBegin; + + #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347 + memset(in, 0, sizeof(*in)); + memset(out, 0, sizeof(*out)); + memset(errFds, 0, sizeof(*errFds)); + #endif + + FD_ZERO(in); + FD_ZERO(out); + FD_ZERO(errFds); + + FD_SET(WaitSock(), in); + + const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock()); + struct timeval tout; + + tout.tv_sec = timeout / 1000000; + tout.tv_usec = timeout % 1000000; + + int ret = ContSelect(int(maxFdNum + 1), in, out, errFds, &tout); + + if (ret > 0 && FD_ISSET(WaitSock(), in)) { + --ret; + TryWait(); + } + + Y_VERIFY(ret >= 0 && (size_t)ret <= len, "select error: %s", LastSystemErrorText()); + + TEvent* eventsStart = events; + + for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) { + const SOCKET fd = it->first; + THandle& handle = it->second; + + if (FD_ISSET(fd, errFds)) { + (events++)->Error(handle.Data(), EIO); + + if (handle.Filter() & CONT_POLL_ONE_SHOT) { + *keysToDeleteEnd = fd; + ++keysToDeleteEnd; + } + + } else { + int what = 0; + + if (FD_ISSET(fd, in)) { + what |= CONT_POLL_READ; + } + + if (FD_ISSET(fd, out)) { + what |= CONT_POLL_WRITE; + } + + if (what) { + (events++)->Success(handle.Data(), what); + + if (handle.Filter() & CONT_POLL_ONE_SHOT) { + *keysToDeleteEnd = fd; + ++keysToDeleteEnd; + } + + if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) { + // Emulate edge-triggered for level-triggered select(). + // User must restart waiting this event when needed. + handle.Clear(what); + } + } + } + } + + while (keysToDeleteBegin != keysToDeleteEnd) { + Fds_.erase(*keysToDeleteBegin); + ++keysToDeleteBegin; + } + + return events - eventsStart; + } + + inline size_t EventNumberHint() const noexcept { + return sizeof(fd_set) * 8 * 2; + } + + static inline void* ExtractEvent(const TEvent* event) noexcept { + return event->Data(); + } + + static inline int ExtractStatus(const TEvent* event) noexcept { + return event->Status(); + } + + static inline int ExtractFilterImpl(const TEvent* event) noexcept { + return event->Filter(); + } + +private: + inline void Signal() noexcept { + char ch = 13; + + send(SigSock(), &ch, 1, 0); + } + + inline void TryWait() { + char ch[32]; + + while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) { + Y_ASSERT(ch[0] == 13); + } + } + + inline SOCKET WaitSock() const noexcept { + return Signal_[1]; + } + + inline SOCKET SigSock() const noexcept { + return Signal_[0]; + } + +private: + struct TCommand { + SOCKET Fd_; + int Filter_; // 0 to remove + void* Cookie_; + + TCommand(SOCKET fd, int filter, void* cookie) + : Fd_(fd) + , Filter_(filter) + , Cookie_(cookie) + { + } + + TCommand(SOCKET fd, int filter) + : Fd_(fd) + , Filter_(filter) + { + } + }; + + TFds Fds_; + + TMyMutex Lock_; + TArrayHolder<TEvent> SavedEvents_; + TEvent* Begin_; + TEvent* End_; + + TMyMutex CommandLock_; + TVector<TCommand> Commands_; + + SOCKET Signal_[2]; +}; +#endif + +static inline TDuration PollStep(const TInstant& deadLine, const TInstant& now) noexcept { + if (deadLine < now) { + return TDuration::Zero(); + } + + return Min(deadLine - now, TDuration::Seconds(1000)); +} + +template <class TBase> +class TGenericPoller: public TBase { +public: + using TBase::TBase; + + using TEvent = typename TBase::TEvent; + + inline void Set(void* data, SOCKET fd, int what) { + if (what) { + this->SetImpl(data, fd, what); + } else { + this->Remove(fd); + } + } + + static inline int ExtractFilter(const TEvent* event) noexcept { + if (TBase::ExtractStatus(event)) { + return CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP; + } + + return TBase::ExtractFilterImpl(event); + } + + inline size_t WaitD(TEvent* events, size_t len, TInstant deadLine, TInstant now = TInstant::Now()) noexcept { + if (!len) { + return 0; + } + + size_t ret; + + do { + ret = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds()); + } while (!ret && ((now = TInstant::Now()) < deadLine)); + + return ret; + } +}; + +#if defined(HAVE_KQUEUE_POLLER) + #define TPollerImplBase TKqueuePoller +#elif defined(HAVE_EPOLL_POLLER) + #define TPollerImplBase TEpollPoller +#elif defined(HAVE_SELECT_POLLER) + #define TPollerImplBase TSelectPoller +#else + #error "unsupported platform" +#endif + +template <class TLockPolicy> +using TPollerImpl = TGenericPoller<TPollerImplBase<TLockPolicy>>; + +#undef TPollerImplBase |