diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/coroutine/engine/poller.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/coroutine/engine/poller.cpp')
-rw-r--r-- | library/cpp/coroutine/engine/poller.cpp | 538 |
1 files changed, 269 insertions, 269 deletions
diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp index 61164fa56b..6fd1f673ae 100644 --- a/library/cpp/coroutine/engine/poller.cpp +++ b/library/cpp/coroutine/engine/poller.cpp @@ -1,118 +1,118 @@ -#include "poller.h" -#include "sockmap.h" - -#include <util/memory/smallobj.h> -#include <util/generic/intrlist.h> -#include <util/generic/singleton.h> +#include "poller.h" +#include "sockmap.h" + +#include <util/memory/smallobj.h> +#include <util/generic/intrlist.h> +#include <util/generic/singleton.h> #include <util/system/env.h> #include <util/string/cast.h> - -namespace { + +namespace { using TChange = IPollerFace::TChange; using TEvent = IPollerFace::TEvent; using TEvents = IPollerFace::TEvents; - - template <class T> - class TUnsafeBuf { - public: + + template <class T> + class TUnsafeBuf { + public: TUnsafeBuf() noexcept - : L_(0) - { - } - + : L_(0) + { + } + T* operator~() const noexcept { - return B_.Get(); - } - + return B_.Get(); + } + size_t operator+() const noexcept { - return L_; - } - + return L_; + } + void Reserve(size_t len) { - len = FastClp2(len); - - if (len > L_) { - B_.Reset(new T[len]); - L_ = len; - } - } - - private: - TArrayHolder<T> B_; - size_t L_; - }; - - - template <class T> - class TVirtualize: public IPollerFace { - public: + len = FastClp2(len); + + if (len > L_) { + B_.Reset(new T[len]); + L_ = len; + } + } + + private: + TArrayHolder<T> B_; + size_t L_; + }; + + + template <class T> + class TVirtualize: public IPollerFace { + public: TVirtualize(EContPoller pollerEngine) : PollerEngine_(pollerEngine) { } void Set(const TChange& c) override { - P_.Set(c); - } - + P_.Set(c); + } + void Wait(TEvents& events, TInstant deadLine) override { - P_.Wait(events, deadLine); - } - + P_.Wait(events, deadLine); + } + EContPoller PollEngine() const override { return PollerEngine_; } - private: - T P_; + private: + T P_; const EContPoller PollerEngine_; - }; - + }; + - template <class T> - class TPoller { + template <class T> + class TPoller { using TInternalEvent = typename T::TEvent; - - public: + + public: TPoller() { - E_.Reserve(1); - } - + E_.Reserve(1); + } + void Set(const TChange& c) { - P_.Set(c.Data, c.Fd, c.Flags); - } - + P_.Set(c.Data, c.Fd, c.Flags); + } + void Reserve(size_t size) { E_.Reserve(size); } void Wait(TEvents& events, TInstant deadLine) { - const size_t ret = P_.WaitD(~E_, +E_, deadLine); - + const size_t ret = P_.WaitD(~E_, +E_, deadLine); + events.reserve(ret); - for (size_t i = 0; i < ret; ++i) { - const TInternalEvent* ie = ~E_ + i; - - const TEvent e = { - T::ExtractEvent(ie), - T::ExtractStatus(ie), - (ui16)T::ExtractFilter(ie), - }; - - events.push_back(e); - } - - E_.Reserve(ret + 1); - } - - private: - T P_; - TUnsafeBuf<TInternalEvent> E_; - }; - - - template <class T> - class TIndexedArray { + for (size_t i = 0; i < ret; ++i) { + const TInternalEvent* ie = ~E_ + i; + + const TEvent e = { + T::ExtractEvent(ie), + T::ExtractStatus(ie), + (ui16)T::ExtractFilter(ie), + }; + + events.push_back(e); + } + + E_.Reserve(ret + 1); + } + + private: + T P_; + TUnsafeBuf<TInternalEvent> E_; + }; + + + template <class T> + class TIndexedArray { struct TVal: public T, public TIntrusiveListItem<TVal>, @@ -125,244 +125,244 @@ namespace { // zero-initialization takes place in TVal() expression and the // data is overwritten. TVal() { - } - }; - + } + }; + typedef TIntrusiveList<TVal> TListType; - - public: + + public: typedef typename TListType::TIterator TIterator; typedef typename TListType::TConstIterator TConstIterator; - + TIndexedArray() - : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) - { - } - + : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) + { + } + TIterator Begin() noexcept { - return I_.Begin(); - } - + return I_.Begin(); + } + TIterator End() noexcept { - return I_.End(); - } - + return I_.End(); + } + TConstIterator Begin() const noexcept { - return I_.Begin(); - } - + return I_.Begin(); + } + TConstIterator End() const noexcept { - return I_.End(); - } - + return I_.End(); + } + T& operator[](size_t i) { - return *Get(i); - } - + return *Get(i); + } + T* Get(size_t i) { - TValRef& v = V_.Get(i); - + TValRef& v = V_.Get(i); + if (Y_UNLIKELY(!v)) { - v.Reset(new (&P_) TVal()); - I_.PushFront(v.Get()); - } - - Y_PREFETCH_WRITE(v.Get(), 1); - - return v.Get(); - } - + v.Reset(new (&P_) TVal()); + I_.PushFront(v.Get()); + } + + Y_PREFETCH_WRITE(v.Get(), 1); + + return v.Get(); + } + void Erase(size_t i) noexcept { - V_.Get(i).Destroy(); - } - + V_.Get(i).Destroy(); + } + size_t Size() const noexcept { - return I_.Size(); - } - - private: + return I_.Size(); + } + + private: using TValRef = THolder<TVal>; - typename TVal::TPool P_; - TSocketMap<TValRef> V_; + typename TVal::TPool P_; + TSocketMap<TValRef> V_; TListType I_; - }; - + }; + inline short PollFlags(ui16 flags) noexcept { - short ret = 0; - - if (flags & CONT_POLL_READ) { - ret |= POLLIN; - } - - if (flags & CONT_POLL_WRITE) { - ret |= POLLOUT; - } - + short ret = 0; + + if (flags & CONT_POLL_READ) { + ret |= POLLIN; + } + + if (flags & CONT_POLL_WRITE) { + ret |= POLLOUT; + } + #if defined(_linux_) if (flags & CONT_POLL_RDHUP) { ret |= POLLRDHUP; } #endif - return ret; - } - + return ret; + } + - class TPollPoller { - public: + class TPollPoller { + public: size_t Size() const noexcept { - return S_.Size(); - } - - template <class T> + return S_.Size(); + } + + template <class T> void Build(T& t) const { - for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { - t.Set(*it); - } + for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { + t.Set(*it); + } t.Reserve(Size()); - } - + } + void Set(const TChange& c) { - if (c.Flags) { - S_[c.Fd] = c; - } else { - S_.Erase(c.Fd); - } - } - + if (c.Flags) { + S_[c.Fd] = c; + } else { + S_.Erase(c.Fd); + } + } + void Wait(TEvents& events, TInstant deadLine) { - T_.clear(); - T_.reserve(Size()); - - for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { - const pollfd pfd = { - it->Fd, - PollFlags(it->Flags), - 0, - }; - - T_.push_back(pfd); - } - + T_.clear(); + T_.reserve(Size()); + + for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { + const pollfd pfd = { + it->Fd, + PollFlags(it->Flags), + 0, + }; + + T_.push_back(pfd); + } + const ssize_t ret = PollD(T_.data(), (nfds_t) T_.size(), deadLine); - + if (ret <= 0) { - return; - } - + return; + } + events.reserve(T_.size()); for (size_t i = 0; i < T_.size(); ++i) { - const pollfd& pfd = T_[i]; - const short ev = pfd.revents; - - if (!ev) { - continue; - } - - int status = 0; - ui16 filter = 0; - + const pollfd& pfd = T_[i]; + const short ev = pfd.revents; + + if (!ev) { + continue; + } + + int status = 0; + ui16 filter = 0; + // We are perfectly fine with an EOF while reading a pipe or a unix socket if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) { - filter |= CONT_POLL_READ; - } - - if (ev & POLLOUT) { - filter |= CONT_POLL_WRITE; - } - + filter |= CONT_POLL_READ; + } + + if (ev & POLLOUT) { + filter |= CONT_POLL_WRITE; + } + #if defined(_linux_) if (ev & POLLRDHUP) { filter |= CONT_POLL_RDHUP; } #endif - if (ev & POLLERR) { - status = EIO; + if (ev & POLLERR) { + status = EIO; } else if (ev & POLLHUP && pfd.events & POLLOUT) { // Only write operations may cause EPIPE - status = EPIPE; - } else if (ev & POLLNVAL) { - status = EINVAL; - } - - if (status) { + status = EPIPE; + } else if (ev & POLLNVAL) { + status = EINVAL; + } + + if (status) { filter = CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP; - } - - const TEvent res = { - S_[pfd.fd].Data, - status, - filter, - }; - - events.push_back(res); - } - } - - private: - typedef TIndexedArray<TChange> TFds; - TFds S_; + } + + const TEvent res = { + S_[pfd.fd].Data, + status, + filter, + }; + + events.push_back(res); + } + } + + private: + typedef TIndexedArray<TChange> TFds; + TFds S_; typedef TVector<pollfd> TPollVec; - TPollVec T_; - }; - - - class TCombinedPoller { - typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller; - - public: + TPollVec T_; + }; + + + class TCombinedPoller { + typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller; + + public: TCombinedPoller() { - P_.Reset(new TPollPoller()); - } - + P_.Reset(new TPollPoller()); + } + void Set(const TChange& c) { - if (!P_) { - D_->Set(c); - } else { - P_->Set(c); - } - } - + if (!P_) { + D_->Set(c); + } else { + P_->Set(c); + } + } + void Wait(TEvents& events, TInstant deadLine) { - if (!P_) { - D_->Wait(events, deadLine); - } else { - if (P_->Size() > 200) { - D_.Reset(new TDefaultPoller()); - P_->Build(*D_); - P_.Destroy(); - D_->Wait(events, deadLine); - } else { - P_->Wait(events, deadLine); - } - } - } - - private: + if (!P_) { + D_->Wait(events, deadLine); + } else { + if (P_->Size() > 200) { + D_.Reset(new TDefaultPoller()); + P_->Build(*D_); + P_.Destroy(); + D_->Wait(events, deadLine); + } else { + P_->Wait(events, deadLine); + } + } + } + + private: THolder<TPollPoller> P_; THolder<TDefaultPoller> D_; - }; - - struct TUserPoller: public TString { + }; + + struct TUserPoller: public TString { TUserPoller() - : TString(GetEnv("USER_POLLER")) - { - } - }; -} - + : TString(GetEnv("USER_POLLER")) + { + } + }; +} + THolder<IPollerFace> IPollerFace::Default() { return Construct(*SingletonWithPriority<TUserPoller, 0>()); -} - +} + THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) { return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default); } - + THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { switch (poller) { case EContPoller::Default: @@ -373,18 +373,18 @@ THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { case EContPoller::Poll: return MakeHolder<TVirtualize<TPollPoller>>(poller); case EContPoller::Epoll: -#if defined(HAVE_EPOLL_POLLER) +#if defined(HAVE_EPOLL_POLLER) return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller); #else return nullptr; -#endif +#endif case EContPoller::Kqueue: #if defined(HAVE_KQUEUE_POLLER) return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller); #else return nullptr; -#endif +#endif default: Y_FAIL("bad poller type"); - } -} + } +} |