#include "socket.h" #include "pair.h" #include <library/cpp/testing/unittest/registar.h> #include <util/string/builder.h> #include <util/generic/vector.h> #include <ctime> #ifdef _linux_ #include <linux/version.h> #include <sys/utsname.h> #endif class TSockTest: public TTestBase { UNIT_TEST_SUITE(TSockTest); UNIT_TEST(TestSock); UNIT_TEST(TestTimeout); #ifndef _win_ // Test hangs on Windows UNIT_TEST_EXCEPTION(TestConnectionRefused, yexception); #endif UNIT_TEST(TestNetworkResolutionError); UNIT_TEST(TestNetworkResolutionErrorMessage); UNIT_TEST(TestBrokenPipe); UNIT_TEST(TestClose); UNIT_TEST(TestReusePortAvailCheck); UNIT_TEST_SUITE_END(); public: void TestSock(); void TestTimeout(); void TestConnectionRefused(); void TestNetworkResolutionError(); void TestNetworkResolutionErrorMessage(); void TestBrokenPipe(); void TestClose(); void TestReusePortAvailCheck(); }; UNIT_TEST_SUITE_REGISTRATION(TSockTest); void TSockTest::TestSock() { TNetworkAddress addr("yandex.ru", 80); TSocket s(addr); TSocketOutput so(s); TSocketInput si(s); const TStringBuf req = "GET / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n"; so.Write(req.data(), req.size()); UNIT_ASSERT(!si.ReadLine().empty()); } void TSockTest::TestTimeout() { static const int timeout = 1000; i64 startTime = millisec(); try { TNetworkAddress addr("localhost", 1313); TSocket s(addr, TDuration::MilliSeconds(timeout)); } catch (const yexception&) { } int realTimeout = (int)(millisec() - startTime); if (realTimeout > timeout + 2000) { TString err = TStringBuilder() << "Timeout exceeded: " << realTimeout << " ms (expected " << timeout << " ms)"; UNIT_FAIL(err); } } void TSockTest::TestConnectionRefused() { TNetworkAddress addr("localhost", 1313); TSocket s(addr); } void TSockTest::TestNetworkResolutionError() { TString errMsg; try { TNetworkAddress addr("", 0); } catch (const TNetworkResolutionError& e) { errMsg = e.what(); } if (errMsg.empty()) { return; // on Windows getaddrinfo("", 0, ...) returns "OK" } int expectedErr = EAI_NONAME; TString expectedErrMsg = gai_strerror(expectedErr); if (errMsg.find(expectedErrMsg) == TString::npos) { UNIT_FAIL("TNetworkResolutionError contains\nInvalid msg: " + errMsg + "\nExpected msg: " + expectedErrMsg + "\n"); } } void TSockTest::TestNetworkResolutionErrorMessage() { #ifdef _unix_ auto str = [](int code) -> TString { return TNetworkResolutionError(code).what(); }; auto expected = [](int code) -> TString { return gai_strerror(code); }; struct TErrnoGuard { TErrnoGuard() : PrevValue_(errno) { } ~TErrnoGuard() { errno = PrevValue_; } private: int PrevValue_; } g; UNIT_ASSERT_VALUES_EQUAL(expected(0) + "(0): ", str(0)); UNIT_ASSERT_VALUES_EQUAL(expected(-9) + "(-9): ", str(-9)); errno = 0; UNIT_ASSERT_VALUES_EQUAL(expected(EAI_SYSTEM) + "(" + IntToString<10>(EAI_SYSTEM) + "; errno=0): ", str(EAI_SYSTEM)); errno = 110; UNIT_ASSERT_VALUES_EQUAL(expected(EAI_SYSTEM) + "(" + IntToString<10>(EAI_SYSTEM) + "; errno=110): ", str(EAI_SYSTEM)); #endif } class TTempEnableSigPipe { public: TTempEnableSigPipe() { OriginalSigHandler_ = signal(SIGPIPE, SIG_DFL); Y_VERIFY(OriginalSigHandler_ != SIG_ERR); } ~TTempEnableSigPipe() { auto ret = signal(SIGPIPE, OriginalSigHandler_); Y_VERIFY(ret != SIG_ERR); } private: void (*OriginalSigHandler_)(int); }; void TSockTest::TestBrokenPipe() { TTempEnableSigPipe guard; SOCKET socks[2]; int ret = SocketPair(socks); UNIT_ASSERT_VALUES_EQUAL(ret, 0); TSocket sender(socks[0]); TSocket receiver(socks[1]); receiver.ShutDown(SHUT_RDWR); int sent = sender.Send("FOO", 3); UNIT_ASSERT(sent < 0); IOutputStream::TPart parts[] = { {"foo", 3}, {"bar", 3}, }; sent = sender.SendV(parts, 2); UNIT_ASSERT(sent < 0); } void TSockTest::TestClose() { SOCKET socks[2]; UNIT_ASSERT_EQUAL(SocketPair(socks), 0); TSocket receiver(socks[1]); UNIT_ASSERT_EQUAL(static_cast<SOCKET>(receiver), socks[1]); #if defined _linux_ UNIT_ASSERT_GE(fcntl(socks[1], F_GETFD), 0); receiver.Close(); UNIT_ASSERT_EQUAL(fcntl(socks[1], F_GETFD), -1); #else receiver.Close(); #endif UNIT_ASSERT_EQUAL(static_cast<SOCKET>(receiver), INVALID_SOCKET); } void TSockTest::TestReusePortAvailCheck() { #if defined _linux_ utsname sysInfo; Y_VERIFY(!uname(&sysInfo), "Error while call uname: %s", LastSystemErrorText()); TStringBuf release(sysInfo.release); release = release.substr(0, release.find_first_not_of(".0123456789")); int v1 = FromString<int>(release.NextTok('.')); int v2 = FromString<int>(release.NextTok('.')); int v3 = FromString<int>(release.NextTok('.')); int linuxVersionCode = KERNEL_VERSION(v1, v2, v3); if (linuxVersionCode >= KERNEL_VERSION(3, 9, 1)) { // new kernels support SO_REUSEPORT UNIT_ASSERT(true == IsReusePortAvailable()); UNIT_ASSERT(true == IsReusePortAvailable()); } else { // older kernels may or may not support SO_REUSEPORT // just check that it doesn't crash or throw (void)IsReusePortAvailable(); (void)IsReusePortAvailable(); } #else // check that it doesn't crash or throw (void)IsReusePortAvailable(); (void)IsReusePortAvailable(); #endif } class TPollTest: public TTestBase { UNIT_TEST_SUITE(TPollTest); UNIT_TEST(TestPollInOut); UNIT_TEST_SUITE_END(); public: inline TPollTest() { srand(static_cast<unsigned int>(time(nullptr))); } void TestPollInOut(); private: sockaddr_in GetAddress(ui32 ip, ui16 port); SOCKET CreateSocket(); SOCKET StartServerSocket(ui16 port, int backlog); SOCKET StartClientSocket(ui32 ip, ui16 port); SOCKET AcceptConnection(SOCKET serverSocket); }; UNIT_TEST_SUITE_REGISTRATION(TPollTest); sockaddr_in TPollTest::GetAddress(ui32 ip, ui16 port) { struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = htonl(ip); return addr; } SOCKET TPollTest::CreateSocket() { SOCKET s = socket(AF_INET, SOCK_STREAM, 0); if (s == INVALID_SOCKET) { ythrow yexception() << "Can not create socket (" << LastSystemErrorText() << ")"; } return s; } SOCKET TPollTest::StartServerSocket(ui16 port, int backlog) { TSocketHolder s(CreateSocket()); sockaddr_in addr = GetAddress(ntohl(INADDR_ANY), port); if (bind(s, (sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) { ythrow yexception() << "Can not bind server socket (" << LastSystemErrorText() << ")"; } if (listen(s, backlog) == SOCKET_ERROR) { ythrow yexception() << "Can not listen on server socket (" << LastSystemErrorText() << ")"; } return s.Release(); } SOCKET TPollTest::StartClientSocket(ui32 ip, ui16 port) { TSocketHolder s(CreateSocket()); sockaddr_in addr = GetAddress(ip, port); if (connect(s, (sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) { ythrow yexception() << "Can not connect client socket (" << LastSystemErrorText() << ")"; } return s.Release(); } SOCKET TPollTest::AcceptConnection(SOCKET serverSocket) { SOCKET connectedSocket = accept(serverSocket, nullptr, nullptr); if (connectedSocket == INVALID_SOCKET) { ythrow yexception() << "Can not accept connection on server socket (" << LastSystemErrorText() << ")"; } return connectedSocket; } void TPollTest::TestPollInOut() { #ifdef _win_ const size_t socketCount = 1000; ui16 port = static_cast<ui16>(1300 + rand() % 97); TSocketHolder serverSocket = StartServerSocket(port, socketCount); ui32 localIp = ntohl(inet_addr("127.0.0.1")); TVector<TSimpleSharedPtr<TSocketHolder>> clientSockets; TVector<TSimpleSharedPtr<TSocketHolder>> connectedSockets; TVector<pollfd> fds; for (size_t i = 0; i < socketCount; ++i) { TSimpleSharedPtr<TSocketHolder> clientSocket(new TSocketHolder(StartClientSocket(localIp, port))); clientSockets.push_back(clientSocket); if (i % 5 == 0 || i % 5 == 2) { char buffer = 'c'; if (send(*clientSocket, &buffer, 1, 0) == -1) ythrow yexception() << "Can not send (" << LastSystemErrorText() << ")"; } TSimpleSharedPtr<TSocketHolder> connectedSocket(new TSocketHolder(AcceptConnection(serverSocket))); connectedSockets.push_back(connectedSocket); if (i % 5 == 2 || i % 5 == 3) { closesocket(*clientSocket); shutdown(*clientSocket, SD_BOTH); } } int expectedCount = 0; for (size_t i = 0; i < connectedSockets.size(); ++i) { pollfd fd = {(i % 5 == 4) ? INVALID_SOCKET : static_cast<SOCKET>(*connectedSockets[i]), POLLIN | POLLOUT, 0}; fds.push_back(fd); if (i % 5 != 4) ++expectedCount; } int polledCount = poll(&fds[0], fds.size(), INFTIM); UNIT_ASSERT_EQUAL(expectedCount, polledCount); for (size_t i = 0; i < connectedSockets.size(); ++i) { short revents = fds[i].revents; if (i % 5 == 0) { UNIT_ASSERT_EQUAL(static_cast<short>(POLLRDNORM | POLLWRNORM), revents); } else if (i % 5 == 1) { UNIT_ASSERT_EQUAL(static_cast<short>(POLLOUT | POLLWRNORM), revents); } else if (i % 5 == 2) { UNIT_ASSERT_EQUAL(static_cast<short>(POLLHUP | POLLRDNORM | POLLWRNORM), revents); } else if (i % 5 == 3) { UNIT_ASSERT_EQUAL(static_cast<short>(POLLHUP | POLLWRNORM), revents); } else if (i % 5 == 4) { UNIT_ASSERT_EQUAL(static_cast<short>(POLLNVAL), revents); } } #endif }