diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/network/poller_ut.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/network/poller_ut.cpp')
-rw-r--r-- | util/network/poller_ut.cpp | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/util/network/poller_ut.cpp b/util/network/poller_ut.cpp new file mode 100644 index 0000000000..6df0dda8ec --- /dev/null +++ b/util/network/poller_ut.cpp @@ -0,0 +1,236 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <util/system/error.h> + +#include "pair.h" +#include "poller.h" +#include "pollerimpl.h" + +Y_UNIT_TEST_SUITE(TSocketPollerTest) { + Y_UNIT_TEST(TestSimple) { + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + TSocketPoller poller; + poller.WaitRead(sockets[1], (void*)17); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + for (ui32 i = 0; i < 3; ++i) { + char buf[] = {18}; + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(18, buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + } + } + + Y_UNIT_TEST(TestSimpleOneShot) { + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + TSocketPoller poller; + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + for (ui32 i = 0; i < 3; ++i) { + poller.WaitReadOneShot(sockets[1], (void*)17); + + char buf[1]; + + buf[0] = i + 20; + + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + buf[0] = i + 21; + + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + // this fails if socket is not oneshot + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 21), buf[0]); + } + } + + Y_UNIT_TEST(TestItIsSafeToUnregisterUnregisteredDescriptor) { + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + TSocketPoller poller; + + poller.Unwait(s1); + } + + Y_UNIT_TEST(TestItIsSafeToReregisterDescriptor) { + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + TSocketPoller poller; + + poller.WaitRead(s1, nullptr); + poller.WaitRead(s1, nullptr); + poller.WaitWrite(s1, nullptr); + } + + Y_UNIT_TEST(TestSimpleEdgeTriggered) { + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + SetNonBlock(sockets[1]); + + TSocketPoller poller; + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + for (ui32 i = 0; i < 3; ++i) { + poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17); + + // notify about writeble + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + char buf[2]; + + buf[0] = i + 10; + buf[1] = i + 20; + + // send one byte + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // restart without reading + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false); + + // after restart read and write might generate separate events + { + void* events[3]; + size_t count = poller.WaitT(events, 3, TDuration::Zero()); + UNIT_ASSERT_GE(count, 1); + UNIT_ASSERT_LE(count, 2); + UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17); + } + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // second two more bytes + UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0)); + + // here poller could notify or not because we haven't seen end + Y_UNUSED(poller.WaitT(TDuration::Zero())); + + // recv one, leave two + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + // nothing new + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // recv the rest + UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]); + + // still nothing new + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // hit end + ClearLastSystemError(); + UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError()); + + // restart after end (noop for epoll) + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); + + // send and recv byte + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // recv and see end + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + // the same but send before restart + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + // restart after end (noop for epoll) + poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true); + + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + poller.Unwait(sockets[1]); + } + } + +#if defined(HAVE_EPOLL_POLLER) + Y_UNIT_TEST(TestRdhup) { + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + char buf[1] = {0}; + UNIT_ASSERT_VALUES_EQUAL(1, send(s1, buf, 1, 0)); + shutdown(s1, SHUT_WR); + + using TPoller = TGenericPoller<TEpollPoller<TWithoutLocking>>; + TPoller poller; + poller.Set((void*)17, s2, CONT_POLL_RDHUP); + + TPoller::TEvent e; + UNIT_ASSERT_VALUES_EQUAL(poller.WaitD(&e, 1, TDuration::Zero().ToDeadLine()), 1); + UNIT_ASSERT_EQUAL(TPoller::ExtractStatus(&e), 0); + UNIT_ASSERT_EQUAL(TPoller::ExtractFilter(&e), CONT_POLL_RDHUP); + UNIT_ASSERT_EQUAL(TPoller::ExtractEvent(&e), (void*)17); + } +#endif +} |