aboutsummaryrefslogtreecommitdiffstats
path: root/util/network
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /util/network
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'util/network')
-rw-r--r--util/network/address.cpp54
-rw-r--r--util/network/address_ut.cpp42
-rw-r--r--util/network/ip.h8
-rw-r--r--util/network/poller.cpp24
-rw-r--r--util/network/poller.h2
-rw-r--r--util/network/poller_ut.cpp160
-rw-r--r--util/network/pollerimpl.h90
-rw-r--r--util/network/socket.cpp26
8 files changed, 203 insertions, 203 deletions
diff --git a/util/network/address.cpp b/util/network/address.cpp
index 5e13cdca81..a81a9e6994 100644
--- a/util/network/address.cpp
+++ b/util/network/address.cpp
@@ -37,8 +37,8 @@ static inline void PrintAddr(IOutputStream& out, const IRemoteAddr& addr) {
if (printPort) {
out << "[" << buf << "]"
<< ":" << InetToHost(sa->sin6_port);
- } else {
- out << buf;
+ } else {
+ out << buf;
}
break;
@@ -55,33 +55,33 @@ static inline void PrintAddr(IOutputStream& out, const IRemoteAddr& addr) {
#endif
default: {
- size_t len = addr.Len();
-
+ size_t len = addr.Len();
+
const char* b = (const char*)a;
- const char* e = b + len;
-
- bool allZeros = true;
- for (size_t i = 0; i < len; ++i) {
- if (b[i] != 0) {
- allZeros = false;
- break;
- }
- }
-
- if (allZeros) {
- out << "(raw all zeros)";
- } else {
+ const char* e = b + len;
+
+ bool allZeros = true;
+ for (size_t i = 0; i < len; ++i) {
+ if (b[i] != 0) {
+ allZeros = false;
+ break;
+ }
+ }
+
+ if (allZeros) {
+ out << "(raw all zeros)";
+ } else {
out << "(raw " << (int)a->sa_family << " ";
-
- while (b != e) {
- //just print raw bytes
- out << (int)*b++;
- if (b != e) {
- out << " ";
- }
- }
-
- out << ")";
+
+ while (b != e) {
+ //just print raw bytes
+ out << (int)*b++;
+ if (b != e) {
+ out << " ";
+ }
+ }
+
+ out << ")";
}
break;
diff --git a/util/network/address_ut.cpp b/util/network/address_ut.cpp
index 2a18cad9df..28f45172ff 100644
--- a/util/network/address_ut.cpp
+++ b/util/network/address_ut.cpp
@@ -1,33 +1,33 @@
#include <library/cpp/testing/unittest/registar.h>
-
-#include "address.h"
-
-using namespace NAddr;
-
+
+#include "address.h"
+
+using namespace NAddr;
+
Y_UNIT_TEST_SUITE(IRemoteAddr_ToString) {
Y_UNIT_TEST(Raw) {
- THolder<TOpaqueAddr> opaque(new TOpaqueAddr);
- IRemoteAddr* addr = opaque.Get();
-
+ THolder<TOpaqueAddr> opaque(new TOpaqueAddr);
+ IRemoteAddr* addr = opaque.Get();
+
TString s = ToString(*addr);
- UNIT_ASSERT_VALUES_EQUAL("(raw all zeros)", s);
-
- opaque->MutableAddr()->sa_data[10] = 17;
-
+ UNIT_ASSERT_VALUES_EQUAL("(raw all zeros)", s);
+
+ opaque->MutableAddr()->sa_data[10] = 17;
+
TString t = ToString(*addr);
-
+
UNIT_ASSERT_C(t.StartsWith("(raw 0 0"), t);
UNIT_ASSERT_C(t.EndsWith(')'), t);
- }
-
+ }
+
Y_UNIT_TEST(Ipv6) {
- TNetworkAddress address("::1", 22);
- TNetworkAddress::TIterator it = address.Begin();
- UNIT_ASSERT(it != address.End());
- UNIT_ASSERT(it->ai_family == AF_INET6);
+ TNetworkAddress address("::1", 22);
+ TNetworkAddress::TIterator it = address.Begin();
+ UNIT_ASSERT(it != address.End());
+ UNIT_ASSERT(it->ai_family == AF_INET6);
TString toString = ToString((const IRemoteAddr&)TAddrInfo(&*it));
UNIT_ASSERT_VALUES_EQUAL(TString("[::1]:22"), toString);
- }
+ }
Y_UNIT_TEST(Loopback) {
TNetworkAddress localAddress("127.70.0.1", 22);
@@ -36,4 +36,4 @@ Y_UNIT_TEST_SUITE(IRemoteAddr_ToString) {
TNetworkAddress localAddress2("127.0.0.1", 22);
UNIT_ASSERT_VALUES_EQUAL(NAddr::IsLoopback(TAddrInfo(&*localAddress2.Begin())), true);
}
-}
+}
diff --git a/util/network/ip.h b/util/network/ip.h
index 0e944248a5..dc7c2d24a0 100644
--- a/util/network/ip.h
+++ b/util/network/ip.h
@@ -8,10 +8,10 @@
#include <util/generic/string.h>
#include <util/generic/yexception.h>
-/// IPv4 address in network format
+/// IPv4 address in network format
using TIpHost = ui32;
-
-/// Port number in host format
+
+/// Port number in host format
using TIpPort = ui16;
/*
@@ -53,7 +53,7 @@ static inline TIpHost ResolveHost(const char* data, size_t len) {
return HostToInet(ret);
}
-/// socket address
+/// socket address
struct TIpAddress: public sockaddr_in {
inline TIpAddress() noexcept {
Clear();
diff --git a/util/network/poller.cpp b/util/network/poller.cpp
index 36a007b5c6..7954d0e8b5 100644
--- a/util/network/poller.cpp
+++ b/util/network/poller.cpp
@@ -57,18 +57,18 @@ void TSocketPoller::WaitRdhup(SOCKET sock, void* cookie) {
Impl_->Set(cookie, sock, CONT_POLL_RDHUP);
}
-void TSocketPoller::WaitReadOneShot(SOCKET sock, void* cookie) {
- Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_ONE_SHOT);
-}
-
-void TSocketPoller::WaitWriteOneShot(SOCKET sock, void* cookie) {
- Impl_->Set(cookie, sock, CONT_POLL_WRITE | CONT_POLL_ONE_SHOT);
-}
-
-void TSocketPoller::WaitReadWriteOneShot(SOCKET sock, void* cookie) {
- Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_ONE_SHOT);
-}
-
+void TSocketPoller::WaitReadOneShot(SOCKET sock, void* cookie) {
+ Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_ONE_SHOT);
+}
+
+void TSocketPoller::WaitWriteOneShot(SOCKET sock, void* cookie) {
+ Impl_->Set(cookie, sock, CONT_POLL_WRITE | CONT_POLL_ONE_SHOT);
+}
+
+void TSocketPoller::WaitReadWriteOneShot(SOCKET sock, void* cookie) {
+ Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_ONE_SHOT);
+}
+
void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) {
Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_EDGE_TRIGGERED);
}
diff --git a/util/network/poller.h b/util/network/poller.h
index d687fb0463..8dccd73140 100644
--- a/util/network/poller.h
+++ b/util/network/poller.h
@@ -18,7 +18,7 @@ public:
void WaitReadOneShot(SOCKET sock, void* cookie);
void WaitWriteOneShot(SOCKET sock, void* cookie);
void WaitReadWriteOneShot(SOCKET sock, void* cookie);
-
+
void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie);
void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true);
diff --git a/util/network/poller_ut.cpp b/util/network/poller_ut.cpp
index 1d542f8c67..6df0dda8ec 100644
--- a/util/network/poller_ut.cpp
+++ b/util/network/poller_ut.cpp
@@ -1,105 +1,105 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/error.h>
-
+
#include "pair.h"
-#include "poller.h"
+#include "poller.h"
#include "pollerimpl.h"
-
+
Y_UNIT_TEST_SUITE(TSocketPollerTest) {
Y_UNIT_TEST(TestSimple) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
-
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
-
- TSocketPoller poller;
+ SOCKET sockets[2];
+ UNIT_ASSERT(SocketPair(sockets) == 0);
+
+ TSocketHolder s1(sockets[0]);
+ TSocketHolder s2(sockets[1]);
+
+ TSocketPoller poller;
poller.WaitRead(sockets[1], (void*)17);
-
+
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- for (ui32 i = 0; i < 3; ++i) {
+
+ for (ui32 i = 0; i < 3; ++i) {
char buf[] = {18};
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
-
+ UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
+
UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
-
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(18, buf[0]);
-
+
+ UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
+ UNIT_ASSERT_VALUES_EQUAL(18, buf[0]);
+
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- }
- }
-
+ }
+ }
+
Y_UNIT_TEST(TestSimpleOneShot) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
-
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
-
- TSocketPoller poller;
-
+ SOCKET sockets[2];
+ UNIT_ASSERT(SocketPair(sockets) == 0);
+
+ TSocketHolder s1(sockets[0]);
+ TSocketHolder s2(sockets[1]);
+
+ TSocketPoller poller;
+
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- for (ui32 i = 0; i < 3; ++i) {
+
+ for (ui32 i = 0; i < 3; ++i) {
poller.WaitReadOneShot(sockets[1], (void*)17);
-
- char buf[1];
-
- buf[0] = i + 20;
-
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
-
+
+ char buf[1];
+
+ buf[0] = i + 20;
+
+ UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
+
UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
-
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[0]);
-
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- buf[0] = i + 21;
-
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
-
- // this fails if socket is not oneshot
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
-
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 21), buf[0]);
- }
- }
-
+
+ UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
+ UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[0]);
+
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ buf[0] = i + 21;
+
+ UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
+
+ // this fails if socket is not oneshot
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
+ UNIT_ASSERT_VALUES_EQUAL(char(i + 21), buf[0]);
+ }
+ }
+
Y_UNIT_TEST(TestItIsSafeToUnregisterUnregisteredDescriptor) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
-
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
-
- TSocketPoller poller;
-
- poller.Unwait(s1);
- }
-
+ SOCKET sockets[2];
+ UNIT_ASSERT(SocketPair(sockets) == 0);
+
+ TSocketHolder s1(sockets[0]);
+ TSocketHolder s2(sockets[1]);
+
+ TSocketPoller poller;
+
+ poller.Unwait(s1);
+ }
+
Y_UNIT_TEST(TestItIsSafeToReregisterDescriptor) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
-
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
-
- TSocketPoller poller;
-
+ SOCKET sockets[2];
+ UNIT_ASSERT(SocketPair(sockets) == 0);
+
+ TSocketHolder s1(sockets[0]);
+ TSocketHolder s2(sockets[1]);
+
+ TSocketPoller poller;
+
poller.WaitRead(s1, nullptr);
poller.WaitRead(s1, nullptr);
poller.WaitWrite(s1, nullptr);
- }
+ }
Y_UNIT_TEST(TestSimpleEdgeTriggered) {
SOCKET sockets[2];
@@ -233,4 +233,4 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) {
UNIT_ASSERT_EQUAL(TPoller::ExtractEvent(&e), (void*)17);
}
#endif
-}
+}
diff --git a/util/network/pollerimpl.h b/util/network/pollerimpl.h
index 35a981ea7d..e8c7e40fba 100644
--- a/util/network/pollerimpl.h
+++ b/util/network/pollerimpl.h
@@ -9,7 +9,7 @@
#include <util/generic/utility.h>
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
-#include <util/datetime/base.h>
+#include <util/datetime/base.h>
#if defined(_freebsd_) || defined(_darwin_)
#define HAVE_KQUEUE_POLLER
@@ -426,7 +426,7 @@ public:
inline void SetImpl(void* data, SOCKET fd, int what) {
with_lock (CommandLock_) {
- Commands_.push_back(TCommand(fd, what, data));
+ Commands_.push_back(TCommand(fd, what, data));
}
Signal();
@@ -434,7 +434,7 @@ public:
inline void Remove(SOCKET fd) noexcept {
with_lock (CommandLock_) {
- Commands_.push_back(TCommand(fd, 0));
+ Commands_.push_back(TCommand(fd, 0));
}
Signal();
@@ -442,7 +442,7 @@ public:
inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
auto guard = Guard(Lock_);
-
+
do {
if (Begin_ != End_) {
const size_t ret = Min<size_t>(End_ - Begin_, len);
@@ -475,25 +475,25 @@ public:
inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept {
with_lock (CommandLock_) {
for (auto command = Commands_.begin(); command != Commands_.end(); ++command) {
- if (command->Filter_ != 0) {
- Fds_.Set(command->Fd_, command->Cookie_, command->Filter_);
- } else {
- Fds_.Remove(command->Fd_);
- }
- }
-
- Commands_.clear();
- }
-
- TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET));
+ if (command->Filter_ != 0) {
+ Fds_.Set(command->Fd_, command->Cookie_, command->Filter_);
+ } else {
+ Fds_.Remove(command->Fd_);
+ }
+ }
+
+ Commands_.clear();
+ }
+
+ TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET));
fd_set* in = (fd_set*)tmpBuf.Data();
fd_set* out = &in[1];
fd_set* errFds = &in[2];
SOCKET* keysToDeleteBegin = (SOCKET*)&in[3];
- SOCKET* keysToDeleteEnd = keysToDeleteBegin;
-
+ SOCKET* keysToDeleteEnd = keysToDeleteBegin;
+
#if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347
memset(in, 0, sizeof(*in));
memset(out, 0, sizeof(*out));
@@ -529,12 +529,12 @@ public:
if (FD_ISSET(fd, errFds)) {
(events++)->Error(handle.Data(), EIO);
-
- if (handle.Filter() & CONT_POLL_ONE_SHOT) {
- *keysToDeleteEnd = fd;
- ++keysToDeleteEnd;
- }
-
+
+ if (handle.Filter() & CONT_POLL_ONE_SHOT) {
+ *keysToDeleteEnd = fd;
+ ++keysToDeleteEnd;
+ }
+
} else {
int what = 0;
@@ -548,11 +548,11 @@ public:
if (what) {
(events++)->Success(handle.Data(), what);
-
- if (handle.Filter() & CONT_POLL_ONE_SHOT) {
- *keysToDeleteEnd = fd;
- ++keysToDeleteEnd;
- }
+
+ if (handle.Filter() & CONT_POLL_ONE_SHOT) {
+ *keysToDeleteEnd = fd;
+ ++keysToDeleteEnd;
+ }
if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) {
// Emulate edge-triggered for level-triggered select().
@@ -563,11 +563,11 @@ public:
}
}
- while (keysToDeleteBegin != keysToDeleteEnd) {
- Fds_.erase(*keysToDeleteBegin);
- ++keysToDeleteBegin;
- }
-
+ while (keysToDeleteBegin != keysToDeleteEnd) {
+ Fds_.erase(*keysToDeleteBegin);
+ ++keysToDeleteBegin;
+ }
+
return events - eventsStart;
}
@@ -611,25 +611,25 @@ private:
}
private:
- struct TCommand {
- SOCKET Fd_;
- int Filter_; // 0 to remove
- void* Cookie_;
-
- TCommand(SOCKET fd, int filter, void* cookie)
+ struct TCommand {
+ SOCKET Fd_;
+ int Filter_; // 0 to remove
+ void* Cookie_;
+
+ TCommand(SOCKET fd, int filter, void* cookie)
: Fd_(fd)
, Filter_(filter)
, Cookie_(cookie)
{
}
-
- TCommand(SOCKET fd, int filter)
+
+ TCommand(SOCKET fd, int filter)
: Fd_(fd)
, Filter_(filter)
{
}
- };
-
+ };
+
TFds Fds_;
TMyMutex Lock_;
@@ -637,9 +637,9 @@ private:
TEvent* Begin_;
TEvent* End_;
- TMyMutex CommandLock_;
+ TMyMutex CommandLock_;
TVector<TCommand> Commands_;
-
+
SOCKET Signal_[2];
};
#endif
diff --git a/util/network/socket.cpp b/util/network/socket.cpp
index f3ea77929a..4f6e804346 100644
--- a/util/network/socket.cpp
+++ b/util/network/socket.cpp
@@ -554,25 +554,25 @@ static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
#endif
void TSocketHolder::Close() noexcept {
- if (Fd_ != INVALID_SOCKET) {
- bool ok = (closesocket(Fd_) == 0);
- if (!ok) {
+ if (Fd_ != INVALID_SOCKET) {
+ bool ok = (closesocket(Fd_) == 0);
+ if (!ok) {
// Do not quietly close bad descriptor,
// because often it means double close
// that is disasterous
-#ifdef _win_
+#ifdef _win_
Y_VERIFY(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor");
-#elif defined(_unix_)
+#elif defined(_unix_)
Y_VERIFY(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_));
-#else
+#else
#error unsupported platform
-#endif
- }
-
- Fd_ = INVALID_SOCKET;
- }
-}
-
+#endif
+ }
+
+ Fd_ = INVALID_SOCKET;
+ }
+}
+
class TSocket::TImpl: public TAtomicRefCount<TImpl> {
using TOps = TSocket::TOps;