aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine/engine/poller.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/coroutine/engine/poller.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/coroutine/engine/poller.cpp')
-rw-r--r--library/cpp/coroutine/engine/poller.cpp538
1 files changed, 269 insertions, 269 deletions
diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp
index 61164fa56b..6fd1f673ae 100644
--- a/library/cpp/coroutine/engine/poller.cpp
+++ b/library/cpp/coroutine/engine/poller.cpp
@@ -1,118 +1,118 @@
-#include "poller.h"
-#include "sockmap.h"
-
-#include <util/memory/smallobj.h>
-#include <util/generic/intrlist.h>
-#include <util/generic/singleton.h>
+#include "poller.h"
+#include "sockmap.h"
+
+#include <util/memory/smallobj.h>
+#include <util/generic/intrlist.h>
+#include <util/generic/singleton.h>
#include <util/system/env.h>
#include <util/string/cast.h>
-
-namespace {
+
+namespace {
using TChange = IPollerFace::TChange;
using TEvent = IPollerFace::TEvent;
using TEvents = IPollerFace::TEvents;
-
- template <class T>
- class TUnsafeBuf {
- public:
+
+ template <class T>
+ class TUnsafeBuf {
+ public:
TUnsafeBuf() noexcept
- : L_(0)
- {
- }
-
+ : L_(0)
+ {
+ }
+
T* operator~() const noexcept {
- return B_.Get();
- }
-
+ return B_.Get();
+ }
+
size_t operator+() const noexcept {
- return L_;
- }
-
+ return L_;
+ }
+
void Reserve(size_t len) {
- len = FastClp2(len);
-
- if (len > L_) {
- B_.Reset(new T[len]);
- L_ = len;
- }
- }
-
- private:
- TArrayHolder<T> B_;
- size_t L_;
- };
-
-
- template <class T>
- class TVirtualize: public IPollerFace {
- public:
+ len = FastClp2(len);
+
+ if (len > L_) {
+ B_.Reset(new T[len]);
+ L_ = len;
+ }
+ }
+
+ private:
+ TArrayHolder<T> B_;
+ size_t L_;
+ };
+
+
+ template <class T>
+ class TVirtualize: public IPollerFace {
+ public:
TVirtualize(EContPoller pollerEngine)
: PollerEngine_(pollerEngine)
{
}
void Set(const TChange& c) override {
- P_.Set(c);
- }
-
+ P_.Set(c);
+ }
+
void Wait(TEvents& events, TInstant deadLine) override {
- P_.Wait(events, deadLine);
- }
-
+ P_.Wait(events, deadLine);
+ }
+
EContPoller PollEngine() const override {
return PollerEngine_;
}
- private:
- T P_;
+ private:
+ T P_;
const EContPoller PollerEngine_;
- };
-
+ };
+
- template <class T>
- class TPoller {
+ template <class T>
+ class TPoller {
using TInternalEvent = typename T::TEvent;
-
- public:
+
+ public:
TPoller() {
- E_.Reserve(1);
- }
-
+ E_.Reserve(1);
+ }
+
void Set(const TChange& c) {
- P_.Set(c.Data, c.Fd, c.Flags);
- }
-
+ P_.Set(c.Data, c.Fd, c.Flags);
+ }
+
void Reserve(size_t size) {
E_.Reserve(size);
}
void Wait(TEvents& events, TInstant deadLine) {
- const size_t ret = P_.WaitD(~E_, +E_, deadLine);
-
+ const size_t ret = P_.WaitD(~E_, +E_, deadLine);
+
events.reserve(ret);
- for (size_t i = 0; i < ret; ++i) {
- const TInternalEvent* ie = ~E_ + i;
-
- const TEvent e = {
- T::ExtractEvent(ie),
- T::ExtractStatus(ie),
- (ui16)T::ExtractFilter(ie),
- };
-
- events.push_back(e);
- }
-
- E_.Reserve(ret + 1);
- }
-
- private:
- T P_;
- TUnsafeBuf<TInternalEvent> E_;
- };
-
-
- template <class T>
- class TIndexedArray {
+ for (size_t i = 0; i < ret; ++i) {
+ const TInternalEvent* ie = ~E_ + i;
+
+ const TEvent e = {
+ T::ExtractEvent(ie),
+ T::ExtractStatus(ie),
+ (ui16)T::ExtractFilter(ie),
+ };
+
+ events.push_back(e);
+ }
+
+ E_.Reserve(ret + 1);
+ }
+
+ private:
+ T P_;
+ TUnsafeBuf<TInternalEvent> E_;
+ };
+
+
+ template <class T>
+ class TIndexedArray {
struct TVal:
public T,
public TIntrusiveListItem<TVal>,
@@ -125,244 +125,244 @@ namespace {
// zero-initialization takes place in TVal() expression and the
// data is overwritten.
TVal() {
- }
- };
-
+ }
+ };
+
typedef TIntrusiveList<TVal> TListType;
-
- public:
+
+ public:
typedef typename TListType::TIterator TIterator;
typedef typename TListType::TConstIterator TConstIterator;
-
+
TIndexedArray()
- : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
- {
- }
-
+ : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
+ {
+ }
+
TIterator Begin() noexcept {
- return I_.Begin();
- }
-
+ return I_.Begin();
+ }
+
TIterator End() noexcept {
- return I_.End();
- }
-
+ return I_.End();
+ }
+
TConstIterator Begin() const noexcept {
- return I_.Begin();
- }
-
+ return I_.Begin();
+ }
+
TConstIterator End() const noexcept {
- return I_.End();
- }
-
+ return I_.End();
+ }
+
T& operator[](size_t i) {
- return *Get(i);
- }
-
+ return *Get(i);
+ }
+
T* Get(size_t i) {
- TValRef& v = V_.Get(i);
-
+ TValRef& v = V_.Get(i);
+
if (Y_UNLIKELY(!v)) {
- v.Reset(new (&P_) TVal());
- I_.PushFront(v.Get());
- }
-
- Y_PREFETCH_WRITE(v.Get(), 1);
-
- return v.Get();
- }
-
+ v.Reset(new (&P_) TVal());
+ I_.PushFront(v.Get());
+ }
+
+ Y_PREFETCH_WRITE(v.Get(), 1);
+
+ return v.Get();
+ }
+
void Erase(size_t i) noexcept {
- V_.Get(i).Destroy();
- }
-
+ V_.Get(i).Destroy();
+ }
+
size_t Size() const noexcept {
- return I_.Size();
- }
-
- private:
+ return I_.Size();
+ }
+
+ private:
using TValRef = THolder<TVal>;
- typename TVal::TPool P_;
- TSocketMap<TValRef> V_;
+ typename TVal::TPool P_;
+ TSocketMap<TValRef> V_;
TListType I_;
- };
-
+ };
+
inline short PollFlags(ui16 flags) noexcept {
- short ret = 0;
-
- if (flags & CONT_POLL_READ) {
- ret |= POLLIN;
- }
-
- if (flags & CONT_POLL_WRITE) {
- ret |= POLLOUT;
- }
-
+ short ret = 0;
+
+ if (flags & CONT_POLL_READ) {
+ ret |= POLLIN;
+ }
+
+ if (flags & CONT_POLL_WRITE) {
+ ret |= POLLOUT;
+ }
+
#if defined(_linux_)
if (flags & CONT_POLL_RDHUP) {
ret |= POLLRDHUP;
}
#endif
- return ret;
- }
-
+ return ret;
+ }
+
- class TPollPoller {
- public:
+ class TPollPoller {
+ public:
size_t Size() const noexcept {
- return S_.Size();
- }
-
- template <class T>
+ return S_.Size();
+ }
+
+ template <class T>
void Build(T& t) const {
- for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
- t.Set(*it);
- }
+ for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
+ t.Set(*it);
+ }
t.Reserve(Size());
- }
-
+ }
+
void Set(const TChange& c) {
- if (c.Flags) {
- S_[c.Fd] = c;
- } else {
- S_.Erase(c.Fd);
- }
- }
-
+ if (c.Flags) {
+ S_[c.Fd] = c;
+ } else {
+ S_.Erase(c.Fd);
+ }
+ }
+
void Wait(TEvents& events, TInstant deadLine) {
- T_.clear();
- T_.reserve(Size());
-
- for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
- const pollfd pfd = {
- it->Fd,
- PollFlags(it->Flags),
- 0,
- };
-
- T_.push_back(pfd);
- }
-
+ T_.clear();
+ T_.reserve(Size());
+
+ for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
+ const pollfd pfd = {
+ it->Fd,
+ PollFlags(it->Flags),
+ 0,
+ };
+
+ T_.push_back(pfd);
+ }
+
const ssize_t ret = PollD(T_.data(), (nfds_t) T_.size(), deadLine);
-
+
if (ret <= 0) {
- return;
- }
-
+ return;
+ }
+
events.reserve(T_.size());
for (size_t i = 0; i < T_.size(); ++i) {
- const pollfd& pfd = T_[i];
- const short ev = pfd.revents;
-
- if (!ev) {
- continue;
- }
-
- int status = 0;
- ui16 filter = 0;
-
+ const pollfd& pfd = T_[i];
+ const short ev = pfd.revents;
+
+ if (!ev) {
+ continue;
+ }
+
+ int status = 0;
+ ui16 filter = 0;
+
// We are perfectly fine with an EOF while reading a pipe or a unix socket
if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) {
- filter |= CONT_POLL_READ;
- }
-
- if (ev & POLLOUT) {
- filter |= CONT_POLL_WRITE;
- }
-
+ filter |= CONT_POLL_READ;
+ }
+
+ if (ev & POLLOUT) {
+ filter |= CONT_POLL_WRITE;
+ }
+
#if defined(_linux_)
if (ev & POLLRDHUP) {
filter |= CONT_POLL_RDHUP;
}
#endif
- if (ev & POLLERR) {
- status = EIO;
+ if (ev & POLLERR) {
+ status = EIO;
} else if (ev & POLLHUP && pfd.events & POLLOUT) {
// Only write operations may cause EPIPE
- status = EPIPE;
- } else if (ev & POLLNVAL) {
- status = EINVAL;
- }
-
- if (status) {
+ status = EPIPE;
+ } else if (ev & POLLNVAL) {
+ status = EINVAL;
+ }
+
+ if (status) {
filter = CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
- }
-
- const TEvent res = {
- S_[pfd.fd].Data,
- status,
- filter,
- };
-
- events.push_back(res);
- }
- }
-
- private:
- typedef TIndexedArray<TChange> TFds;
- TFds S_;
+ }
+
+ const TEvent res = {
+ S_[pfd.fd].Data,
+ status,
+ filter,
+ };
+
+ events.push_back(res);
+ }
+ }
+
+ private:
+ typedef TIndexedArray<TChange> TFds;
+ TFds S_;
typedef TVector<pollfd> TPollVec;
- TPollVec T_;
- };
-
-
- class TCombinedPoller {
- typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller;
-
- public:
+ TPollVec T_;
+ };
+
+
+ class TCombinedPoller {
+ typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller;
+
+ public:
TCombinedPoller() {
- P_.Reset(new TPollPoller());
- }
-
+ P_.Reset(new TPollPoller());
+ }
+
void Set(const TChange& c) {
- if (!P_) {
- D_->Set(c);
- } else {
- P_->Set(c);
- }
- }
-
+ if (!P_) {
+ D_->Set(c);
+ } else {
+ P_->Set(c);
+ }
+ }
+
void Wait(TEvents& events, TInstant deadLine) {
- if (!P_) {
- D_->Wait(events, deadLine);
- } else {
- if (P_->Size() > 200) {
- D_.Reset(new TDefaultPoller());
- P_->Build(*D_);
- P_.Destroy();
- D_->Wait(events, deadLine);
- } else {
- P_->Wait(events, deadLine);
- }
- }
- }
-
- private:
+ if (!P_) {
+ D_->Wait(events, deadLine);
+ } else {
+ if (P_->Size() > 200) {
+ D_.Reset(new TDefaultPoller());
+ P_->Build(*D_);
+ P_.Destroy();
+ D_->Wait(events, deadLine);
+ } else {
+ P_->Wait(events, deadLine);
+ }
+ }
+ }
+
+ private:
THolder<TPollPoller> P_;
THolder<TDefaultPoller> D_;
- };
-
- struct TUserPoller: public TString {
+ };
+
+ struct TUserPoller: public TString {
TUserPoller()
- : TString(GetEnv("USER_POLLER"))
- {
- }
- };
-}
-
+ : TString(GetEnv("USER_POLLER"))
+ {
+ }
+ };
+}
+
THolder<IPollerFace> IPollerFace::Default() {
return Construct(*SingletonWithPriority<TUserPoller, 0>());
-}
-
+}
+
THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) {
return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default);
}
-
+
THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
switch (poller) {
case EContPoller::Default:
@@ -373,18 +373,18 @@ THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
case EContPoller::Poll:
return MakeHolder<TVirtualize<TPollPoller>>(poller);
case EContPoller::Epoll:
-#if defined(HAVE_EPOLL_POLLER)
+#if defined(HAVE_EPOLL_POLLER)
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller);
#else
return nullptr;
-#endif
+#endif
case EContPoller::Kqueue:
#if defined(HAVE_KQUEUE_POLLER)
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller);
#else
return nullptr;
-#endif
+#endif
default:
Y_FAIL("bad poller type");
- }
-}
+ }
+}