#pragma once
#include "socket.h"
#include <util/system/error.h>
#include <util/system/mutex.h>
#include <util/system/defaults.h>
#include <util/generic/ylimits.h>
#include <util/generic/utility.h>
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/datetime/base.h>
#if defined(_freebsd_) || defined(_darwin_)
#define HAVE_KQUEUE_POLLER
#endif
#if (defined(_linux_) && !defined(_bionic_)) || (__ANDROID_API__ >= 21)
#define HAVE_EPOLL_POLLER
#endif
//now we always have it
#define HAVE_SELECT_POLLER
#if defined(HAVE_KQUEUE_POLLER)
#include <sys/event.h>
#endif
#if defined(HAVE_EPOLL_POLLER)
#include <sys/epoll.h>
#endif
// Bit flags describing interest in (when registering) or readiness of (when
// reported) a descriptor. Flags are combined with bitwise OR.
enum EContPoll {
    CONT_POLL_READ = 1 << 0,
    CONT_POLL_WRITE = 1 << 1,
    CONT_POLL_RDHUP = 1 << 2,
    CONT_POLL_ONE_SHOT = 1 << 3,         // Disable after first event
    CONT_POLL_MODIFY = 1 << 4,           // Modify already added event
    CONT_POLL_EDGE_TRIGGERED = 1 << 5,   // Notify only about new events
    CONT_POLL_BACKLOG_EMPTY = 1 << 6,    // Backlog is empty (seen end of request, EAGAIN or truncated read)
};
// Returns false only when getsockopt(SO_TYPE) explicitly reports ENOTSOCK.
// Success, or any other failure (e.g. EBADF), is conservatively treated as
// "this is a socket".
static inline bool IsSocket(SOCKET fd) noexcept {
    int sockType = 0;
    socklen_t optLen = sizeof(sockType);
    const bool queried = getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&sockType, &optLen) == 0;
    // Short-circuit: the error code is only consulted when the query failed.
    return queried || LastSystemError() != ENOTSOCK;
}
// Converts a timeout in microseconds to milliseconds for epoll_wait().
// Zero stays zero (non-blocking poll). Any non-zero value becomes at least 1 ms:
//  1. epoll_wait() only accepts millisecond precision;
//  2. blocking syscalls can hardly honour resolutions finer than the kernel
//     tick (1/HZ) anyway.
// Without this rounding we would burn CPU on a stream of epoll_wait(..., 0) calls.
static inline int MicroToMilli(int timeout) noexcept {
    if (!timeout) {
        return 0;
    }
    return Max(timeout / 1000, 1);
}
// Lock policy whose mutex type is TFakeMutex; intended for pollers that are not
// shared between threads (NOTE(review): TFakeMutex presumably performs no locking).
struct TWithoutLocking {
    typedef TFakeMutex TMyMutex;
};
#if defined(HAVE_KQUEUE_POLLER)
// kevent() wrapper that transparently restarts the call when it is interrupted
// by a signal (EINTR). Any other result, success or failure, is returned as is.
static inline int Kevent(int kq, struct kevent* changelist, int nchanges,
                         struct kevent* eventlist, int nevents, const struct timespec* timeout) noexcept {
    for (;;) {
        const int rc = kevent(kq, changelist, nchanges, eventlist, nevents, timeout);
        if (rc != -1 || errno != EINTR) {
            return rc;
        }
    }
}
// kqueue-based poller backend (FreeBSD / Darwin). Timeouts passed to Wait() are
// in microseconds. TLockPolicy is accepted for interface parity with the other
// backends but is not used here.
template <class TLockPolicy>
class TKqueuePoller {
public:
    typedef struct ::kevent TEvent;

    // Throws TSystemError if kqueue() fails.
    inline TKqueuePoller()
        : Fd_(kqueue())
    {
        if (Fd_ == -1) {
            ythrow TSystemError() << "kqueue failed";
        }
    }

    // The poller owns the kqueue descriptor and closes it in the destructor;
    // a copy would close the same descriptor twice, so copying is disabled.
    TKqueuePoller(const TKqueuePoller&) = delete;
    TKqueuePoller& operator=(const TKqueuePoller&) = delete;

    inline ~TKqueuePoller() {
        close(Fd_);
    }

    inline int Fd() const noexcept {
        return Fd_;
    }

    // Registers `fd` with both EVFILT_READ and EVFILT_WRITE; the filter not
    // requested in `what` is added disabled. `data` is stored as udata and handed
    // back by ExtractEvent(). Throws TSystemError if kevent() fails.
    inline void SetImpl(void* data, int fd, int what) {
        TEvent e[2];
        int flags = EV_ADD;

        if (what & CONT_POLL_EDGE_TRIGGERED) {
            if (what & CONT_POLL_BACKLOG_EMPTY) {
                // When backlog is empty, edge-triggered does not need restart.
                return;
            }
            flags |= EV_CLEAR;
        }

        if (what & CONT_POLL_ONE_SHOT) {
            flags |= EV_ONESHOT;
        }

        Zero(e);

        EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
        EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data);

        if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) {
            ythrow TSystemError() << "kevent add failed";
        }
    }

    // Deletes both filters for `fd`. ENOENT (filter not registered) is tolerated;
    // any other kevent() failure aborts via Y_VERIFY.
    inline void Remove(int fd) noexcept {
        TEvent e[2];

        Zero(e);

        EV_SET(e + 0, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
        EV_SET(e + 1, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);

        Y_VERIFY(!(Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1 && errno != ENOENT), "kevent remove failed: %s", LastSystemErrorText());
    }

    // Waits up to `timeout` microseconds for at most `len` events and returns
    // how many were stored into `events`. Aborts via Y_VERIFY on kevent() failure.
    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
        struct timespec ts;

        ts.tv_sec = timeout / 1000000;
        ts.tv_nsec = (timeout % 1000000) * 1000;

        const int ret = Kevent(Fd_, nullptr, 0, events, len, &ts);

        Y_VERIFY(ret >= 0, "kevent failed: %s", LastSystemErrorText());

        return (size_t)ret;
    }

    static inline void* ExtractEvent(const TEvent* event) noexcept {
        return event->udata;
    }

    // EV_ERROR maps to EIO; otherwise returns fflags (per kevent(2), for socket
    // filters with EV_EOF set this carries the pending socket error, else 0).
    static inline int ExtractStatus(const TEvent* event) noexcept {
        if (event->flags & EV_ERROR) {
            return EIO;
        }

        return event->fflags;
    }

    // Maps the kqueue filter back to CONT_POLL_* bits. The EV_EOF branch is a
    // fallback for filters other than READ/WRITE.
    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
        if (event->filter == EVFILT_READ) {
            return CONT_POLL_READ;
        }

        if (event->filter == EVFILT_WRITE) {
            return CONT_POLL_WRITE;
        }

        if (event->flags & EV_EOF) {
            return CONT_POLL_READ | CONT_POLL_WRITE;
        }

        return 0;
    }

private:
    int Fd_;
};
#endif
#if defined(HAVE_EPOLL_POLLER)
// epoll_wait() wrapper: restarts on EINTR and never waits longer than 35 minutes
// per call. NOTE(review): the 35-minute cap presumably works around kernels that
// mishandle very large millisecond timeouts — confirm.
static inline int ContEpollWait(int epfd, struct epoll_event* events, int maxevents, int timeout) noexcept {
    const int cappedTimeout = Min<int>(timeout, 35 * 60 * 1000);
    for (;;) {
        const int rc = epoll_wait(epfd, events, maxevents, cappedTimeout);
        if (rc != -1 || errno != EINTR) {
            return rc;
        }
    }
}
// epoll-based poller backend (Linux, Android API >= 21). Timeouts passed to Wait()
// are in microseconds. TLockPolicy is accepted for interface parity with the other
// backends but is not used here.
template <class TLockPolicy>
class TEpollPoller {
public:
    typedef struct ::epoll_event TEvent;

    // Throws TSystemError if epoll_create1() fails.
    inline TEpollPoller(bool closeOnExec = false)
        : Fd_(epoll_create1(closeOnExec ? EPOLL_CLOEXEC : 0))
    {
        if (Fd_ == -1) {
            ythrow TSystemError() << "epoll_create failed";
        }
    }

    // The poller owns the epoll descriptor and closes it in the destructor;
    // a copy would close the same descriptor twice, so copying is disabled.
    TEpollPoller(const TEpollPoller&) = delete;
    TEpollPoller& operator=(const TEpollPoller&) = delete;

    inline ~TEpollPoller() {
        close(Fd_);
    }

    inline int Fd() const noexcept {
        return Fd_;
    }

    // Translates CONT_POLL_* bits into epoll flags and registers `fd` with `data`
    // as the cookie. EPOLL_CTL_ADD is tried first unless CONT_POLL_MODIFY is set;
    // if ADD fails (e.g. fd already registered), EPOLL_CTL_MOD is attempted.
    // Throws TSystemError if the final epoll_ctl() call fails.
    inline void SetImpl(void* data, int fd, int what) {
        TEvent e;

        Zero(e);

        if (what & CONT_POLL_EDGE_TRIGGERED) {
            if (what & CONT_POLL_BACKLOG_EMPTY) {
                // When backlog is empty, edge-triggered does not need restart.
                return;
            }
            e.events |= EPOLLET;
        }

        if (what & CONT_POLL_ONE_SHOT) {
            e.events |= EPOLLONESHOT;
        }

        if (what & CONT_POLL_READ) {
            e.events |= EPOLLIN;
        }

        if (what & CONT_POLL_WRITE) {
            e.events |= EPOLLOUT;
        }

        if (what & CONT_POLL_RDHUP) {
            e.events |= EPOLLRDHUP;
        }

        e.data.ptr = data;

        if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) {
            if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) {
                ythrow TSystemError() << "epoll add failed";
            }
        }
    }

    // Best-effort unregistration: the epoll_ctl() result is intentionally ignored.
    inline void Remove(int fd) noexcept {
        TEvent e;

        Zero(e);

        epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &e);
    }

    // Waits up to `timeout` microseconds for at most `len` events and returns how
    // many were stored into `events`. Aborts via Y_VERIFY on epoll_wait() failure.
    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
        if (!len) {
            // epoll_wait() rejects maxevents <= 0 with EINVAL, which would trip the
            // Y_VERIFY below; report "no events" like the other backends do.
            return 0;
        }

        const int ret = ContEpollWait(Fd_, events, len, MicroToMilli(timeout));

        Y_VERIFY(ret >= 0, "epoll wait error: %s", LastSystemErrorText());

        return (size_t)ret;
    }

    static inline void* ExtractEvent(const TEvent* event) noexcept {
        return event->data.ptr;
    }

    // EPOLLERR / EPOLLHUP are reported as EIO; otherwise 0 (no error).
    static inline int ExtractStatus(const TEvent* event) noexcept {
        if (event->events & (EPOLLERR | EPOLLHUP)) {
            return EIO;
        }

        return 0;
    }

    // Maps ready epoll flags back to CONT_POLL_* bits.
    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
        int ret = 0;

        if (event->events & EPOLLIN) {
            ret |= CONT_POLL_READ;
        }

        if (event->events & EPOLLOUT) {
            ret |= CONT_POLL_WRITE;
        }

        if (event->events & EPOLLRDHUP) {
            ret |= CONT_POLL_RDHUP;
        }

        return ret;
    }

private:
    int Fd_;
};
#endif
#if defined(HAVE_SELECT_POLLER)
#include <util/memory/tempbuf.h>
#include <util/generic/hash.h>
#include "pair.h"
// select() wrapper that restarts the call when it is interrupted by a signal
// (EINTR). Any other result, success or failure, is returned unchanged.
static inline int ContSelect(int n, fd_set* r, fd_set* w, fd_set* e, struct timeval* t) noexcept {
    for (;;) {
        const int rc = select(n, r, w, e, t);
        if (rc != -1 || errno != EINTR) {
            return rc;
        }
    }
}
// Non-template building blocks shared by every TSelectPoller instantiation.
struct TSelectPollerNoTemplate {
    // A registration: the user cookie plus a CONT_POLL_* interest mask.
    struct THandle {
        void* Data_;
        int Filter_;

        inline THandle()
            : Data_(nullptr)
            , Filter_(0)
        {
        }

        inline void* Data() const noexcept {
            return Data_;
        }

        inline void Set(void* data, int filter) noexcept {
            Data_ = data;
            Filter_ = filter;
        }

        // Removes the given bits from the mask.
        inline void Clear(int bits) noexcept {
            Filter_ &= ~bits;
        }

        inline int Filter() const noexcept {
            return Filter_;
        }
    };

    // Registered descriptors keyed by socket.
    class TFds: public THashMap<SOCKET, THandle> {
    public:
        inline void Set(SOCKET fd, void* data, int filter) {
            THandle& slot = (*this)[fd];
            slot.Set(data, filter);
        }

        inline void Remove(SOCKET fd) {
            erase(fd);
        }

        // Adds every registered socket to the select() sets: always to `e`, and to
        // `r` / `w` according to its mask. Returns the largest socket value seen
        // (0 when the map is empty).
        inline SOCKET Build(fd_set* r, fd_set* w, fd_set* e) const noexcept {
            SOCKET maxFd = 0;

            for (const auto& entry : *this) {
                const SOCKET sock = entry.first;
                const int interest = entry.second.Filter();

                FD_SET(sock, e);

                if (interest & CONT_POLL_READ) {
                    FD_SET(sock, r);
                }

                if (interest & CONT_POLL_WRITE) {
                    FD_SET(sock, w);
                }

                if (sock > maxFd) {
                    maxFd = sock;
                }
            }

            return maxFd;
        }
    };

    // A reported event. On success the mask holds the ready CONT_POLL_* bits; on
    // error it holds the negated error code, which Status() turns back positive.
    struct TEvent: public THandle {
        inline int Status() const noexcept {
            const int mask = Filter();
            return mask < 0 ? -mask : 0;
        }

        inline void Error(void* data, int err) noexcept {
            Set(data, -err);
        }

        inline void Success(void* data, int what) noexcept {
            Set(data, what);
        }
    };
};
// select()-based poller backend, available on every platform. Timeouts passed to
// Wait() are in microseconds. Registration changes (SetImpl / Remove) are queued
// under CommandLock_ and applied by the waiting thread at the start of WaitBase().
// A socket pair wakes a blocked select() whenever a change is queued.
template <class TLockPolicy>
class TSelectPoller: public TSelectPollerNoTemplate {
    using TMyMutex = typename TLockPolicy::TMyMutex;

public:
    inline TSelectPoller()
        : Begin_(nullptr)
        , End_(nullptr)
    {
        SocketPair(Signal_);
        SetNonBlock(WaitSock());
        SetNonBlock(SigSock());
    }

    inline ~TSelectPoller() {
        closesocket(Signal_[0]);
        closesocket(Signal_[1]);
    }

    // Queues a registration of `fd` with interest `what` and cookie `data`.
    inline void SetImpl(void* data, SOCKET fd, int what) {
        with_lock (CommandLock_) {
            Commands_.push_back(TCommand(fd, what, data));
        }

        Signal();
    }

    // Queues removal of `fd` (a command with Filter_ == 0).
    inline void Remove(SOCKET fd) noexcept {
        with_lock (CommandLock_) {
            Commands_.push_back(TCommand(fd, 0));
        }

        Signal();
    }

    // Returns at most `len` events. When the caller's buffer is smaller than
    // EventNumberHint(), events are collected into an internal buffer and handed
    // out across successive calls, so none are lost.
    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
        auto guard = Guard(Lock_);

        do {
            if (Begin_ != End_) {
                const size_t ret = Min<size_t>(End_ - Begin_, len);

                memcpy(events, Begin_, sizeof(*events) * ret);
                Begin_ += ret;

                return ret;
            }

            if (len >= EventNumberHint()) {
                return WaitBase(events, len, timeout);
            }

            Begin_ = SavedEvents();
            End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout);
        } while (Begin_ != End_);

        return 0;
    }

    // Lazily allocates the internal buffer of EventNumberHint() events.
    inline TEvent* SavedEvents() {
        if (!SavedEvents_) {
            SavedEvents_.Reset(new TEvent[EventNumberHint()]);
        }

        return SavedEvents_.Get();
    }

    // Applies queued commands, runs one select() and converts the result into
    // events. `len` must be large enough for every ready descriptor (checked with
    // Y_VERIFY).
    inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept {
        with_lock (CommandLock_) {
            for (auto command = Commands_.begin(); command != Commands_.end(); ++command) {
                if (command->Filter_ != 0) {
                    Fds_.Set(command->Fd_, command->Cookie_, command->Filter_);
                } else {
                    Fds_.Remove(command->Fd_);
                }
            }

            Commands_.clear();
        }

        // One scratch buffer holds the three fd_sets followed by room for the
        // list of one-shot keys to erase after the scan.
        TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET));

        fd_set* in = (fd_set*)tmpBuf.Data();
        fd_set* out = &in[1];
        fd_set* errFds = &in[2];

        SOCKET* keysToDeleteBegin = (SOCKET*)&in[3];
        SOCKET* keysToDeleteEnd = keysToDeleteBegin;

#if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347
        memset(in, 0, sizeof(*in));
        memset(out, 0, sizeof(*out));
        memset(errFds, 0, sizeof(*errFds));
#endif

        FD_ZERO(in);
        FD_ZERO(out);
        FD_ZERO(errFds);

        FD_SET(WaitSock(), in);

        const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock());
        struct timeval tout;

        tout.tv_sec = timeout / 1000000;
        tout.tv_usec = timeout % 1000000;

        int ret = ContSelect(int(maxFdNum + 1), in, out, errFds, &tout);

        // The wakeup socket is internal: drain it and do not count it as an event.
        if (ret > 0 && FD_ISSET(WaitSock(), in)) {
            --ret;
            TryWait();
        }

        Y_VERIFY(ret >= 0 && (size_t)ret <= len, "select error: %s", LastSystemErrorText());

        TEvent* eventsStart = events;

        for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) {
            const SOCKET fd = it->first;
            THandle& handle = it->second;

            if (FD_ISSET(fd, errFds)) {
                (events++)->Error(handle.Data(), EIO);

                if (handle.Filter() & CONT_POLL_ONE_SHOT) {
                    *keysToDeleteEnd = fd;
                    ++keysToDeleteEnd;
                }

            } else {
                int what = 0;

                if (FD_ISSET(fd, in)) {
                    what |= CONT_POLL_READ;
                }

                if (FD_ISSET(fd, out)) {
                    what |= CONT_POLL_WRITE;
                }

                if (what) {
                    (events++)->Success(handle.Data(), what);

                    if (handle.Filter() & CONT_POLL_ONE_SHOT) {
                        *keysToDeleteEnd = fd;
                        ++keysToDeleteEnd;
                    }

                    if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) {
                        // Emulate edge-triggered for level-triggered select().
                        // User must restart waiting this event when needed.
                        handle.Clear(what);
                    }
                }
            }
        }

        // Erase one-shot registrations only after the scan, so the map is not
        // modified while it is being iterated.
        while (keysToDeleteBegin != keysToDeleteEnd) {
            Fds_.erase(*keysToDeleteBegin);

            ++keysToDeleteBegin;
        }

        return events - eventsStart;
    }

    // Capacity of the internal event buffer: twice the number of bits in an fd_set.
    inline size_t EventNumberHint() const noexcept {
        return sizeof(fd_set) * 8 * 2;
    }

    static inline void* ExtractEvent(const TEvent* event) noexcept {
        return event->Data();
    }

    static inline int ExtractStatus(const TEvent* event) noexcept {
        return event->Status();
    }

    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
        return event->Filter();
    }

private:
    // Writes one byte to the signalling end to wake up a blocked select().
    inline void Signal() noexcept {
        char ch = 13;

        send(SigSock(), &ch, 1, 0);
    }

    // Drains all pending wakeup bytes from the (non-blocking) waiting end.
    inline void TryWait() {
        char ch[32];

        while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) {
            Y_ASSERT(ch[0] == 13);
        }
    }

    inline SOCKET WaitSock() const noexcept {
        return Signal_[1];
    }

    inline SOCKET SigSock() const noexcept {
        return Signal_[0];
    }

private:
    struct TCommand {
        SOCKET Fd_;
        int Filter_; // 0 to remove
        void* Cookie_;

        TCommand(SOCKET fd, int filter, void* cookie)
            : Fd_(fd)
            , Filter_(filter)
            , Cookie_(cookie)
        {
        }

        // Removal command. No cookie is needed, but the member is still initialized
        // so that copying the command (e.g. in push_back) never reads an
        // indeterminate pointer value.
        TCommand(SOCKET fd, int filter)
            : Fd_(fd)
            , Filter_(filter)
            , Cookie_(nullptr)
        {
        }
    };

    TFds Fds_;

    TMyMutex Lock_;
    TArrayHolder<TEvent> SavedEvents_;
    TEvent* Begin_;
    TEvent* End_;

    TMyMutex CommandLock_;
    TVector<TCommand> Commands_;

    SOCKET Signal_[2];
};
#endif
// Length of the next wait towards `deadLine`: zero once the deadline has passed,
// otherwise the remaining time capped at 1000 seconds (1e9 microseconds, which
// still fits the int that WaitD() in this file casts it to).
static inline TDuration PollStep(const TInstant& deadLine, const TInstant& now) noexcept {
    return deadLine < now
        ? TDuration::Zero()
        : Min(deadLine - now, TDuration::Seconds(1000));
}
// Backend-independent facade over a poller implementation (TKqueuePoller,
// TEpollPoller or TSelectPoller) that adds the common convenience API.
template <class TBase>
class TGenericPoller: public TBase {
public:
    using TBase::TBase;
    using TEvent = typename TBase::TEvent;

    // Registers `fd` with interest `what`; a zero `what` unregisters it instead.
    inline void Set(void* data, SOCKET fd, int what) {
        if (!what) {
            this->Remove(fd);
            return;
        }

        this->SetImpl(data, fd, what);
    }

    // Readiness bits of an event; any non-zero error status reports every
    // direction as ready so that the owner wakes up and observes the error.
    static inline int ExtractFilter(const TEvent* event) noexcept {
        return TBase::ExtractStatus(event)
            ? CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP
            : TBase::ExtractFilterImpl(event);
    }

    // Waits until at least one event arrives or `deadLine` passes, retrying empty
    // waits. Returns the number of events stored into `events` (0 on timeout or
    // when `len` is 0).
    inline size_t WaitD(TEvent* events, size_t len, TInstant deadLine, TInstant now = TInstant::Now()) noexcept {
        if (len == 0) {
            return 0;
        }

        for (;;) {
            const size_t got = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds());
            if (got) {
                return got;
            }

            now = TInstant::Now();
            if (!(now < deadLine)) {
                return 0;
            }
        }
    }
};
// Default poller for this platform, by preference: kqueue (FreeBSD / Darwin),
// then epoll (Linux without bionic, or Android API >= 21), then select().
#if defined(HAVE_KQUEUE_POLLER)
template <class TLockPolicy>
using TPollerImpl = TGenericPoller<TKqueuePoller<TLockPolicy>>;
#elif defined(HAVE_EPOLL_POLLER)
template <class TLockPolicy>
using TPollerImpl = TGenericPoller<TEpollPoller<TLockPolicy>>;
#elif defined(HAVE_SELECT_POLLER)
template <class TLockPolicy>
using TPollerImpl = TGenericPoller<TSelectPoller<TLockPolicy>>;
#else
#error "unsupported platform"
#endif