#include "poller.h"
#include "sockmap.h"
#include <util/memory/smallobj.h>
#include <util/generic/intrlist.h>
#include <util/generic/singleton.h>
#include <util/system/env.h>
#include <util/string/cast.h>
namespace {
using TChange = IPollerFace::TChange;
using TEvent = IPollerFace::TEvent;
using TEvents = IPollerFace::TEvents;
template <class T>
class TUnsafeBuf {
public:
TUnsafeBuf() noexcept
: L_(0)
{
}
T* operator~() const noexcept {
return B_.Get();
}
size_t operator+() const noexcept {
return L_;
}
void Reserve(size_t len) {
len = FastClp2(len);
if (len > L_) {
B_.Reset(new T[len]);
L_ = len;
}
}
private:
TArrayHolder<T> B_;
size_t L_;
};
template <class T>
class TVirtualize: public IPollerFace {
public:
TVirtualize(EContPoller pollerEngine)
: PollerEngine_(pollerEngine)
{
}
void Set(const TChange& c) override {
P_.Set(c);
}
void Wait(TEvents& events, TInstant deadLine) override {
P_.Wait(events, deadLine);
}
EContPoller PollEngine() const override {
return PollerEngine_;
}
private:
T P_;
const EContPoller PollerEngine_;
};
template <class T>
class TPoller {
using TInternalEvent = typename T::TEvent;
public:
TPoller() {
E_.Reserve(1);
}
void Set(const TChange& c) {
P_.Set(c.Data, c.Fd, c.Flags);
}
void Reserve(size_t size) {
E_.Reserve(size);
}
void Wait(TEvents& events, TInstant deadLine) {
const size_t ret = P_.WaitD(~E_, +E_, deadLine);
events.reserve(ret);
for (size_t i = 0; i < ret; ++i) {
const TInternalEvent* ie = ~E_ + i;
const TEvent e = {
T::ExtractEvent(ie),
T::ExtractStatus(ie),
(ui16)T::ExtractFilter(ie),
};
events.push_back(e);
}
E_.Reserve(ret + 1);
}
private:
T P_;
TUnsafeBuf<TInternalEvent> E_;
};
template <class T>
class TIndexedArray {
struct TVal:
public T,
public TIntrusiveListItem<TVal>,
public TObjectFromPool<TVal>
{
// NOTE Constructor must be user-defined (and not =default) here
// because TVal objects are created in the UB-capable placement
// TObjectFromPool::new operator that stores data in a memory
// allocated for the object. Without user defined constructor
// zero-initialization takes place in TVal() expression and the
// data is overwritten.
TVal() {
}
};
typedef TIntrusiveList<TVal> TListType;
public:
typedef typename TListType::TIterator TIterator;
typedef typename TListType::TConstIterator TConstIterator;
TIndexedArray()
: P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
{
}
TIterator Begin() noexcept {
return I_.Begin();
}
TIterator End() noexcept {
return I_.End();
}
TConstIterator Begin() const noexcept {
return I_.Begin();
}
TConstIterator End() const noexcept {
return I_.End();
}
T& operator[](size_t i) {
return *Get(i);
}
T* Get(size_t i) {
TValRef& v = V_.Get(i);
if (Y_UNLIKELY(!v)) {
v.Reset(new (&P_) TVal());
I_.PushFront(v.Get());
}
Y_PREFETCH_WRITE(v.Get(), 1);
return v.Get();
}
void Erase(size_t i) noexcept {
V_.Get(i).Destroy();
}
size_t Size() const noexcept {
return I_.Size();
}
private:
using TValRef = THolder<TVal>;
typename TVal::TPool P_;
TSocketMap<TValRef> V_;
TListType I_;
};
inline short PollFlags(ui16 flags) noexcept {
short ret = 0;
if (flags & CONT_POLL_READ) {
ret |= POLLIN;
}
if (flags & CONT_POLL_WRITE) {
ret |= POLLOUT;
}
#if defined(_linux_)
if (flags & CONT_POLL_RDHUP) {
ret |= POLLRDHUP;
}
#endif
return ret;
}
class TPollPoller {
public:
size_t Size() const noexcept {
return S_.Size();
}
template <class T>
void Build(T& t) const {
for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
t.Set(*it);
}
t.Reserve(Size());
}
void Set(const TChange& c) {
if (c.Flags) {
S_[c.Fd] = c;
} else {
S_.Erase(c.Fd);
}
}
void Wait(TEvents& events, TInstant deadLine) {
T_.clear();
T_.reserve(Size());
for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
const pollfd pfd = {
it->Fd,
PollFlags(it->Flags),
0,
};
T_.push_back(pfd);
}
const ssize_t ret = PollD(T_.data(), (nfds_t) T_.size(), deadLine);
if (ret <= 0) {
return;
}
events.reserve(T_.size());
for (size_t i = 0; i < T_.size(); ++i) {
const pollfd& pfd = T_[i];
const short ev = pfd.revents;
if (!ev) {
continue;
}
int status = 0;
ui16 filter = 0;
// We are perfectly fine with an EOF while reading a pipe or a unix socket
if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) {
filter |= CONT_POLL_READ;
}
if (ev & POLLOUT) {
filter |= CONT_POLL_WRITE;
}
#if defined(_linux_)
if (ev & POLLRDHUP) {
filter |= CONT_POLL_RDHUP;
}
#endif
if (ev & POLLERR) {
status = EIO;
} else if (ev & POLLHUP && pfd.events & POLLOUT) {
// Only write operations may cause EPIPE
status = EPIPE;
} else if (ev & POLLNVAL) {
status = EINVAL;
}
if (status) {
filter = CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
}
const TEvent res = {
S_[pfd.fd].Data,
status,
filter,
};
events.push_back(res);
}
}
private:
typedef TIndexedArray<TChange> TFds;
TFds S_;
typedef TVector<pollfd> TPollVec;
TPollVec T_;
};
class TCombinedPoller {
typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller;
public:
TCombinedPoller() {
P_.Reset(new TPollPoller());
}
void Set(const TChange& c) {
if (!P_) {
D_->Set(c);
} else {
P_->Set(c);
}
}
void Wait(TEvents& events, TInstant deadLine) {
if (!P_) {
D_->Wait(events, deadLine);
} else {
if (P_->Size() > 200) {
D_.Reset(new TDefaultPoller());
P_->Build(*D_);
P_.Destroy();
D_->Wait(events, deadLine);
} else {
P_->Wait(events, deadLine);
}
}
}
private:
THolder<TPollPoller> P_;
THolder<TDefaultPoller> D_;
};
struct TUserPoller: public TString {
TUserPoller()
: TString(GetEnv("USER_POLLER"))
{
}
};
}
THolder<IPollerFace> IPollerFace::Default() {
return Construct(*SingletonWithPriority<TUserPoller, 0>());
}
THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) {
return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default);
}
THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
switch (poller) {
case EContPoller::Default:
case EContPoller::Combined:
return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined);
case EContPoller::Select:
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller);
case EContPoller::Poll:
return MakeHolder<TVirtualize<TPollPoller>>(poller);
case EContPoller::Epoll:
#if defined(HAVE_EPOLL_POLLER)
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller);
#else
return nullptr;
#endif
case EContPoller::Kqueue:
#if defined(HAVE_KQUEUE_POLLER)
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller);
#else
return nullptr;
#endif
default:
Y_FAIL("bad poller type");
}
}