aboutsummaryrefslogtreecommitdiffstats
path: root/util/network
diff options
context:
space:
mode:
authorkhlebnikov <khlebnikov@yandex-team.ru>2022-02-10 16:50:08 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:08 +0300
commit6cffcf9a14a1dd07278bd534c7cca706ec2827b3 (patch)
tree48eb57e1d9fd00d624ca68bb3418c3c041d1b096 /util/network
parent1977f1c7bcb225f59f789f5f8735e03eb0c87e1c (diff)
downloadydb-6cffcf9a14a1dd07278bd534c7cca706ec2827b3.tar.gz
Restoring authorship annotation for <khlebnikov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util/network')
-rw-r--r--util/network/poller.cpp12
-rw-r--r--util/network/poller.h6
-rw-r--r--util/network/poller_ut.cpp218
-rw-r--r--util/network/pollerimpl.h98
4 files changed, 167 insertions, 167 deletions
diff --git a/util/network/poller.cpp b/util/network/poller.cpp
index 7954d0e8b5..470fc1d1c2 100644
--- a/util/network/poller.cpp
+++ b/util/network/poller.cpp
@@ -69,14 +69,14 @@ void TSocketPoller::WaitReadWriteOneShot(SOCKET sock, void* cookie) {
Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_ONE_SHOT);
}
-void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) {
+void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) {
Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_EDGE_TRIGGERED);
-}
-
-void TSocketPoller::RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty) {
+}
+
+void TSocketPoller::RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty) {
Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_MODIFY | CONT_POLL_EDGE_TRIGGERED | (empty ? CONT_POLL_BACKLOG_EMPTY : 0));
-}
-
+}
+
void TSocketPoller::Unwait(SOCKET sock) {
Impl_->Remove(sock);
}
diff --git a/util/network/poller.h b/util/network/poller.h
index 8dccd73140..5a16010676 100644
--- a/util/network/poller.h
+++ b/util/network/poller.h
@@ -19,9 +19,9 @@ public:
void WaitWriteOneShot(SOCKET sock, void* cookie);
void WaitReadWriteOneShot(SOCKET sock, void* cookie);
- void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie);
- void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true);
-
+ void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie);
+ void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true);
+
void Unwait(SOCKET sock);
size_t WaitD(void** events, size_t len, const TInstant& deadLine);
diff --git a/util/network/poller_ut.cpp b/util/network/poller_ut.cpp
index 6df0dda8ec..9a2370aff8 100644
--- a/util/network/poller_ut.cpp
+++ b/util/network/poller_ut.cpp
@@ -1,5 +1,5 @@
#include <library/cpp/testing/unittest/registar.h>
-#include <util/system/error.h>
+#include <util/system/error.h>
#include "pair.h"
#include "poller.h"
@@ -100,115 +100,115 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) {
poller.WaitRead(s1, nullptr);
poller.WaitWrite(s1, nullptr);
}
-
- Y_UNIT_TEST(TestSimpleEdgeTriggered) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
-
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
-
- SetNonBlock(sockets[1]);
-
- TSocketPoller poller;
-
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- for (ui32 i = 0; i < 3; ++i) {
- poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17);
-
- // notify about writeble
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- char buf[2];
-
- buf[0] = i + 10;
- buf[1] = i + 20;
-
- // send one byte
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
-
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- // restart without reading
- poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false);
-
- // after restart read and write might generate separate events
- {
+
+ Y_UNIT_TEST(TestSimpleEdgeTriggered) {
+ SOCKET sockets[2];
+ UNIT_ASSERT(SocketPair(sockets) == 0);
+
+ TSocketHolder s1(sockets[0]);
+ TSocketHolder s2(sockets[1]);
+
+ SetNonBlock(sockets[1]);
+
+ TSocketPoller poller;
+
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ for (ui32 i = 0; i < 3; ++i) {
+ poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17);
+
+ // notify about writeble
+ UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ char buf[2];
+
+ buf[0] = i + 10;
+ buf[1] = i + 20;
+
+ // send one byte
+ UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
+
+ UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ // restart without reading
+ poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false);
+
+ // after restart read and write might generate separate events
+ {
void* events[3];
- size_t count = poller.WaitT(events, 3, TDuration::Zero());
- UNIT_ASSERT_GE(count, 1);
- UNIT_ASSERT_LE(count, 2);
- UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- // second two more bytes
- UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0));
-
- // here poller could notify or not because we haven't seen end
- Y_UNUSED(poller.WaitT(TDuration::Zero()));
-
- // recv one, leave two
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
-
- // nothing new
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- // recv the rest
- UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
- UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]);
-
- // still nothing new
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- // hit end
- ClearLastSystemError();
- UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError());
-
- // restart after end (noop for epoll)
- poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);
-
- // send and recv byte
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
-
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- // recv and see end
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
-
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- // the same but send before restart
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
-
- // restart after end (noop for epoll)
- poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);
-
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
-
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- poller.Unwait(sockets[1]);
- }
- }
+ size_t count = poller.WaitT(events, 3, TDuration::Zero());
+ UNIT_ASSERT_GE(count, 1);
+ UNIT_ASSERT_LE(count, 2);
+ UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ // second two more bytes
+ UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0));
+
+ // here poller could notify or not because we haven't seen end
+ Y_UNUSED(poller.WaitT(TDuration::Zero()));
+
+ // recv one, leave two
+ UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
+ UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
+
+ // nothing new
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ // recv the rest
+ UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0));
+ UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
+ UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]);
+
+ // still nothing new
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ // hit end
+ ClearLastSystemError();
+ UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0));
+ UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError());
+
+ // restart after end (noop for epoll)
+ poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);
+
+ // send and recv byte
+ UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
+
+ UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ // recv and see end
+ UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
+ UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
+
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ // the same but send before restart
+ UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
+
+ // restart after end (noop for epoll)
+ poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);
+
+ UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
+ UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
+
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ poller.Unwait(sockets[1]);
+ }
+ }
#if defined(HAVE_EPOLL_POLLER)
Y_UNIT_TEST(TestRdhup) {
diff --git a/util/network/pollerimpl.h b/util/network/pollerimpl.h
index e8c7e40fba..2c43c09826 100644
--- a/util/network/pollerimpl.h
+++ b/util/network/pollerimpl.h
@@ -107,24 +107,24 @@ public:
inline void SetImpl(void* data, int fd, int what) {
TEvent e[2];
- int flags = EV_ADD;
-
- if (what & CONT_POLL_EDGE_TRIGGERED) {
- if (what & CONT_POLL_BACKLOG_EMPTY) {
- // When backlog is empty, edge-triggered does not need restart.
- return;
- }
- flags |= EV_CLEAR;
- }
-
- if (what & CONT_POLL_ONE_SHOT) {
- flags |= EV_ONESHOT;
- }
-
+ int flags = EV_ADD;
+
+ if (what & CONT_POLL_EDGE_TRIGGERED) {
+ if (what & CONT_POLL_BACKLOG_EMPTY) {
+ // When backlog is empty, edge-triggered does not need restart.
+ return;
+ }
+ flags |= EV_CLEAR;
+ }
+
+ if (what & CONT_POLL_ONE_SHOT) {
+ flags |= EV_ONESHOT;
+ }
+
Zero(e);
- EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
- EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
+ EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
+ EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) {
ythrow TSystemError() << "kevent add failed";
@@ -225,33 +225,33 @@ public:
Zero(e);
- if (what & CONT_POLL_EDGE_TRIGGERED) {
- if (what & CONT_POLL_BACKLOG_EMPTY) {
- // When backlog is empty, edge-triggered does not need restart.
- return;
- }
- e.events |= EPOLLET;
- }
-
- if (what & CONT_POLL_ONE_SHOT) {
- e.events |= EPOLLONESHOT;
- }
-
- if (what & CONT_POLL_READ) {
- e.events |= EPOLLIN;
- }
-
- if (what & CONT_POLL_WRITE) {
- e.events |= EPOLLOUT;
- }
-
+ if (what & CONT_POLL_EDGE_TRIGGERED) {
+ if (what & CONT_POLL_BACKLOG_EMPTY) {
+ // When backlog is empty, edge-triggered does not need restart.
+ return;
+ }
+ e.events |= EPOLLET;
+ }
+
+ if (what & CONT_POLL_ONE_SHOT) {
+ e.events |= EPOLLONESHOT;
+ }
+
+ if (what & CONT_POLL_READ) {
+ e.events |= EPOLLIN;
+ }
+
+ if (what & CONT_POLL_WRITE) {
+ e.events |= EPOLLOUT;
+ }
+
if (what & CONT_POLL_RDHUP) {
e.events |= EPOLLRDHUP;
}
e.data.ptr = data;
- if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) {
+ if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) {
if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) {
ythrow TSystemError() << "epoll add failed";
}
@@ -345,10 +345,10 @@ struct TSelectPollerNoTemplate {
Filter_ = s;
}
- inline void Clear(int c) noexcept {
- Filter_ &= ~c;
- }
-
+ inline void Clear(int c) noexcept {
+ Filter_ &= ~c;
+ }
+
inline int Filter() const noexcept {
return Filter_;
}
@@ -523,9 +523,9 @@ public:
TEvent* eventsStart = events;
- for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) {
+ for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) {
const SOCKET fd = it->first;
- THandle& handle = it->second;
+ THandle& handle = it->second;
if (FD_ISSET(fd, errFds)) {
(events++)->Error(handle.Data(), EIO);
@@ -553,12 +553,12 @@ public:
*keysToDeleteEnd = fd;
++keysToDeleteEnd;
}
-
- if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) {
- // Emulate edge-triggered for level-triggered select().
- // User must restart waiting this event when needed.
- handle.Clear(what);
- }
+
+ if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) {
+ // Emulate edge-triggered for level-triggered select().
+ // User must restart waiting this event when needed.
+ handle.Clear(what);
+ }
}
}
}