diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | c2a1af049e9deca890e9923abe64fe6c59060348 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /util/network | |
parent | 1f553f46fb4f3c5eec631352cdd900a0709016af (diff) | |
download | ydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'util/network')
-rw-r--r-- | util/network/address.cpp | 54 | ||||
-rw-r--r-- | util/network/address_ut.cpp | 42 | ||||
-rw-r--r-- | util/network/ip.h | 8 | ||||
-rw-r--r-- | util/network/poller.cpp | 24 | ||||
-rw-r--r-- | util/network/poller.h | 2 | ||||
-rw-r--r-- | util/network/poller_ut.cpp | 160 | ||||
-rw-r--r-- | util/network/pollerimpl.h | 90 | ||||
-rw-r--r-- | util/network/socket.cpp | 26 |
8 files changed, 203 insertions, 203 deletions
diff --git a/util/network/address.cpp b/util/network/address.cpp index 5e13cdca81..a81a9e6994 100644 --- a/util/network/address.cpp +++ b/util/network/address.cpp @@ -37,8 +37,8 @@ static inline void PrintAddr(IOutputStream& out, const IRemoteAddr& addr) { if (printPort) { out << "[" << buf << "]" << ":" << InetToHost(sa->sin6_port); - } else { - out << buf; + } else { + out << buf; } break; @@ -55,33 +55,33 @@ static inline void PrintAddr(IOutputStream& out, const IRemoteAddr& addr) { #endif default: { - size_t len = addr.Len(); - + size_t len = addr.Len(); + const char* b = (const char*)a; - const char* e = b + len; - - bool allZeros = true; - for (size_t i = 0; i < len; ++i) { - if (b[i] != 0) { - allZeros = false; - break; - } - } - - if (allZeros) { - out << "(raw all zeros)"; - } else { + const char* e = b + len; + + bool allZeros = true; + for (size_t i = 0; i < len; ++i) { + if (b[i] != 0) { + allZeros = false; + break; + } + } + + if (allZeros) { + out << "(raw all zeros)"; + } else { out << "(raw " << (int)a->sa_family << " "; - - while (b != e) { - //just print raw bytes - out << (int)*b++; - if (b != e) { - out << " "; - } - } - - out << ")"; + + while (b != e) { + //just print raw bytes + out << (int)*b++; + if (b != e) { + out << " "; + } + } + + out << ")"; } break; diff --git a/util/network/address_ut.cpp b/util/network/address_ut.cpp index 2a18cad9df..28f45172ff 100644 --- a/util/network/address_ut.cpp +++ b/util/network/address_ut.cpp @@ -1,33 +1,33 @@ #include <library/cpp/testing/unittest/registar.h> - -#include "address.h" - -using namespace NAddr; - + +#include "address.h" + +using namespace NAddr; + Y_UNIT_TEST_SUITE(IRemoteAddr_ToString) { Y_UNIT_TEST(Raw) { - THolder<TOpaqueAddr> opaque(new TOpaqueAddr); - IRemoteAddr* addr = opaque.Get(); - + THolder<TOpaqueAddr> opaque(new TOpaqueAddr); + IRemoteAddr* addr = opaque.Get(); + TString s = ToString(*addr); - UNIT_ASSERT_VALUES_EQUAL("(raw all zeros)", s); - - opaque->MutableAddr()->sa_data[10] = 17; - + UNIT_ASSERT_VALUES_EQUAL("(raw all zeros)", s); + + opaque->MutableAddr()->sa_data[10] = 17; + TString t = ToString(*addr); - + UNIT_ASSERT_C(t.StartsWith("(raw 0 0"), t); UNIT_ASSERT_C(t.EndsWith(')'), t); - } - + } + Y_UNIT_TEST(Ipv6) { - TNetworkAddress address("::1", 22); - TNetworkAddress::TIterator it = address.Begin(); - UNIT_ASSERT(it != address.End()); - UNIT_ASSERT(it->ai_family == AF_INET6); + TNetworkAddress address("::1", 22); + TNetworkAddress::TIterator it = address.Begin(); + UNIT_ASSERT(it != address.End()); + UNIT_ASSERT(it->ai_family == AF_INET6); TString toString = ToString((const IRemoteAddr&)TAddrInfo(&*it)); UNIT_ASSERT_VALUES_EQUAL(TString("[::1]:22"), toString); - } + } Y_UNIT_TEST(Loopback) { TNetworkAddress localAddress("127.70.0.1", 22); @@ -36,4 +36,4 @@ Y_UNIT_TEST_SUITE(IRemoteAddr_ToString) { TNetworkAddress localAddress2("127.0.0.1", 22); UNIT_ASSERT_VALUES_EQUAL(NAddr::IsLoopback(TAddrInfo(&*localAddress2.Begin())), true); } -} +} diff --git a/util/network/ip.h b/util/network/ip.h index 0e944248a5..dc7c2d24a0 100644 --- a/util/network/ip.h +++ b/util/network/ip.h @@ -8,10 +8,10 @@ #include <util/generic/string.h> #include <util/generic/yexception.h> -/// IPv4 address in network format +/// IPv4 address in network format using TIpHost = ui32; - -/// Port number in host format + +/// Port number in host format using TIpPort = ui16; /* @@ -53,7 +53,7 @@ static inline TIpHost ResolveHost(const char* data, size_t len) { return HostToInet(ret); } -/// socket address +/// socket address struct TIpAddress: public sockaddr_in { inline TIpAddress() noexcept { Clear(); diff --git a/util/network/poller.cpp b/util/network/poller.cpp index 36a007b5c6..7954d0e8b5 100644 --- a/util/network/poller.cpp +++ b/util/network/poller.cpp @@ -57,18 +57,18 @@ void TSocketPoller::WaitRdhup(SOCKET sock, void* cookie) { Impl_->Set(cookie, sock, CONT_POLL_RDHUP); } -void TSocketPoller::WaitReadOneShot(SOCKET sock, void* cookie) { - Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_ONE_SHOT); -} - -void TSocketPoller::WaitWriteOneShot(SOCKET sock, void* cookie) { - Impl_->Set(cookie, sock, CONT_POLL_WRITE | CONT_POLL_ONE_SHOT); -} - -void TSocketPoller::WaitReadWriteOneShot(SOCKET sock, void* cookie) { - Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_ONE_SHOT); -} - +void TSocketPoller::WaitReadOneShot(SOCKET sock, void* cookie) { + Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_ONE_SHOT); +} + +void TSocketPoller::WaitWriteOneShot(SOCKET sock, void* cookie) { + Impl_->Set(cookie, sock, CONT_POLL_WRITE | CONT_POLL_ONE_SHOT); +} + +void TSocketPoller::WaitReadWriteOneShot(SOCKET sock, void* cookie) { + Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_ONE_SHOT); +} + void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) { Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_EDGE_TRIGGERED); } diff --git a/util/network/poller.h b/util/network/poller.h index d687fb0463..8dccd73140 100644 --- a/util/network/poller.h +++ b/util/network/poller.h @@ -18,7 +18,7 @@ public: void WaitReadOneShot(SOCKET sock, void* cookie); void WaitWriteOneShot(SOCKET sock, void* cookie); void WaitReadWriteOneShot(SOCKET sock, void* cookie); - + void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie); void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true); diff --git a/util/network/poller_ut.cpp b/util/network/poller_ut.cpp index 1d542f8c67..6df0dda8ec 100644 --- a/util/network/poller_ut.cpp +++ b/util/network/poller_ut.cpp @@ -1,105 +1,105 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/system/error.h> - + #include "pair.h" -#include "poller.h" +#include "poller.h" #include "pollerimpl.h" - + Y_UNIT_TEST_SUITE(TSocketPollerTest) { Y_UNIT_TEST(TestSimple) { - SOCKET sockets[2]; - UNIT_ASSERT(SocketPair(sockets) == 0); - - TSocketHolder s1(sockets[0]); - TSocketHolder s2(sockets[1]); - - TSocketPoller poller; + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + TSocketPoller poller; poller.WaitRead(sockets[1], (void*)17); - + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - for (ui32 i = 0; i < 3; ++i) { + + for (ui32 i = 0; i < 3; ++i) { char buf[] = {18}; - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); - UNIT_ASSERT_VALUES_EQUAL(18, buf[0]); - + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(18, buf[0]); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - } - } - + } + } + Y_UNIT_TEST(TestSimpleOneShot) { - SOCKET sockets[2]; - UNIT_ASSERT(SocketPair(sockets) == 0); - - TSocketHolder s1(sockets[0]); - TSocketHolder s2(sockets[1]); - - TSocketPoller poller; - + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + TSocketPoller poller; + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - for (ui32 i = 0; i < 3; ++i) { + + for (ui32 i = 0; i < 3; ++i) { poller.WaitReadOneShot(sockets[1], (void*)17); - - char buf[1]; - - buf[0] = i + 20; - - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - + + char buf[1]; + + buf[0] = i + 20; + + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero())); - - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[0]); - - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - buf[0] = i + 21; - - UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); - - // this fails if socket is not oneshot - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); - - UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); - UNIT_ASSERT_VALUES_EQUAL(char(i + 21), buf[0]); - } - } - + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[0]); + + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + buf[0] = i + 21; + + UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0)); + + // this fails if socket is not oneshot + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero())); + + UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0)); + UNIT_ASSERT_VALUES_EQUAL(char(i + 21), buf[0]); + } + } + Y_UNIT_TEST(TestItIsSafeToUnregisterUnregisteredDescriptor) { - SOCKET sockets[2]; - UNIT_ASSERT(SocketPair(sockets) == 0); - - TSocketHolder s1(sockets[0]); - TSocketHolder s2(sockets[1]); - - TSocketPoller poller; - - poller.Unwait(s1); - } - + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + TSocketPoller poller; + + poller.Unwait(s1); + } + Y_UNIT_TEST(TestItIsSafeToReregisterDescriptor) { - SOCKET sockets[2]; - UNIT_ASSERT(SocketPair(sockets) == 0); - - TSocketHolder s1(sockets[0]); - TSocketHolder s2(sockets[1]); - - TSocketPoller poller; - + SOCKET sockets[2]; + UNIT_ASSERT(SocketPair(sockets) == 0); + + TSocketHolder s1(sockets[0]); + TSocketHolder s2(sockets[1]); + + TSocketPoller poller; + poller.WaitRead(s1, nullptr); poller.WaitRead(s1, nullptr); poller.WaitWrite(s1, nullptr); - } + } Y_UNIT_TEST(TestSimpleEdgeTriggered) { SOCKET sockets[2]; @@ -233,4 +233,4 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) { UNIT_ASSERT_EQUAL(TPoller::ExtractEvent(&e), (void*)17); } #endif -} +} diff --git a/util/network/pollerimpl.h b/util/network/pollerimpl.h index 35a981ea7d..e8c7e40fba 100644 --- a/util/network/pollerimpl.h +++ b/util/network/pollerimpl.h @@ -9,7 +9,7 @@ #include <util/generic/utility.h> #include <util/generic/vector.h> #include <util/generic/yexception.h> -#include <util/datetime/base.h> +#include <util/datetime/base.h> #if defined(_freebsd_) || defined(_darwin_) #define HAVE_KQUEUE_POLLER @@ -426,7 +426,7 @@ public: inline void SetImpl(void* data, SOCKET fd, int what) { with_lock (CommandLock_) { - Commands_.push_back(TCommand(fd, what, data)); + Commands_.push_back(TCommand(fd, what, data)); } Signal(); @@ -434,7 +434,7 @@ public: inline void Remove(SOCKET fd) noexcept { with_lock (CommandLock_) { - Commands_.push_back(TCommand(fd, 0)); + Commands_.push_back(TCommand(fd, 0)); } Signal(); @@ -442,7 +442,7 @@ public: inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept { auto guard = Guard(Lock_); - + do { if (Begin_ != End_) { const size_t ret = Min<size_t>(End_ - Begin_, len); @@ -475,25 +475,25 @@ public: inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept { with_lock (CommandLock_) { for (auto command = Commands_.begin(); command != Commands_.end(); ++command) { - if (command->Filter_ != 0) { - Fds_.Set(command->Fd_, command->Cookie_, command->Filter_); - } else { - Fds_.Remove(command->Fd_); - } - } - - Commands_.clear(); - } - - TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET)); + if (command->Filter_ != 0) { + Fds_.Set(command->Fd_, command->Cookie_, command->Filter_); + } else { + Fds_.Remove(command->Fd_); + } + } + + Commands_.clear(); + } + + TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET)); fd_set* in = (fd_set*)tmpBuf.Data(); fd_set* out = &in[1]; fd_set* errFds = &in[2]; SOCKET* keysToDeleteBegin = (SOCKET*)&in[3]; - SOCKET* keysToDeleteEnd = keysToDeleteBegin; - + SOCKET* keysToDeleteEnd = keysToDeleteBegin; + #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347 memset(in, 0, sizeof(*in)); memset(out, 0, sizeof(*out)); @@ -529,12 +529,12 @@ public: if (FD_ISSET(fd, errFds)) { (events++)->Error(handle.Data(), EIO); - - if (handle.Filter() & CONT_POLL_ONE_SHOT) { - *keysToDeleteEnd = fd; - ++keysToDeleteEnd; - } - + + if (handle.Filter() & CONT_POLL_ONE_SHOT) { + *keysToDeleteEnd = fd; + ++keysToDeleteEnd; + } + } else { int what = 0; @@ -548,11 +548,11 @@ public: if (what) { (events++)->Success(handle.Data(), what); - - if (handle.Filter() & CONT_POLL_ONE_SHOT) { - *keysToDeleteEnd = fd; - ++keysToDeleteEnd; - } + + if (handle.Filter() & CONT_POLL_ONE_SHOT) { + *keysToDeleteEnd = fd; + ++keysToDeleteEnd; + } if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) { // Emulate edge-triggered for level-triggered select(). @@ -563,11 +563,11 @@ public: } } - while (keysToDeleteBegin != keysToDeleteEnd) { - Fds_.erase(*keysToDeleteBegin); - ++keysToDeleteBegin; - } - + while (keysToDeleteBegin != keysToDeleteEnd) { + Fds_.erase(*keysToDeleteBegin); + ++keysToDeleteBegin; + } + return events - eventsStart; } @@ -611,25 +611,25 @@ private: } private: - struct TCommand { - SOCKET Fd_; - int Filter_; // 0 to remove - void* Cookie_; - - TCommand(SOCKET fd, int filter, void* cookie) + struct TCommand { + SOCKET Fd_; + int Filter_; // 0 to remove + void* Cookie_; + + TCommand(SOCKET fd, int filter, void* cookie) : Fd_(fd) , Filter_(filter) , Cookie_(cookie) { } - - TCommand(SOCKET fd, int filter) + + TCommand(SOCKET fd, int filter) : Fd_(fd) , Filter_(filter) { } - }; - + }; + TFds Fds_; TMyMutex Lock_; @@ -637,9 +637,9 @@ private: TEvent* Begin_; TEvent* End_; - TMyMutex CommandLock_; + TMyMutex CommandLock_; TVector<TCommand> Commands_; - + SOCKET Signal_[2]; }; #endif diff --git a/util/network/socket.cpp b/util/network/socket.cpp index f3ea77929a..4f6e804346 100644 --- a/util/network/socket.cpp +++ b/util/network/socket.cpp @@ -554,25 +554,25 @@ static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) { #endif void TSocketHolder::Close() noexcept { - if (Fd_ != INVALID_SOCKET) { - bool ok = (closesocket(Fd_) == 0); - if (!ok) { + if (Fd_ != INVALID_SOCKET) { + bool ok = (closesocket(Fd_) == 0); + if (!ok) { // Do not quietly close bad descriptor, // because often it means double close // that is disasterous -#ifdef _win_ +#ifdef _win_ Y_VERIFY(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor"); -#elif defined(_unix_) +#elif defined(_unix_) Y_VERIFY(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_)); -#else +#else #error unsupported platform -#endif - } - - Fd_ = INVALID_SOCKET; - } -} - +#endif + } + + Fd_ = INVALID_SOCKET; + } +} + class TSocket::TImpl: public TAtomicRefCount<TImpl> { using TOps = TSocket::TOps; |