diff options
author | khlebnikov <khlebnikov@yandex-team.ru> | 2022-02-10 16:50:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:08 +0300 |
commit | 6cffcf9a14a1dd07278bd534c7cca706ec2827b3 (patch) | |
tree | 48eb57e1d9fd00d624ca68bb3418c3c041d1b096 /util/network | |
parent | 1977f1c7bcb225f59f789f5f8735e03eb0c87e1c (diff) | |
download | ydb-6cffcf9a14a1dd07278bd534c7cca706ec2827b3.tar.gz |
Restoring authorship annotation for <khlebnikov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util/network')
-rw-r--r-- | util/network/poller.cpp | 12 | ||||
-rw-r--r-- | util/network/poller.h | 6 | ||||
-rw-r--r-- | util/network/poller_ut.cpp | 218 | ||||
-rw-r--r-- | util/network/pollerimpl.h | 98 |
4 files changed, 167 insertions, 167 deletions
diff --git a/util/network/poller.cpp b/util/network/poller.cpp index 7954d0e8b5..470fc1d1c2 100644 --- a/util/network/poller.cpp +++ b/util/network/poller.cpp @@ -69,14 +69,14 @@ void TSocketPoller::WaitReadWriteOneShot(SOCKET sock, void* cookie) { Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_ONE_SHOT); } -void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) { +void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) { Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_EDGE_TRIGGERED); -} - -void TSocketPoller::RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty) { +} + +void TSocketPoller::RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty) { Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_MODIFY | CONT_POLL_EDGE_TRIGGERED | (empty ? CONT_POLL_BACKLOG_EMPTY : 0)); -} - +} + void TSocketPoller::Unwait(SOCKET sock) { Impl_->Remove(sock); } diff --git a/util/network/poller.h b/util/network/poller.h index 8dccd73140..5a16010676 100644 --- a/util/network/poller.h +++ b/util/network/poller.h @@ -19,9 +19,9 @@ public: void WaitWriteOneShot(SOCKET sock, void* cookie); void WaitReadWriteOneShot(SOCKET sock, void* cookie); - void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie); - void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true); - + void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie); + void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true); + void Unwait(SOCKET sock); size_t WaitD(void** events, size_t len, const TInstant& deadLine); diff --git a/util/network/poller_ut.cpp b/util/network/poller_ut.cpp index 6df0dda8ec..9a2370aff8 100644 --- a/util/network/poller_ut.cpp +++ b/util/network/poller_ut.cpp @@ -1,5 +1,5 @@ #include <library/cpp/testing/unittest/registar.h> -#include <util/system/error.h> +#include <util/system/error.h> #include "pair.h" #include "poller.h" @@ -100,115 +100,115 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) { poller.WaitRead(s1, nullptr); poller.WaitWrite(s1, nullptr); } - - Y_UNIT_TEST(TestSimpleEdgeTriggered) { - SOCKET sockets[2]; - UNIT_ASSERT(SocketPair(sockets) == 0); - - TSocketHolder s1(sockets[0]); - TSocketHolder s2(sockets[1]); - - SetNonBlock(sockets[1]); - - TSocketPoller poller; - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - for (ui32 i = 0; i < 3; ++i) { - poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17); - - // notify about writeble - UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - char buf[2]; - - buf[0] = i + 10; - buf[1] = i + 20; - - // send one byte - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - - UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // restart without reading - poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false); - - // after restart read and write might generate separate events - { + + Y_UNIT_TEST(TestSimpleEdgeTriggered) { + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + SetNonBlock(sockets[1]); + + TSocketPoller poller; + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + for (ui32 i = 0; i < 3; ++i) { + poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17); + + // notify about writeble + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + char buf[2]; + + buf[0] = i + 10; + buf[1] = i + 20; + + // send one byte + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // restart without reading + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false); + + // after restart read and write might generate separate events + { void* events[3]; - size_t count = poller.WaitT(events, 3, TDuration::Zero()); - UNIT_ASSERT_GE(count, 1); - UNIT_ASSERT_LE(count, 2); - UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17); - } - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // second two more bytes - UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0)); - - // here poller could notify or not because we haven't seen end - Y_UNUSED(poller.WaitT(TDuration::Zero())); - - // recv one, leave two - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); - - // nothing new - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // recv the rest - UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); - UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]); - - // still nothing new - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // hit end - ClearLastSystemError(); - UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0)); - UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError()); - - // restart after end (noop for epoll) - poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); - - // send and recv byte - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - - UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // recv and see end - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - // the same but send before restart - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - - // restart after end (noop for epoll) - poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); - - UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - poller.Unwait(sockets[1]); - } - } + size_t count = poller.WaitT(events, 3, TDuration::Zero()); + UNIT_ASSERT_GE(count, 1); + UNIT_ASSERT_LE(count, 2); + UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17); + } + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // second two more bytes + UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0)); + + // here poller could notify or not because we haven't seen end + Y_UNUSED(poller.WaitT(TDuration::Zero())); + + // recv one, leave two + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + // nothing new + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // recv the rest + UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]); + + // still nothing new + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // hit end + ClearLastSystemError(); + UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError()); + + // restart after end (noop for epoll) + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); + + // send and recv byte + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // recv and see end + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // the same but send before restart + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + // restart after end (noop for epoll) + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + poller.Unwait(sockets[1]); + } + } #if defined(HAVE_EPOLL_POLLER) Y_UNIT_TEST(TestRdhup) { diff --git a/util/network/pollerimpl.h b/util/network/pollerimpl.h index e8c7e40fba..2c43c09826 100644 --- a/util/network/pollerimpl.h +++ b/util/network/pollerimpl.h @@ -107,24 +107,24 @@ public: inline void SetImpl(void* data, int fd, int what) { TEvent e[2]; - int flags = EV_ADD; - - if (what & CONT_POLL_EDGE_TRIGGERED) { - if (what & CONT_POLL_BACKLOG_EMPTY) { - // When backlog is empty, edge-triggered does not need restart. - return; - } - flags |= EV_CLEAR; - } - - if (what & CONT_POLL_ONE_SHOT) { - flags |= EV_ONESHOT; - } - + int flags = EV_ADD; + + if (what & CONT_POLL_EDGE_TRIGGERED) { + if (what & CONT_POLL_BACKLOG_EMPTY) { + // When backlog is empty, edge-triggered does not need restart. + return; + } + flags |= EV_CLEAR; + } + + if (what & CONT_POLL_ONE_SHOT) { + flags |= EV_ONESHOT; + } + Zero(e); - EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data); - EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data); + EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data); + EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data); if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) { ythrow TSystemError() << "kevent add failed"; @@ -225,33 +225,33 @@ public: Zero(e); - if (what & CONT_POLL_EDGE_TRIGGERED) { - if (what & CONT_POLL_BACKLOG_EMPTY) { - // When backlog is empty, edge-triggered does not need restart. - return; - } - e.events |= EPOLLET; - } - - if (what & CONT_POLL_ONE_SHOT) { - e.events |= EPOLLONESHOT; - } - - if (what & CONT_POLL_READ) { - e.events |= EPOLLIN; - } - - if (what & CONT_POLL_WRITE) { - e.events |= EPOLLOUT; - } - + if (what & CONT_POLL_EDGE_TRIGGERED) { + if (what & CONT_POLL_BACKLOG_EMPTY) { + // When backlog is empty, edge-triggered does not need restart. + return; + } + e.events |= EPOLLET; + } + + if (what & CONT_POLL_ONE_SHOT) { + e.events |= EPOLLONESHOT; + } + + if (what & CONT_POLL_READ) { + e.events |= EPOLLIN; + } + + if (what & CONT_POLL_WRITE) { + e.events |= EPOLLOUT; + } + if (what & CONT_POLL_RDHUP) { e.events |= EPOLLRDHUP; } e.data.ptr = data; - if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) { + if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) { if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) { ythrow TSystemError() << "epoll add failed"; } @@ -345,10 +345,10 @@ struct TSelectPollerNoTemplate { Filter_ = s; } - inline void Clear(int c) noexcept { - Filter_ &= ~c; - } - + inline void Clear(int c) noexcept { + Filter_ &= ~c; + } + inline int Filter() const noexcept { return Filter_; } @@ -523,9 +523,9 @@ public: TEvent* eventsStart = events; - for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) { + for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) { const SOCKET fd = it->first; - THandle& handle = it->second; + THandle& handle = it->second; if (FD_ISSET(fd, errFds)) { (events++)->Error(handle.Data(), EIO); @@ -553,12 +553,12 @@ public: *keysToDeleteEnd = fd; ++keysToDeleteEnd; } - - if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) { - // Emulate edge-triggered for level-triggered select(). - // User must restart waiting this event when needed. - handle.Clear(what); - } + + if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) { + // Emulate edge-triggered for level-triggered select(). + // User must restart waiting this event when needed. + handle.Clear(what); + } } } } |