aboutsummaryrefslogtreecommitdiffstats
path: root/util/network
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/network
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/network')
-rw-r--r--util/network/address.cpp206
-rw-r--r--util/network/address.h190
-rw-r--r--util/network/endpoint.h6
-rw-r--r--util/network/endpoint_ut.cpp52
-rw-r--r--util/network/hostip.cpp106
-rw-r--r--util/network/hostip.h20
-rw-r--r--util/network/init.cpp30
-rw-r--r--util/network/init.h76
-rw-r--r--util/network/interface.cpp92
-rw-r--r--util/network/interface.h4
-rw-r--r--util/network/iovec.cpp2
-rw-r--r--util/network/iovec.h110
-rw-r--r--util/network/ip.cpp2
-rw-r--r--util/network/ip.h158
-rw-r--r--util/network/ip_ut.cpp58
-rw-r--r--util/network/nonblock.cpp196
-rw-r--r--util/network/nonblock.h14
-rw-r--r--util/network/pair.cpp154
-rw-r--r--util/network/pair.h14
-rw-r--r--util/network/poller.cpp114
-rw-r--r--util/network/poller.h92
-rw-r--r--util/network/poller_ut.cpp14
-rw-r--r--util/network/pollerimpl.cpp2
-rw-r--r--util/network/pollerimpl.h984
-rw-r--r--util/network/sock.cpp2
-rw-r--r--util/network/sock.h112
-rw-r--r--util/network/sock_ut.cpp274
-rw-r--r--util/network/socket.cpp1230
-rw-r--r--util/network/socket.h580
-rw-r--r--util/network/socket_ut.cpp146
-rw-r--r--util/network/ut/ya.make20
31 files changed, 2530 insertions, 2530 deletions
diff --git a/util/network/address.cpp b/util/network/address.cpp
index a81a9e6994..8352743fdf 100644
--- a/util/network/address.cpp
+++ b/util/network/address.cpp
@@ -1,65 +1,65 @@
#include <util/stream/str.h>
-#include "address.h"
-
-#if defined(_unix_)
- #include <sys/types.h>
- #include <sys/un.h>
-#endif
-
-using namespace NAddr;
-
-template <bool printPort>
+#include "address.h"
+
+#if defined(_unix_)
+ #include <sys/types.h>
+ #include <sys/un.h>
+#endif
+
+using namespace NAddr;
+
+template <bool printPort>
static inline void PrintAddr(IOutputStream& out, const IRemoteAddr& addr) {
- const sockaddr* a = addr.Addr();
- char buf[INET6_ADDRSTRLEN + 10];
-
- switch (a->sa_family) {
- case AF_INET: {
- const TIpAddress sa(*(const sockaddr_in*)a);
-
- out << IpToString(sa.Host(), buf, sizeof(buf));
-
- if (printPort) {
- out << ":" << sa.Port();
- }
-
- break;
- }
-
- case AF_INET6: {
- const sockaddr_in6* sa = (const sockaddr_in6*)a;
-
- if (!inet_ntop(AF_INET6, (void*)&sa->sin6_addr.s6_addr, buf, sizeof(buf))) {
- ythrow TSystemError() << "inet_ntop() failed";
- }
-
- if (printPort) {
- out << "[" << buf << "]"
- << ":" << InetToHost(sa->sin6_port);
+ const sockaddr* a = addr.Addr();
+ char buf[INET6_ADDRSTRLEN + 10];
+
+ switch (a->sa_family) {
+ case AF_INET: {
+ const TIpAddress sa(*(const sockaddr_in*)a);
+
+ out << IpToString(sa.Host(), buf, sizeof(buf));
+
+ if (printPort) {
+ out << ":" << sa.Port();
+ }
+
+ break;
+ }
+
+ case AF_INET6: {
+ const sockaddr_in6* sa = (const sockaddr_in6*)a;
+
+ if (!inet_ntop(AF_INET6, (void*)&sa->sin6_addr.s6_addr, buf, sizeof(buf))) {
+ ythrow TSystemError() << "inet_ntop() failed";
+ }
+
+ if (printPort) {
+ out << "[" << buf << "]"
+ << ":" << InetToHost(sa->sin6_port);
} else {
out << buf;
- }
-
- break;
- }
-
+ }
+
+ break;
+ }
+
#if defined(AF_UNIX)
case AF_UNIX: {
- const sockaddr_un* sa = (const sockaddr_un*)a;
-
- out << TStringBuf(sa->sun_path);
-
- break;
- }
-#endif
-
- default: {
+ const sockaddr_un* sa = (const sockaddr_un*)a;
+
+ out << TStringBuf(sa->sun_path);
+
+ break;
+ }
+#endif
+
+ default: {
size_t len = addr.Len();
- const char* b = (const char*)a;
+ const char* b = (const char*)a;
const char* e = b + len;
-
+
bool allZeros = true;
for (size_t i = 0; i < len; ++i) {
if (b[i] != 0) {
@@ -67,11 +67,11 @@ static inline void PrintAddr(IOutputStream& out, const IRemoteAddr& addr) {
break;
}
}
-
+
if (allZeros) {
out << "(raw all zeros)";
} else {
- out << "(raw " << (int)a->sa_family << " ";
+ out << "(raw " << (int)a->sa_family << " ";
while (b != e) {
//just print raw bytes
@@ -82,18 +82,18 @@ static inline void PrintAddr(IOutputStream& out, const IRemoteAddr& addr) {
}
out << ")";
- }
-
- break;
- }
- }
-}
-
-template <>
+ }
+
+ break;
+ }
+ }
+}
+
+template <>
void Out<IRemoteAddr>(IOutputStream& out, const IRemoteAddr& addr) {
- PrintAddr<true>(out, addr);
-}
-
+ PrintAddr<true>(out, addr);
+}
+
template <>
void Out<NAddr::TAddrInfo>(IOutputStream& out, const NAddr::TAddrInfo& addr) {
PrintAddr<true>(out, addr);
@@ -115,9 +115,9 @@ void Out<NAddr::TOpaqueAddr>(IOutputStream& out, const NAddr::TOpaqueAddr& addr)
}
void NAddr::PrintHost(IOutputStream& out, const IRemoteAddr& addr) {
- PrintAddr<false>(out, addr);
-}
-
+ PrintAddr<false>(out, addr);
+}
+
TString NAddr::PrintHost(const IRemoteAddr& addr) {
TStringStream ss;
PrintAddr<false>(ss, addr);
@@ -130,15 +130,15 @@ TString NAddr::PrintHostAndPort(const IRemoteAddr& addr) {
return ss.Str();
}
-IRemoteAddrPtr NAddr::GetSockAddr(SOCKET s) {
+IRemoteAddrPtr NAddr::GetSockAddr(SOCKET s) {
auto addr = MakeHolder<TOpaqueAddr>();
-
- if (getsockname(s, addr->MutableAddr(), addr->LenPtr()) < 0) {
- ythrow TSystemError() << "getsockname() failed";
- }
-
+
+ if (getsockname(s, addr->MutableAddr(), addr->LenPtr()) < 0) {
+ ythrow TSystemError() << "getsockname() failed";
+ }
+
return addr;
-}
+}
IRemoteAddrPtr NAddr::GetPeerAddr(SOCKET s) {
auto addr = MakeHolder<TOpaqueAddr>();
@@ -161,44 +161,44 @@ static const in6_addr& In6Addr(const IRemoteAddr& addr) {
bool NAddr::IsLoopback(const IRemoteAddr& addr) {
if (addr.Addr()->sa_family == AF_INET) {
return ((ntohl(InAddr(addr).s_addr) >> 24) & 0xff) == 127;
- }
-
- if (addr.Addr()->sa_family == AF_INET6) {
+ }
+
+ if (addr.Addr()->sa_family == AF_INET6) {
return 0 == memcmp(&In6Addr(addr), &in6addr_loopback, sizeof(in6_addr));
}
-
- return false;
+
+ return false;
}
bool NAddr::IsSame(const IRemoteAddr& lhs, const IRemoteAddr& rhs) {
if (lhs.Addr()->sa_family != rhs.Addr()->sa_family) {
return false;
- }
-
- if (lhs.Addr()->sa_family == AF_INET) {
+ }
+
+ if (lhs.Addr()->sa_family == AF_INET) {
return InAddr(lhs).s_addr == InAddr(rhs).s_addr;
- }
-
- if (lhs.Addr()->sa_family == AF_INET6) {
+ }
+
+ if (lhs.Addr()->sa_family == AF_INET6) {
return 0 == memcmp(&In6Addr(lhs), &In6Addr(rhs), sizeof(in6_addr));
}
ythrow yexception() << "unsupported addr family: " << lhs.Addr()->sa_family;
}
-
-socklen_t NAddr::SockAddrLength(const sockaddr* addr) {
- switch (addr->sa_family) {
- case AF_INET:
- return sizeof(sockaddr_in);
-
- case AF_INET6:
- return sizeof(sockaddr_in6);
-
-#if defined(AF_LOCAL)
- case AF_LOCAL:
- return sizeof(sockaddr_un);
-#endif
- }
-
- ythrow yexception() << "unsupported address family: " << addr->sa_family;
-}
+
+socklen_t NAddr::SockAddrLength(const sockaddr* addr) {
+ switch (addr->sa_family) {
+ case AF_INET:
+ return sizeof(sockaddr_in);
+
+ case AF_INET6:
+ return sizeof(sockaddr_in6);
+
+#if defined(AF_LOCAL)
+ case AF_LOCAL:
+ return sizeof(sockaddr_un);
+#endif
+ }
+
+ ythrow yexception() << "unsupported address family: " << addr->sa_family;
+}
diff --git a/util/network/address.h b/util/network/address.h
index 448fcac0c9..470ca8195a 100644
--- a/util/network/address.h
+++ b/util/network/address.h
@@ -1,136 +1,136 @@
-#pragma once
-
-#include "ip.h"
-#include "socket.h"
-
-#include <util/generic/ptr.h>
+#pragma once
+
+#include "ip.h"
+#include "socket.h"
+
+#include <util/generic/ptr.h>
#include <util/generic/string.h>
-
-namespace NAddr {
- class IRemoteAddr {
- public:
+
+namespace NAddr {
+ class IRemoteAddr {
+ public:
virtual ~IRemoteAddr() = default;
-
- virtual const sockaddr* Addr() const = 0;
- virtual socklen_t Len() const = 0;
- };
-
+
+ virtual const sockaddr* Addr() const = 0;
+ virtual socklen_t Len() const = 0;
+ };
+
using IRemoteAddrPtr = THolder<IRemoteAddr>;
using IRemoteAddrRef = TAtomicSharedPtr<NAddr::IRemoteAddr>;
-
- IRemoteAddrPtr GetSockAddr(SOCKET s);
+
+ IRemoteAddrPtr GetSockAddr(SOCKET s);
IRemoteAddrPtr GetPeerAddr(SOCKET s);
void PrintHost(IOutputStream& out, const IRemoteAddr& addr);
TString PrintHost(const IRemoteAddr& addr);
TString PrintHostAndPort(const IRemoteAddr& addr);
-
+
bool IsLoopback(const IRemoteAddr& addr);
bool IsSame(const IRemoteAddr& lhs, const IRemoteAddr& rhs);
- socklen_t SockAddrLength(const sockaddr* addr);
-
- //for accept, recvfrom - see LenPtr()
- class TOpaqueAddr: public IRemoteAddr {
- public:
+ socklen_t SockAddrLength(const sockaddr* addr);
+
+ //for accept, recvfrom - see LenPtr()
+ class TOpaqueAddr: public IRemoteAddr {
+ public:
inline TOpaqueAddr() noexcept
- : L_(sizeof(S_))
- {
- Zero(S_);
- }
-
+ : L_(sizeof(S_))
+ {
+ Zero(S_);
+ }
+
inline TOpaqueAddr(const IRemoteAddr* addr) noexcept {
Assign(addr->Addr(), addr->Len());
- }
-
+ }
+
inline TOpaqueAddr(const sockaddr* addr) {
- Assign(addr, SockAddrLength(addr));
+ Assign(addr, SockAddrLength(addr));
}
const sockaddr* Addr() const override {
- return MutableAddr();
- }
-
+ return MutableAddr();
+ }
+
socklen_t Len() const override {
- return L_;
- }
-
+ return L_;
+ }
+
inline sockaddr* MutableAddr() const noexcept {
- return (sockaddr*)&S_;
- }
-
+ return (sockaddr*)&S_;
+ }
+
inline socklen_t* LenPtr() noexcept {
- return &L_;
- }
-
- private:
+ return &L_;
+ }
+
+ private:
inline void Assign(const sockaddr* addr, socklen_t len) noexcept {
L_ = len;
memcpy(MutableAddr(), addr, L_);
}
private:
- sockaddr_storage S_;
- socklen_t L_;
- };
-
- //for TNetworkAddress
- class TAddrInfo: public IRemoteAddr {
- public:
+ sockaddr_storage S_;
+ socklen_t L_;
+ };
+
+ //for TNetworkAddress
+ class TAddrInfo: public IRemoteAddr {
+ public:
inline TAddrInfo(const addrinfo* ai) noexcept
- : AI_(ai)
- {
- }
-
+ : AI_(ai)
+ {
+ }
+
const sockaddr* Addr() const override {
- return AI_->ai_addr;
- }
-
+ return AI_->ai_addr;
+ }
+
socklen_t Len() const override {
return (socklen_t)AI_->ai_addrlen;
- }
-
- private:
+ }
+
+ private:
const addrinfo* const AI_;
- };
-
- //compat, for TIpAddress
- class TIPv4Addr: public IRemoteAddr {
- public:
+ };
+
+ //compat, for TIpAddress
+ class TIPv4Addr: public IRemoteAddr {
+ public:
inline TIPv4Addr(const TIpAddress& addr) noexcept
- : A_(addr)
- {
- }
-
+ : A_(addr)
+ {
+ }
+
const sockaddr* Addr() const override {
- return A_;
- }
-
+ return A_;
+ }
+
socklen_t Len() const override {
- return A_;
- }
-
- private:
+ return A_;
+ }
+
+ private:
const TIpAddress A_;
- };
-
- //same, for ipv6 addresses
- class TIPv6Addr: public IRemoteAddr {
- public:
+ };
+
+ //same, for ipv6 addresses
+ class TIPv6Addr: public IRemoteAddr {
+ public:
inline TIPv6Addr(const sockaddr_in6& a) noexcept
- : A_(a)
- {
- }
-
+ : A_(a)
+ {
+ }
+
const sockaddr* Addr() const override {
- return (sockaddr*)&A_;
- }
-
+ return (sockaddr*)&A_;
+ }
+
socklen_t Len() const override {
- return sizeof(A_);
- }
-
- private:
+ return sizeof(A_);
+ }
+
+ private:
const sockaddr_in6 A_;
- };
-}
+ };
+}
diff --git a/util/network/endpoint.h b/util/network/endpoint.h
index a3e59b4925..ef652d82fc 100644
--- a/util/network/endpoint.h
+++ b/util/network/endpoint.h
@@ -45,14 +45,14 @@ private:
TAddrRef Addr_;
};
-template <>
+template <>
struct THash<TEndpoint> {
- inline size_t operator()(const TEndpoint& ep) const {
+ inline size_t operator()(const TEndpoint& ep) const {
return ep.Hash();
}
};
-inline bool operator==(const TEndpoint& l, const TEndpoint& r) {
+inline bool operator==(const TEndpoint& l, const TEndpoint& r) {
try {
return NAddr::IsSame(*l.Addr(), *r.Addr()) && l.Port() == r.Port();
} catch (...) {
diff --git a/util/network/endpoint_ut.cpp b/util/network/endpoint_ut.cpp
index d5e40dd6e1..f0050a1d86 100644
--- a/util/network/endpoint_ut.cpp
+++ b/util/network/endpoint_ut.cpp
@@ -8,33 +8,33 @@
Y_UNIT_TEST_SUITE(TEndpointTest) {
Y_UNIT_TEST(TestSimple) {
TVector<TNetworkAddress> addrs;
-
+
TEndpoint ep0;
UNIT_ASSERT(ep0.IsIpV4());
UNIT_ASSERT_VALUES_EQUAL(0, ep0.Port());
UNIT_ASSERT_VALUES_EQUAL("0.0.0.0", ep0.IpToString());
- TEndpoint ep1;
-
- try {
- TNetworkAddress na1("25.26.27.28", 24242);
-
- addrs.push_back(na1);
-
- ep1 = TEndpoint(new NAddr::TAddrInfo(&*na1.Begin()));
-
- UNIT_ASSERT(ep1.IsIpV4());
- UNIT_ASSERT_VALUES_EQUAL("25.26.27.28", ep1.IpToString());
- UNIT_ASSERT_VALUES_EQUAL(24242, ep1.Port());
- } catch (const TNetworkResolutionError&) {
- TNetworkAddress n("2a02:6b8:0:1420:0::5f6c:f3c2", 11111);
-
- addrs.push_back(n);
-
- ep1 = TEndpoint(new NAddr::TAddrInfo(&*n.Begin()));
- }
-
+ TEndpoint ep1;
+
+ try {
+ TNetworkAddress na1("25.26.27.28", 24242);
+
+ addrs.push_back(na1);
+
+ ep1 = TEndpoint(new NAddr::TAddrInfo(&*na1.Begin()));
+
+ UNIT_ASSERT(ep1.IsIpV4());
+ UNIT_ASSERT_VALUES_EQUAL("25.26.27.28", ep1.IpToString());
+ UNIT_ASSERT_VALUES_EQUAL(24242, ep1.Port());
+ } catch (const TNetworkResolutionError&) {
+ TNetworkAddress n("2a02:6b8:0:1420:0::5f6c:f3c2", 11111);
+
+ addrs.push_back(n);
+
+ ep1 = TEndpoint(new NAddr::TAddrInfo(&*n.Begin()));
+ }
+
ep0.SetPort(12345);
TEndpoint ep2(ep0);
@@ -89,17 +89,17 @@ Y_UNIT_TEST_SUITE(TEndpointTest) {
Y_UNIT_TEST(TestEqual) {
const TString ip1 = "2a02:6b8:0:1410::5f6c:f3c2";
const TString ip2 = "2a02:6b8:0:1410::5f6c:f3c3";
-
- TNetworkAddress na1(ip1, 24242);
+
+ TNetworkAddress na1(ip1, 24242);
TEndpoint ep1(new NAddr::TAddrInfo(&*na1.Begin()));
- TNetworkAddress na2(ip1, 24242);
+ TNetworkAddress na2(ip1, 24242);
TEndpoint ep2(new NAddr::TAddrInfo(&*na2.Begin()));
- TNetworkAddress na3(ip2, 24242);
+ TNetworkAddress na3(ip2, 24242);
TEndpoint ep3(new NAddr::TAddrInfo(&*na3.Begin()));
- TNetworkAddress na4(ip2, 24243);
+ TNetworkAddress na4(ip2, 24243);
TEndpoint ep4(new NAddr::TAddrInfo(&*na4.Begin()));
UNIT_ASSERT(ep1 == ep2);
diff --git a/util/network/hostip.cpp b/util/network/hostip.cpp
index cb8d43bf90..b7a691c5d3 100644
--- a/util/network/hostip.cpp
+++ b/util/network/hostip.cpp
@@ -1,21 +1,21 @@
-#include "socket.h"
-#include "hostip.h"
-
+#include "socket.h"
+#include "hostip.h"
+
#include <util/system/defaults.h>
#include <util/system/byteorder.h>
#if defined(_unix_) || defined(_cygwin_)
- #include <netdb.h>
+ #include <netdb.h>
#endif
#if !defined(BIND_LIB)
- #if !defined(__FreeBSD__) && !defined(_win32_) && !defined(_cygwin_)
- #define AGENT_USE_GETADDRINFO
- #endif
-
- #if defined(__FreeBSD__)
- #define AGENT_USE_GETADDRINFO
- #endif
+ #if !defined(__FreeBSD__) && !defined(_win32_) && !defined(_cygwin_)
+ #define AGENT_USE_GETADDRINFO
+ #endif
+
+ #if defined(__FreeBSD__)
+ #define AGENT_USE_GETADDRINFO
+ #endif
#endif
int NResolver::GetHostIP(const char* hostname, ui32* ip, size_t* slots) {
@@ -23,54 +23,54 @@ int NResolver::GetHostIP(const char* hostname, ui32* ip, size_t* slots) {
size_t ipsFound = 0;
#ifdef AGENT_USE_GETADDRINFO
- int ret = 0;
- struct addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_STREAM;
+ int ret = 0;
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
struct addrinfo* gai_res = nullptr;
int gai_ret = getaddrinfo(hostname, nullptr, &hints, &gai_res);
- if (gai_ret == 0 && gai_res->ai_addr) {
- struct addrinfo* cur = gai_res;
- for (i = 0; i < *slots && cur; i++, cur = cur->ai_next, ipsFound++) {
+ if (gai_ret == 0 && gai_res->ai_addr) {
+ struct addrinfo* cur = gai_res;
+ for (i = 0; i < *slots && cur; i++, cur = cur->ai_next, ipsFound++) {
ip[i] = *(ui32*)(&((sockaddr_in*)(cur->ai_addr))->sin_addr);
- }
- } else {
- if (gai_ret == EAI_NONAME || gai_ret == EAI_SERVICE) {
- ret = HOST_NOT_FOUND;
- } else {
- ret = GetDnsError();
- }
- }
-
- if (gai_res) {
- freeaddrinfo(gai_res);
- }
-
- if (ret) {
- return ret;
- }
+ }
+ } else {
+ if (gai_ret == EAI_NONAME || gai_ret == EAI_SERVICE) {
+ ret = HOST_NOT_FOUND;
+ } else {
+ ret = GetDnsError();
+ }
+ }
+
+ if (gai_res) {
+ freeaddrinfo(gai_res);
+ }
+
+ if (ret) {
+ return ret;
+ }
#else
- hostent* hostent = gethostbyname(hostname);
-
- if (!hostent)
- return GetDnsError();
-
- if (hostent->h_addrtype != AF_INET || (unsigned)hostent->h_length < sizeof(ui32))
- return HOST_NOT_FOUND;
-
- char** cur = hostent->h_addr_list;
- for (i = 0; i < *slots && *cur; i++, cur++, ipsFound++)
+ hostent* hostent = gethostbyname(hostname);
+
+ if (!hostent)
+ return GetDnsError();
+
+ if (hostent->h_addrtype != AF_INET || (unsigned)hostent->h_length < sizeof(ui32))
+ return HOST_NOT_FOUND;
+
+ char** cur = hostent->h_addr_list;
+ for (i = 0; i < *slots && *cur; i++, cur++, ipsFound++)
ip[i] = *(ui32*)*cur;
#endif
- for (i = 0; i < ipsFound; i++) {
+ for (i = 0; i < ipsFound; i++) {
ip[i] = InetToHost(ip[i]);
- }
- *slots = ipsFound;
+ }
+ *slots = ipsFound;
- return 0;
-}
+ return 0;
+}
-int NResolver::GetDnsError() {
- return h_errno;
-}
+int NResolver::GetDnsError() {
+ return h_errno;
+}
diff --git a/util/network/hostip.h b/util/network/hostip.h
index cf63e4846a..9cc00f6af1 100644
--- a/util/network/hostip.h
+++ b/util/network/hostip.h
@@ -2,15 +2,15 @@
#include <util/system/defaults.h>
-namespace NResolver {
+namespace NResolver {
// resolve hostname and fills up to *slots slots in ip array;
// actual number of slots filled is returned in *slots;
- int GetHostIP(const char* hostname, ui32* ip, size_t* slots);
- int GetDnsError();
-
- inline int GetHostIP(const char* hostname, ui32* ip) {
- size_t slots = 1;
-
- return GetHostIP(hostname, ip, &slots);
- }
-}
+ int GetHostIP(const char* hostname, ui32* ip, size_t* slots);
+ int GetDnsError();
+
+ inline int GetHostIP(const char* hostname, ui32* ip) {
+ size_t slots = 1;
+
+ return GetHostIP(hostname, ip, &slots);
+ }
+}
diff --git a/util/network/init.cpp b/util/network/init.cpp
index 366e65682c..d9fff7d6a7 100644
--- a/util/network/init.cpp
+++ b/util/network/init.cpp
@@ -1,13 +1,13 @@
-#include "init.h"
-
-#include <util/system/compat.h>
-#include <util/system/yassert.h>
+#include "init.h"
+
+#include <util/system/compat.h>
+#include <util/system/yassert.h>
#include <util/system/defaults.h>
-#include <util/generic/singleton.h>
-
-#include <cstdio>
-#include <cstdlib>
-
+#include <util/generic/singleton.h>
+
+#include <cstdio>
+#include <cstdlib>
+
namespace {
class TNetworkInit {
public:
@@ -15,9 +15,9 @@ namespace {
#ifndef ROBOT_SIGPIPE
signal(SIGPIPE, SIG_IGN);
#endif
-
+
#if defined(_win_)
- #pragma comment(lib, "ws2_32.lib")
+ #pragma comment(lib, "ws2_32.lib")
WSADATA wsaData;
int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
Y_ASSERT(!result);
@@ -25,10 +25,10 @@ namespace {
exit(-1);
}
#endif
- }
+ }
};
}
-
-void InitNetworkSubSystem() {
- (void)Singleton<TNetworkInit>();
+
+void InitNetworkSubSystem() {
+ (void)Singleton<TNetworkInit>();
}
diff --git a/util/network/init.h b/util/network/init.h
index 08a79c0fca..42c6e287ad 100644
--- a/util/network/init.h
+++ b/util/network/init.h
@@ -1,60 +1,60 @@
#pragma once
-#include <util/system/error.h>
+#include <util/system/error.h>
#if defined(_unix_)
- #include <fcntl.h>
- #include <netdb.h>
- #include <time.h>
- #include <unistd.h>
- #include <poll.h>
+ #include <fcntl.h>
+ #include <netdb.h>
+ #include <time.h>
+ #include <unistd.h>
+ #include <poll.h>
- #include <sys/uio.h>
- #include <sys/time.h>
- #include <sys/types.h>
- #include <sys/socket.h>
+ #include <sys/uio.h>
+ #include <sys/time.h>
+ #include <sys/types.h>
+ #include <sys/socket.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
- #include <arpa/inet.h>
+ #include <netinet/in.h>
+ #include <netinet/tcp.h>
+ #include <arpa/inet.h>
using SOCKET = int;
- #define closesocket(s) close(s)
- #define SOCKET_ERROR -1
- #define INVALID_SOCKET -1
- #define WSAGetLastError() errno
+ #define closesocket(s) close(s)
+ #define SOCKET_ERROR -1
+ #define INVALID_SOCKET -1
+ #define WSAGetLastError() errno
#elif defined(_win_)
- #include <util/system/winint.h>
- #include <io.h>
- #include <winsock2.h>
- #include <ws2tcpip.h>
+ #include <util/system/winint.h>
+ #include <io.h>
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
using nfds_t = ULONG;
+
+ #undef Yield
- #undef Yield
+struct sockaddr_un {
+ short sun_family;
+ char sun_path[108];
+};
-struct sockaddr_un {
- short sun_family;
- char sun_path[108];
-};
-
- #define PF_LOCAL AF_UNIX
- #define NETDB_INTERNAL -1
- #define NETDB_SUCCESS 0
+ #define PF_LOCAL AF_UNIX
+ #define NETDB_INTERNAL -1
+ #define NETDB_SUCCESS 0
#endif
#if defined(_win_) || defined(_darwin_)
- #ifndef MSG_NOSIGNAL
- #define MSG_NOSIGNAL 0
- #endif
+ #ifndef MSG_NOSIGNAL
+ #define MSG_NOSIGNAL 0
+ #endif
#endif // _win_ or _darwin_
void InitNetworkSubSystem();
-static struct TNetworkInitializer {
- inline TNetworkInitializer() {
- InitNetworkSubSystem();
- }
-} NetworkInitializerObject;
+static struct TNetworkInitializer {
+ inline TNetworkInitializer() {
+ InitNetworkSubSystem();
+ }
+} NetworkInitializerObject;
diff --git a/util/network/interface.cpp b/util/network/interface.cpp
index 256776c6d3..ecafd9447a 100644
--- a/util/network/interface.cpp
+++ b/util/network/interface.cpp
@@ -1,55 +1,55 @@
#include "interface.h"
-#if defined(_unix_)
- #include <ifaddrs.h>
+#if defined(_unix_)
+ #include <ifaddrs.h>
#endif
#ifdef _win_
- #include <iphlpapi.h>
- #pragma comment(lib, "Iphlpapi.lib")
+ #include <iphlpapi.h>
+ #pragma comment(lib, "Iphlpapi.lib")
#endif
namespace NAddr {
- static bool IsInetAddress(sockaddr* addr) {
+ static bool IsInetAddress(sockaddr* addr) {
return (addr != nullptr) && ((addr->sa_family == AF_INET) || (addr->sa_family == AF_INET6));
- }
+ }
- TNetworkInterfaceList GetNetworkInterfaces() {
- TNetworkInterfaceList result;
+ TNetworkInterfaceList GetNetworkInterfaces() {
+ TNetworkInterfaceList result;
#ifdef _win_
TVector<char> buf;
- buf.resize(1000000);
- PIP_ADAPTER_ADDRESSES adapterBuf = (PIP_ADAPTER_ADDRESSES)&buf[0];
- ULONG bufSize = buf.ysize();
+ buf.resize(1000000);
+ PIP_ADAPTER_ADDRESSES adapterBuf = (PIP_ADAPTER_ADDRESSES)&buf[0];
+ ULONG bufSize = buf.ysize();
if (GetAdaptersAddresses(AF_UNSPEC, 0, nullptr, adapterBuf, &bufSize) == ERROR_SUCCESS) {
- for (PIP_ADAPTER_ADDRESSES ptr = adapterBuf; ptr != 0; ptr = ptr->Next) {
- // The check below makes code working on Vista+
- if ((ptr->Flags & (IP_ADAPTER_IPV4_ENABLED | IP_ADAPTER_IPV6_ENABLED)) == 0) {
- continue;
- }
- if (ptr->IfType == IF_TYPE_TUNNEL) {
- // ignore tunnels
- continue;
- }
- if (ptr->OperStatus != IfOperStatusUp) {
- // ignore disable adapters
- continue;
- }
+ for (PIP_ADAPTER_ADDRESSES ptr = adapterBuf; ptr != 0; ptr = ptr->Next) {
+ // The check below makes code working on Vista+
+ if ((ptr->Flags & (IP_ADAPTER_IPV4_ENABLED | IP_ADAPTER_IPV6_ENABLED)) == 0) {
+ continue;
+ }
+ if (ptr->IfType == IF_TYPE_TUNNEL) {
+ // ignore tunnels
+ continue;
+ }
+ if (ptr->OperStatus != IfOperStatusUp) {
+ // ignore disable adapters
+ continue;
+ }
- for (IP_ADAPTER_UNICAST_ADDRESS* addr = ptr->FirstUnicastAddress; addr != 0; addr = addr->Next) {
- sockaddr* a = (sockaddr*)addr->Address.lpSockaddr;
- if (IsInetAddress(a)) {
+ for (IP_ADAPTER_UNICAST_ADDRESS* addr = ptr->FirstUnicastAddress; addr != 0; addr = addr->Next) {
+ sockaddr* a = (sockaddr*)addr->Address.lpSockaddr;
+ if (IsInetAddress(a)) {
TNetworkInterface networkInterface;
- // Not very efficient but straightforward
- for (size_t i = 0; ptr->FriendlyName[i] != 0; i++) {
- CHAR w = ptr->FriendlyName[i];
- char c = (w < 0x80) ? char(w) : '?';
+ // Not very efficient but straightforward
+ for (size_t i = 0; ptr->FriendlyName[i] != 0; i++) {
+ CHAR w = ptr->FriendlyName[i];
+ char c = (w < 0x80) ? char(w) : '?';
networkInterface.Name.append(1, c);
- }
-
+ }
+
networkInterface.Address = new TOpaqueAddr(a);
result.push_back(networkInterface);
}
@@ -57,23 +57,23 @@ namespace NAddr {
}
}
#else
- ifaddrs* ifap;
- if (getifaddrs(&ifap) != -1) {
+ ifaddrs* ifap;
+ if (getifaddrs(&ifap) != -1) {
for (ifaddrs* ifa = ifap; ifa != nullptr; ifa = ifa->ifa_next) {
- if (IsInetAddress(ifa->ifa_addr)) {
- TNetworkInterface interface;
- interface.Name = ifa->ifa_name;
- interface.Address = new TOpaqueAddr(ifa->ifa_addr);
- if (IsInetAddress(ifa->ifa_netmask)) {
- interface.Mask = new TOpaqueAddr(ifa->ifa_netmask);
- }
- result.push_back(interface);
+ if (IsInetAddress(ifa->ifa_addr)) {
+ TNetworkInterface interface;
+ interface.Name = ifa->ifa_name;
+ interface.Address = new TOpaqueAddr(ifa->ifa_addr);
+ if (IsInetAddress(ifa->ifa_netmask)) {
+ interface.Mask = new TOpaqueAddr(ifa->ifa_netmask);
+ }
+ result.push_back(interface);
}
}
- freeifaddrs(ifap);
+ freeifaddrs(ifap);
}
#endif
- return result;
- }
+ return result;
+ }
}
diff --git a/util/network/interface.h b/util/network/interface.h
index dda4555021..c595e9788c 100644
--- a/util/network/interface.h
+++ b/util/network/interface.h
@@ -7,8 +7,8 @@
namespace NAddr {
struct TNetworkInterface {
TString Name;
- IRemoteAddrRef Address;
- IRemoteAddrRef Mask;
+ IRemoteAddrRef Address;
+ IRemoteAddrRef Mask;
};
using TNetworkInterfaceList = TVector<TNetworkInterface>;
diff --git a/util/network/iovec.cpp b/util/network/iovec.cpp
index 7251038848..770e0aa1df 100644
--- a/util/network/iovec.cpp
+++ b/util/network/iovec.cpp
@@ -1 +1 @@
-#include "iovec.h"
+#include "iovec.h"
diff --git a/util/network/iovec.h b/util/network/iovec.h
index ac15a41f54..6a18a28bd2 100644
--- a/util/network/iovec.h
+++ b/util/network/iovec.h
@@ -1,65 +1,65 @@
#pragma once
-
-#include <util/stream/output.h>
-#include <util/system/types.h>
-#include <util/system/yassert.h>
-
-class TContIOVector {
+
+#include <util/stream/output.h>
+#include <util/system/types.h>
+#include <util/system/yassert.h>
+
+class TContIOVector {
using TPart = IOutputStream::TPart;
-
-public:
- inline TContIOVector(TPart* parts, size_t count)
- : Parts_(parts)
- , Count_(count)
- {
- }
-
+
+public:
+ inline TContIOVector(TPart* parts, size_t count)
+ : Parts_(parts)
+ , Count_(count)
+ {
+ }
+
inline void Proceed(size_t len) noexcept {
- while (Count_) {
- if (len < Parts_->len) {
- Parts_->len -= len;
- Parts_->buf = (const char*)Parts_->buf + len;
-
- return;
- } else {
- len -= Parts_->len;
- --Count_;
- ++Parts_;
- }
- }
-
- if (len) {
+ while (Count_) {
+ if (len < Parts_->len) {
+ Parts_->len -= len;
+ Parts_->buf = (const char*)Parts_->buf + len;
+
+ return;
+ } else {
+ len -= Parts_->len;
+ --Count_;
+ ++Parts_;
+ }
+ }
+
+ if (len) {
Y_ASSERT(0 && "non zero length left");
- }
- }
-
+ }
+ }
+
inline const TPart* Parts() const noexcept {
- return Parts_;
- }
-
+ return Parts_;
+ }
+
inline size_t Count() const noexcept {
- return Count_;
- }
-
+ return Count_;
+ }
+
static inline size_t Bytes(const TPart* parts, size_t count) noexcept {
- size_t ret = 0;
-
- for (size_t i = 0; i < count; ++i) {
- ret += parts[i].len;
- }
-
- return ret;
- }
+ size_t ret = 0;
+
+ for (size_t i = 0; i < count; ++i) {
+ ret += parts[i].len;
+ }
+
+ return ret;
+ }
inline size_t Bytes() const noexcept {
- return Bytes(Parts_, Count_);
- }
-
+ return Bytes(Parts_, Count_);
+ }
+
inline bool Complete() const noexcept {
- return !Count();
- }
-
-private:
- TPart* Parts_;
- size_t Count_;
-};
+ return !Count();
+ }
+
+private:
+ TPart* Parts_;
+ size_t Count_;
+};
diff --git a/util/network/ip.cpp b/util/network/ip.cpp
index a43bcdadcf..798ac4c118 100644
--- a/util/network/ip.cpp
+++ b/util/network/ip.cpp
@@ -1 +1 @@
-#include "ip.h"
+#include "ip.h"
diff --git a/util/network/ip.h b/util/network/ip.h
index dc7c2d24a0..9307231465 100644
--- a/util/network/ip.h
+++ b/util/network/ip.h
@@ -1,119 +1,119 @@
#pragma once
-
-#include "socket.h"
-#include "hostip.h"
-
-#include <util/system/error.h>
-#include <util/system/byteorder.h>
+
+#include "socket.h"
+#include "hostip.h"
+
+#include <util/system/error.h>
+#include <util/system/byteorder.h>
#include <util/generic/string.h>
-#include <util/generic/yexception.h>
-
+#include <util/generic/yexception.h>
+
/// IPv4 address in network format
using TIpHost = ui32;
/// Port number in host format
using TIpPort = ui16;
-
+
/*
- * ipStr is in 'ddd.ddd.ddd.ddd' format
- * returns IPv4 address in inet format
- */
+ * ipStr is in 'ddd.ddd.ddd.ddd' format
+ * returns IPv4 address in inet format
+ */
static inline TIpHost IpFromString(const char* ipStr) {
in_addr ia;
-
+
if (inet_aton(ipStr, &ia) == 0) {
- ythrow TSystemError() << "Failed to convert (" << ipStr << ") to ip address";
+ ythrow TSystemError() << "Failed to convert (" << ipStr << ") to ip address";
}
-
+
return (ui32)ia.s_addr;
}
-static inline char* IpToString(TIpHost ip, char* buf, size_t len) {
+static inline char* IpToString(TIpHost ip, char* buf, size_t len) {
if (!inet_ntop(AF_INET, (void*)&ip, buf, (socklen_t)len)) {
- ythrow TSystemError() << "Failed to get ip address string";
+ ythrow TSystemError() << "Failed to get ip address string";
}
- return buf;
+ return buf;
}
static inline TString IpToString(TIpHost ip) {
- char buf[INET_ADDRSTRLEN];
-
+ char buf[INET_ADDRSTRLEN];
+
return TString(IpToString(ip, buf, sizeof(buf)));
-}
-
-static inline TIpHost ResolveHost(const char* data, size_t len) {
- TIpHost ret;
+}
+
+static inline TIpHost ResolveHost(const char* data, size_t len) {
+ TIpHost ret;
const TString s(data, len);
-
+
if (NResolver::GetHostIP(s.data(), &ret) != 0) {
- ythrow TSystemError(NResolver::GetDnsError()) << "can not resolve(" << s << ")";
- }
-
- return HostToInet(ret);
-}
-
+ ythrow TSystemError(NResolver::GetDnsError()) << "can not resolve(" << s << ")";
+ }
+
+ return HostToInet(ret);
+}
+
/// socket address
-struct TIpAddress: public sockaddr_in {
+struct TIpAddress: public sockaddr_in {
inline TIpAddress() noexcept {
- Clear();
- }
-
+ Clear();
+ }
+
inline TIpAddress(const sockaddr_in& addr) noexcept
- : sockaddr_in(addr)
+ : sockaddr_in(addr)
, tmp(0)
- {
- }
-
+ {
+ }
+
inline TIpAddress(TIpHost ip, TIpPort port) noexcept {
- Set(ip, port);
- }
-
+ Set(ip, port);
+ }
+
inline TIpAddress(TStringBuf ip, TIpPort port) {
Set(ResolveHost(ip.data(), ip.size()), port);
- }
-
- inline TIpAddress(const char* ip, TIpPort port) {
- Set(ResolveHost(ip, strlen(ip)), port);
- }
-
+ }
+
+ inline TIpAddress(const char* ip, TIpPort port) {
+ Set(ResolveHost(ip, strlen(ip)), port);
+ }
+
inline operator sockaddr*() const noexcept {
- return (sockaddr*)(sockaddr_in*)this;
- }
-
+ return (sockaddr*)(sockaddr_in*)this;
+ }
+
inline operator socklen_t*() const noexcept {
- tmp = sizeof(sockaddr_in);
-
- return (socklen_t*)&tmp;
- }
-
+ tmp = sizeof(sockaddr_in);
+
+ return (socklen_t*)&tmp;
+ }
+
inline operator socklen_t() const noexcept {
- tmp = sizeof(sockaddr_in);
-
- return tmp;
- }
-
+ tmp = sizeof(sockaddr_in);
+
+ return tmp;
+ }
+
inline void Clear() noexcept {
- Zero((sockaddr_in&)(*this));
- }
-
+ Zero((sockaddr_in&)(*this));
+ }
+
inline void Set(TIpHost ip, TIpPort port) noexcept {
- Clear();
-
- sin_family = AF_INET;
- sin_addr.s_addr = ip;
- sin_port = HostToInet(port);
- }
-
+ Clear();
+
+ sin_family = AF_INET;
+ sin_addr.s_addr = ip;
+ sin_port = HostToInet(port);
+ }
+
inline TIpHost Host() const noexcept {
- return sin_addr.s_addr;
- }
-
+ return sin_addr.s_addr;
+ }
+
inline TIpPort Port() const noexcept {
- return InetToHost(sin_port);
- }
-
+ return InetToHost(sin_port);
+ }
+
private:
// required for "operator socklen_t*()"
- mutable socklen_t tmp;
-};
+ mutable socklen_t tmp;
+};
diff --git a/util/network/ip_ut.cpp b/util/network/ip_ut.cpp
index 6716c6a699..67c19aa968 100644
--- a/util/network/ip_ut.cpp
+++ b/util/network/ip_ut.cpp
@@ -1,55 +1,55 @@
-#include "ip.h"
-
+#include "ip.h"
+
#include <library/cpp/testing/unittest/registar.h>
#include <util/generic/yexception.h>
class TSysIpTest: public TTestBase {
- UNIT_TEST_SUITE(TSysIpTest);
- UNIT_TEST(TestIpFromString);
- UNIT_TEST_EXCEPTION(TestIpFromString2, yexception);
- UNIT_TEST_EXCEPTION(TestIpFromString3, yexception);
- UNIT_TEST_EXCEPTION(TestIpFromString4, yexception);
- UNIT_TEST_EXCEPTION(TestIpFromString5, yexception);
- UNIT_TEST(TestIpToString);
- UNIT_TEST_SUITE_END();
-
-private:
- void TestIpFromString();
- void TestIpFromString2();
- void TestIpFromString3();
- void TestIpFromString4();
- void TestIpFromString5();
- void TestIpToString();
+ UNIT_TEST_SUITE(TSysIpTest);
+ UNIT_TEST(TestIpFromString);
+ UNIT_TEST_EXCEPTION(TestIpFromString2, yexception);
+ UNIT_TEST_EXCEPTION(TestIpFromString3, yexception);
+ UNIT_TEST_EXCEPTION(TestIpFromString4, yexception);
+ UNIT_TEST_EXCEPTION(TestIpFromString5, yexception);
+ UNIT_TEST(TestIpToString);
+ UNIT_TEST_SUITE_END();
+
+private:
+ void TestIpFromString();
+ void TestIpFromString2();
+ void TestIpFromString3();
+ void TestIpFromString4();
+ void TestIpFromString5();
+ void TestIpToString();
};
UNIT_TEST_SUITE_REGISTRATION(TSysIpTest);
-void TSysIpTest::TestIpFromString() {
+void TSysIpTest::TestIpFromString() {
const char* ipStr[] = {"192.168.0.1", "87.255.18.167", "255.255.0.31", "188.225.124.255"};
ui8 ipArr[][4] = {{192, 168, 0, 1}, {87, 255, 18, 167}, {255, 255, 0, 31}, {188, 225, 124, 255}};
for (size_t i = 0; i < Y_ARRAY_SIZE(ipStr); ++i) {
- const ui32 ip = IpFromString(ipStr[i]);
-
+ const ui32 ip = IpFromString(ipStr[i]);
+
UNIT_ASSERT(memcmp(&ip, ipArr[i], sizeof(ui32)) == 0);
}
}
-void TSysIpTest::TestIpFromString2() {
- IpFromString("XXXXXXWXW");
+void TSysIpTest::TestIpFromString2() {
+ IpFromString("XXXXXXWXW");
}
-void TSysIpTest::TestIpFromString3() {
- IpFromString("986.0.37.255");
+void TSysIpTest::TestIpFromString3() {
+ IpFromString("986.0.37.255");
}
-void TSysIpTest::TestIpFromString4() {
- IpFromString("256.0.22.365");
+void TSysIpTest::TestIpFromString4() {
+ IpFromString("256.0.22.365");
}
-void TSysIpTest::TestIpFromString5() {
- IpFromString("245.12..0");
+void TSysIpTest::TestIpFromString5() {
+ IpFromString("245.12..0");
}
void TSysIpTest::TestIpToString() {
diff --git a/util/network/nonblock.cpp b/util/network/nonblock.cpp
index e515c27cc5..ad00becbce 100644
--- a/util/network/nonblock.cpp
+++ b/util/network/nonblock.cpp
@@ -1,104 +1,104 @@
-#include "nonblock.h"
-
-#include <util/system/platform.h>
-
-#include <util/generic/singleton.h>
-
-#if defined(_unix_)
- #include <dlfcn.h>
-#endif
-
-#if defined(_linux_)
- #if !defined(SOCK_NONBLOCK)
- #define SOCK_NONBLOCK 04000
- #endif
-#endif
-
-namespace {
- struct TFeatureCheck {
- inline TFeatureCheck()
+#include "nonblock.h"
+
+#include <util/system/platform.h>
+
+#include <util/generic/singleton.h>
+
+#if defined(_unix_)
+ #include <dlfcn.h>
+#endif
+
+#if defined(_linux_)
+ #if !defined(SOCK_NONBLOCK)
+ #define SOCK_NONBLOCK 04000
+ #endif
+#endif
+
+namespace {
+ struct TFeatureCheck {
+ inline TFeatureCheck()
: Accept4(nullptr)
- , HaveSockNonBlock(false)
- {
-#if defined(_unix_) && defined(SOCK_NONBLOCK)
- {
+ , HaveSockNonBlock(false)
+ {
+#if defined(_unix_) && defined(SOCK_NONBLOCK)
+ {
Accept4 = reinterpret_cast<TAccept4>(dlsym(RTLD_DEFAULT, "accept4"));
-
- #if defined(_musl_)
- //musl always statically linked
- if (!Accept4) {
- Accept4 = accept4;
- }
- #endif
-
- if (Accept4) {
+
+ #if defined(_musl_)
+ //musl always statically linked
+ if (!Accept4) {
+ Accept4 = accept4;
+ }
+ #endif
+
+ if (Accept4) {
Accept4(-1, nullptr, nullptr, SOCK_NONBLOCK);
-
- if (errno == ENOSYS) {
+
+ if (errno == ENOSYS) {
Accept4 = nullptr;
- }
- }
- }
-#endif
-
-#if defined(SOCK_NONBLOCK)
- {
- TSocketHolder tmp(socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0));
-
- HaveSockNonBlock = !tmp.Closed();
- }
-#endif
- }
-
- inline SOCKET FastAccept(SOCKET s, struct sockaddr* addr, socklen_t* addrlen) const {
-#if defined(SOCK_NONBLOCK)
- if (Accept4) {
- return Accept4(s, addr, addrlen, SOCK_NONBLOCK);
- }
-#endif
-
- const SOCKET ret = accept(s, addr, addrlen);
-
-#if !defined(_freebsd_)
- //freebsd inherit O_NONBLOCK flag
- if (ret != INVALID_SOCKET) {
- SetNonBlock(ret);
- }
-#endif
-
- return ret;
- }
-
- inline SOCKET FastSocket(int domain, int type, int protocol) const {
-#if defined(SOCK_NONBLOCK)
- if (HaveSockNonBlock) {
- return socket(domain, type | SOCK_NONBLOCK, protocol);
- }
-#endif
-
- const SOCKET ret = socket(domain, type, protocol);
-
- if (ret != INVALID_SOCKET) {
- SetNonBlock(ret);
- }
-
- return ret;
- }
-
+ }
+ }
+ }
+#endif
+
+#if defined(SOCK_NONBLOCK)
+ {
+ TSocketHolder tmp(socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0));
+
+ HaveSockNonBlock = !tmp.Closed();
+ }
+#endif
+ }
+
+ inline SOCKET FastAccept(SOCKET s, struct sockaddr* addr, socklen_t* addrlen) const {
+#if defined(SOCK_NONBLOCK)
+ if (Accept4) {
+ return Accept4(s, addr, addrlen, SOCK_NONBLOCK);
+ }
+#endif
+
+ const SOCKET ret = accept(s, addr, addrlen);
+
+#if !defined(_freebsd_)
+ //freebsd inherit O_NONBLOCK flag
+ if (ret != INVALID_SOCKET) {
+ SetNonBlock(ret);
+ }
+#endif
+
+ return ret;
+ }
+
+ inline SOCKET FastSocket(int domain, int type, int protocol) const {
+#if defined(SOCK_NONBLOCK)
+ if (HaveSockNonBlock) {
+ return socket(domain, type | SOCK_NONBLOCK, protocol);
+ }
+#endif
+
+ const SOCKET ret = socket(domain, type, protocol);
+
+ if (ret != INVALID_SOCKET) {
+ SetNonBlock(ret);
+ }
+
+ return ret;
+ }
+
static inline const TFeatureCheck* Instance() noexcept {
- return Singleton<TFeatureCheck>();
- }
-
+ return Singleton<TFeatureCheck>();
+ }
+
using TAccept4 = int (*)(int sockfd, struct sockaddr* addr, socklen_t* addrlen, int flags);
- TAccept4 Accept4;
- bool HaveSockNonBlock;
- };
-}
-
-SOCKET Accept4(SOCKET s, struct sockaddr* addr, socklen_t* addrlen) {
- return TFeatureCheck::Instance()->FastAccept(s, addr, addrlen);
-}
-
-SOCKET Socket4(int domain, int type, int protocol) {
- return TFeatureCheck::Instance()->FastSocket(domain, type, protocol);
-}
+ TAccept4 Accept4;
+ bool HaveSockNonBlock;
+ };
+}
+
+SOCKET Accept4(SOCKET s, struct sockaddr* addr, socklen_t* addrlen) {
+ return TFeatureCheck::Instance()->FastAccept(s, addr, addrlen);
+}
+
+SOCKET Socket4(int domain, int type, int protocol) {
+ return TFeatureCheck::Instance()->FastSocket(domain, type, protocol);
+}
diff --git a/util/network/nonblock.h b/util/network/nonblock.h
index 54e5e44ae3..a16601b642 100644
--- a/util/network/nonblock.h
+++ b/util/network/nonblock.h
@@ -1,8 +1,8 @@
#pragma once
-
-#include "socket.h"
-
-//assume s is non-blocking, return non-blocking socket
-SOCKET Accept4(SOCKET s, struct sockaddr* addr, socklen_t* addrlen);
-//create non-blocking socket
-SOCKET Socket4(int domain, int type, int protocol);
+
+#include "socket.h"
+
+//assume s is non-blocking, return non-blocking socket
+SOCKET Accept4(SOCKET s, struct sockaddr* addr, socklen_t* addrlen);
+//create non-blocking socket
+SOCKET Socket4(int domain, int type, int protocol);
diff --git a/util/network/pair.cpp b/util/network/pair.cpp
index 9751ef5c96..2e410b842e 100644
--- a/util/network/pair.cpp
+++ b/util/network/pair.cpp
@@ -1,82 +1,82 @@
-#include "pair.h"
-
+#include "pair.h"
+
int SocketPair(SOCKET socks[2], bool overlapped, bool cloexec) {
-#if defined(_win_)
- struct sockaddr_in addr;
- SOCKET listener;
- int e;
- int addrlen = sizeof(addr);
+#if defined(_win_)
+ struct sockaddr_in addr;
+ SOCKET listener;
+ int e;
+ int addrlen = sizeof(addr);
DWORD flags = (overlapped ? WSA_FLAG_OVERLAPPED : 0) | (cloexec ? WSA_FLAG_NO_HANDLE_INHERIT : 0);
-
- if (socks == 0) {
- WSASetLastError(WSAEINVAL);
-
- return SOCKET_ERROR;
- }
-
- socks[0] = INVALID_SOCKET;
- socks[1] = INVALID_SOCKET;
-
- if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
- return SOCKET_ERROR;
- }
-
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(0x7f000001);
- addr.sin_port = 0;
-
- e = bind(listener, (const struct sockaddr*)&addr, sizeof(addr));
-
- if (e == SOCKET_ERROR) {
- e = WSAGetLastError();
- closesocket(listener);
- WSASetLastError(e);
-
- return SOCKET_ERROR;
- }
-
- e = getsockname(listener, (struct sockaddr*)&addr, &addrlen);
-
- if (e == SOCKET_ERROR) {
- e = WSAGetLastError();
- closesocket(listener);
- WSASetLastError(e);
-
- return SOCKET_ERROR;
- }
-
- do {
- if (listen(listener, 1) == SOCKET_ERROR)
- break;
-
+
+ if (socks == 0) {
+ WSASetLastError(WSAEINVAL);
+
+ return SOCKET_ERROR;
+ }
+
+ socks[0] = INVALID_SOCKET;
+ socks[1] = INVALID_SOCKET;
+
+ if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
+ return SOCKET_ERROR;
+ }
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(0x7f000001);
+ addr.sin_port = 0;
+
+ e = bind(listener, (const struct sockaddr*)&addr, sizeof(addr));
+
+ if (e == SOCKET_ERROR) {
+ e = WSAGetLastError();
+ closesocket(listener);
+ WSASetLastError(e);
+
+ return SOCKET_ERROR;
+ }
+
+ e = getsockname(listener, (struct sockaddr*)&addr, &addrlen);
+
+ if (e == SOCKET_ERROR) {
+ e = WSAGetLastError();
+ closesocket(listener);
+ WSASetLastError(e);
+
+ return SOCKET_ERROR;
+ }
+
+ do {
+ if (listen(listener, 1) == SOCKET_ERROR)
+ break;
+
if ((socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, nullptr, 0, flags)) == INVALID_SOCKET)
- break;
-
- if (connect(socks[0], (const struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR)
- break;
-
+ break;
+
+ if (connect(socks[0], (const struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR)
+ break;
+
if ((socks[1] = accept(listener, nullptr, nullptr)) == INVALID_SOCKET)
- break;
-
- closesocket(listener);
-
- return 0;
- } while (0);
-
- e = WSAGetLastError();
- closesocket(listener);
- closesocket(socks[0]);
- closesocket(socks[1]);
- WSASetLastError(e);
-
- return SOCKET_ERROR;
-#else
- (void)overlapped;
-
- #if defined(_linux_)
+ break;
+
+ closesocket(listener);
+
+ return 0;
+ } while (0);
+
+ e = WSAGetLastError();
+ closesocket(listener);
+ closesocket(socks[0]);
+ closesocket(socks[1]);
+ WSASetLastError(e);
+
+ return SOCKET_ERROR;
+#else
+ (void)overlapped;
+
+ #if defined(_linux_)
return socketpair(AF_LOCAL, SOCK_STREAM | (cloexec ? SOCK_CLOEXEC : 0), 0, socks);
- #else
+ #else
int r = socketpair(AF_LOCAL, SOCK_STREAM, 0, socks);
// Non-atomic wrt exec
if (r == 0 && cloexec) {
@@ -92,6 +92,6 @@ int SocketPair(SOCKET socks[2], bool overlapped, bool cloexec) {
}
}
return r;
- #endif
-#endif
-}
+ #endif
+#endif
+}
diff --git a/util/network/pair.h b/util/network/pair.h
index 0d4506f880..8a29be12c7 100644
--- a/util/network/pair.h
+++ b/util/network/pair.h
@@ -1,9 +1,9 @@
#pragma once
-
-#include "init.h"
-
-int SocketPair(SOCKET socks[2], bool overlapped, bool cloexec = false);
-
-static inline int SocketPair(SOCKET socks[2]) {
+
+#include "init.h"
+
+int SocketPair(SOCKET socks[2], bool overlapped, bool cloexec = false);
+
+static inline int SocketPair(SOCKET socks[2]) {
return SocketPair(socks, false, false);
-}
+}
diff --git a/util/network/poller.cpp b/util/network/poller.cpp
index 7954d0e8b5..76f9c1d517 100644
--- a/util/network/poller.cpp
+++ b/util/network/poller.cpp
@@ -1,54 +1,54 @@
-#include "poller.h"
-#include "pollerimpl.h"
-
-#include <util/memory/tempbuf.h>
-
-namespace {
- struct TMutexLocking {
+#include "poller.h"
+#include "pollerimpl.h"
+
+#include <util/memory/tempbuf.h>
+
+namespace {
+ struct TMutexLocking {
using TMyMutex = TMutex;
- };
-}
-
-class TSocketPoller::TImpl: public TPollerImpl<TMutexLocking> {
-public:
- inline size_t DoWaitReal(void** ev, TEvent* events, size_t len, const TInstant& deadLine) {
- const size_t ret = WaitD(events, len, deadLine);
-
- for (size_t i = 0; i < ret; ++i) {
- ev[i] = ExtractEvent(&events[i]);
+ };
+}
+
+class TSocketPoller::TImpl: public TPollerImpl<TMutexLocking> {
+public:
+ inline size_t DoWaitReal(void** ev, TEvent* events, size_t len, const TInstant& deadLine) {
+ const size_t ret = WaitD(events, len, deadLine);
+
+ for (size_t i = 0; i < ret; ++i) {
+ ev[i] = ExtractEvent(&events[i]);
}
-
- return ret;
- }
-
- inline size_t DoWait(void** ev, size_t len, const TInstant& deadLine) {
- if (len == 1) {
- TEvent tmp;
-
- return DoWaitReal(ev, &tmp, 1, deadLine);
- } else {
- TTempArray<TEvent> tmpEvents(len);
-
- return DoWaitReal(ev, tmpEvents.Data(), len, deadLine);
- }
- }
-};
-
-TSocketPoller::TSocketPoller()
- : Impl_(new TImpl())
-{
-}
-
+
+ return ret;
+ }
+
+ inline size_t DoWait(void** ev, size_t len, const TInstant& deadLine) {
+ if (len == 1) {
+ TEvent tmp;
+
+ return DoWaitReal(ev, &tmp, 1, deadLine);
+ } else {
+ TTempArray<TEvent> tmpEvents(len);
+
+ return DoWaitReal(ev, tmpEvents.Data(), len, deadLine);
+ }
+ }
+};
+
+TSocketPoller::TSocketPoller()
+ : Impl_(new TImpl())
+{
+}
+
TSocketPoller::~TSocketPoller() = default;
-
-void TSocketPoller::WaitRead(SOCKET sock, void* cookie) {
- Impl_->Set(cookie, sock, CONT_POLL_READ);
-}
-
-void TSocketPoller::WaitWrite(SOCKET sock, void* cookie) {
- Impl_->Set(cookie, sock, CONT_POLL_WRITE);
-}
-
+
+void TSocketPoller::WaitRead(SOCKET sock, void* cookie) {
+ Impl_->Set(cookie, sock, CONT_POLL_READ);
+}
+
+void TSocketPoller::WaitWrite(SOCKET sock, void* cookie) {
+ Impl_->Set(cookie, sock, CONT_POLL_WRITE);
+}
+
void TSocketPoller::WaitReadWrite(SOCKET sock, void* cookie) {
Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE);
}
@@ -70,17 +70,17 @@ void TSocketPoller::WaitReadWriteOneShot(SOCKET sock, void* cookie) {
}
void TSocketPoller::WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie) {
- Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_EDGE_TRIGGERED);
+ Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_EDGE_TRIGGERED);
}
void TSocketPoller::RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty) {
- Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_MODIFY | CONT_POLL_EDGE_TRIGGERED | (empty ? CONT_POLL_BACKLOG_EMPTY : 0));
+ Impl_->Set(cookie, sock, CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_MODIFY | CONT_POLL_EDGE_TRIGGERED | (empty ? CONT_POLL_BACKLOG_EMPTY : 0));
}
-void TSocketPoller::Unwait(SOCKET sock) {
- Impl_->Remove(sock);
-}
-
-size_t TSocketPoller::WaitD(void** ev, size_t len, const TInstant& deadLine) {
- return Impl_->DoWait(ev, len, deadLine);
-}
+void TSocketPoller::Unwait(SOCKET sock) {
+ Impl_->Remove(sock);
+}
+
+size_t TSocketPoller::WaitD(void** ev, size_t len, const TInstant& deadLine) {
+ return Impl_->DoWait(ev, len, deadLine);
+}
diff --git a/util/network/poller.h b/util/network/poller.h
index 8dccd73140..302bb2a78b 100644
--- a/util/network/poller.h
+++ b/util/network/poller.h
@@ -1,58 +1,58 @@
#pragma once
-#include "socket.h"
-
+#include "socket.h"
+
#include <util/generic/ptr.h>
-#include <util/datetime/base.h>
+#include <util/datetime/base.h>
-class TSocketPoller {
-public:
- TSocketPoller();
+class TSocketPoller {
+public:
+ TSocketPoller();
~TSocketPoller();
-
- void WaitRead(SOCKET sock, void* cookie);
- void WaitWrite(SOCKET sock, void* cookie);
- void WaitReadWrite(SOCKET sock, void* cookie);
+
+ void WaitRead(SOCKET sock, void* cookie);
+ void WaitWrite(SOCKET sock, void* cookie);
+ void WaitReadWrite(SOCKET sock, void* cookie);
void WaitRdhup(SOCKET sock, void* cookie);
-
- void WaitReadOneShot(SOCKET sock, void* cookie);
- void WaitWriteOneShot(SOCKET sock, void* cookie);
- void WaitReadWriteOneShot(SOCKET sock, void* cookie);
+
+ void WaitReadOneShot(SOCKET sock, void* cookie);
+ void WaitWriteOneShot(SOCKET sock, void* cookie);
+ void WaitReadWriteOneShot(SOCKET sock, void* cookie);
void WaitReadWriteEdgeTriggered(SOCKET sock, void* cookie);
void RestartReadWriteEdgeTriggered(SOCKET sock, void* cookie, bool empty = true);
- void Unwait(SOCKET sock);
-
- size_t WaitD(void** events, size_t len, const TInstant& deadLine);
-
- inline size_t WaitT(void** events, size_t len, const TDuration& timeOut) {
- return WaitD(events, len, timeOut.ToDeadLine());
- }
-
- inline size_t WaitI(void** events, size_t len) {
- return WaitD(events, len, TInstant::Max());
- }
-
- inline void* WaitD(const TInstant& deadLine) {
- void* ret;
-
- if (WaitD(&ret, 1, deadLine)) {
- return ret;
- }
-
+ void Unwait(SOCKET sock);
+
+ size_t WaitD(void** events, size_t len, const TInstant& deadLine);
+
+ inline size_t WaitT(void** events, size_t len, const TDuration& timeOut) {
+ return WaitD(events, len, timeOut.ToDeadLine());
+ }
+
+ inline size_t WaitI(void** events, size_t len) {
+ return WaitD(events, len, TInstant::Max());
+ }
+
+ inline void* WaitD(const TInstant& deadLine) {
+ void* ret;
+
+ if (WaitD(&ret, 1, deadLine)) {
+ return ret;
+ }
+
return nullptr;
- }
-
- inline void* WaitT(const TDuration& timeOut) {
- return WaitD(timeOut.ToDeadLine());
- }
-
- inline void* WaitI() {
- return WaitD(TInstant::Max());
- }
-
-private:
- class TImpl;
- THolder<TImpl> Impl_;
+ }
+
+ inline void* WaitT(const TDuration& timeOut) {
+ return WaitD(timeOut.ToDeadLine());
+ }
+
+ inline void* WaitI() {
+ return WaitD(TInstant::Max());
+ }
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
};
diff --git a/util/network/poller_ut.cpp b/util/network/poller_ut.cpp
index 6df0dda8ec..181e5ec027 100644
--- a/util/network/poller_ut.cpp
+++ b/util/network/poller_ut.cpp
@@ -14,16 +14,16 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) {
TSocketHolder s2(sockets[1]);
TSocketPoller poller;
- poller.WaitRead(sockets[1], (void*)17);
+ poller.WaitRead(sockets[1], (void*)17);
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
for (ui32 i = 0; i < 3; ++i) {
- char buf[] = {18};
+ char buf[] = {18};
UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
UNIT_ASSERT_VALUES_EQUAL(18, buf[0]);
@@ -46,7 +46,7 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) {
UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
for (ui32 i = 0; i < 3; ++i) {
- poller.WaitReadOneShot(sockets[1], (void*)17);
+ poller.WaitReadOneShot(sockets[1], (void*)17);
char buf[1];
@@ -54,7 +54,7 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) {
UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
+ UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[0]);
@@ -138,7 +138,7 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) {
// after restart read and write might generate separate events
{
- void* events[3];
+ void* events[3];
size_t count = poller.WaitT(events, 3, TDuration::Zero());
UNIT_ASSERT_GE(count, 1);
UNIT_ASSERT_LE(count, 2);
@@ -222,7 +222,7 @@ Y_UNIT_TEST_SUITE(TSocketPollerTest) {
UNIT_ASSERT_VALUES_EQUAL(1, send(s1, buf, 1, 0));
shutdown(s1, SHUT_WR);
- using TPoller = TGenericPoller<TEpollPoller<TWithoutLocking>>;
+ using TPoller = TGenericPoller<TEpollPoller<TWithoutLocking>>;
TPoller poller;
poller.Set((void*)17, s2, CONT_POLL_RDHUP);
diff --git a/util/network/pollerimpl.cpp b/util/network/pollerimpl.cpp
index bf2ba16cf6..5136700238 100644
--- a/util/network/pollerimpl.cpp
+++ b/util/network/pollerimpl.cpp
@@ -1 +1 @@
-#include "pollerimpl.h"
+#include "pollerimpl.h"
diff --git a/util/network/pollerimpl.h b/util/network/pollerimpl.h
index e8c7e40fba..4ad438228a 100644
--- a/util/network/pollerimpl.h
+++ b/util/network/pollerimpl.h
@@ -1,114 +1,114 @@
#pragma once
-
-#include "socket.h"
-
-#include <util/system/error.h>
-#include <util/system/mutex.h>
+
+#include "socket.h"
+
+#include <util/system/error.h>
+#include <util/system/mutex.h>
#include <util/system/defaults.h>
#include <util/generic/ylimits.h>
#include <util/generic/utility.h>
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/datetime/base.h>
-
-#if defined(_freebsd_) || defined(_darwin_)
- #define HAVE_KQUEUE_POLLER
-#endif
-
+
+#if defined(_freebsd_) || defined(_darwin_)
+ #define HAVE_KQUEUE_POLLER
+#endif
+
#if (defined(_linux_) && !defined(_bionic_)) || (__ANDROID_API__ >= 21)
- #define HAVE_EPOLL_POLLER
-#endif
-
-//now we always have it
-#define HAVE_SELECT_POLLER
-
-#if defined(HAVE_KQUEUE_POLLER)
- #include <sys/event.h>
-#endif
-
-#if defined(HAVE_EPOLL_POLLER)
- #include <sys/epoll.h>
-#endif
-
-enum EContPoll {
- CONT_POLL_READ = 1,
- CONT_POLL_WRITE = 2,
- CONT_POLL_RDHUP = 4,
- CONT_POLL_ONE_SHOT = 8, // Disable after first event
- CONT_POLL_MODIFY = 16, // Modify already added event
+ #define HAVE_EPOLL_POLLER
+#endif
+
+//now we always have it
+#define HAVE_SELECT_POLLER
+
+#if defined(HAVE_KQUEUE_POLLER)
+ #include <sys/event.h>
+#endif
+
+#if defined(HAVE_EPOLL_POLLER)
+ #include <sys/epoll.h>
+#endif
+
+enum EContPoll {
+ CONT_POLL_READ = 1,
+ CONT_POLL_WRITE = 2,
+ CONT_POLL_RDHUP = 4,
+ CONT_POLL_ONE_SHOT = 8, // Disable after first event
+ CONT_POLL_MODIFY = 16, // Modify already added event
CONT_POLL_EDGE_TRIGGERED = 32, // Notify only about new events
- CONT_POLL_BACKLOG_EMPTY = 64, // Backlog is empty (seen end of request, EAGAIN or truncated read)
-};
-
+ CONT_POLL_BACKLOG_EMPTY = 64, // Backlog is empty (seen end of request, EAGAIN or truncated read)
+};
+
static inline bool IsSocket(SOCKET fd) noexcept {
- int val = 0;
- socklen_t len = sizeof(val);
-
- if (getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&val, &len) == 0) {
- return true;
- }
-
- return LastSystemError() != ENOTSOCK;
-}
-
+ int val = 0;
+ socklen_t len = sizeof(val);
+
+ if (getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&val, &len) == 0) {
+ return true;
+ }
+
+ return LastSystemError() != ENOTSOCK;
+}
+
static inline int MicroToMilli(int timeout) noexcept {
- if (timeout) {
- /*
- * 1. API of epoll syscall allows to specify timeout with millisecond
- * accuracy only
- * 2. It is quite complicated to guarantee time resolution of blocking
- * syscall less than kernel 1/HZ
- *
- * Without this rounding we just waste cpu time and do a lot of
- * fast epoll_wait(..., 0) syscalls.
- */
- return Max(timeout / 1000, 1);
- }
-
- return 0;
-}
-
-struct TWithoutLocking {
+ if (timeout) {
+ /*
+ * 1. API of epoll syscall allows to specify timeout with millisecond
+ * accuracy only
+ * 2. It is quite complicated to guarantee time resolution of blocking
+ * syscall less than kernel 1/HZ
+ *
+ * Without this rounding we just waste cpu time and do a lot of
+ * fast epoll_wait(..., 0) syscalls.
+ */
+ return Max(timeout / 1000, 1);
+ }
+
+ return 0;
+}
+
+struct TWithoutLocking {
using TMyMutex = TFakeMutex;
-};
-
-#if defined(HAVE_KQUEUE_POLLER)
-static inline int Kevent(int kq, struct kevent* changelist, int nchanges,
+};
+
+#if defined(HAVE_KQUEUE_POLLER)
+static inline int Kevent(int kq, struct kevent* changelist, int nchanges,
struct kevent* eventlist, int nevents, const struct timespec* timeout) noexcept {
- int ret;
-
- do {
- ret = kevent(kq, changelist, nchanges, eventlist, nevents, timeout);
- } while (ret == -1 && errno == EINTR);
-
- return ret;
-}
-
-template <class TLockPolicy>
-class TKqueuePoller {
-public:
+ int ret;
+
+ do {
+ ret = kevent(kq, changelist, nchanges, eventlist, nevents, timeout);
+ } while (ret == -1 && errno == EINTR);
+
+ return ret;
+}
+
+template <class TLockPolicy>
+class TKqueuePoller {
+public:
typedef struct ::kevent TEvent;
-
- inline TKqueuePoller()
- : Fd_(kqueue())
- {
- if (Fd_ == -1) {
- ythrow TSystemError() << "kqueue failed";
- }
- }
-
+
+ inline TKqueuePoller()
+ : Fd_(kqueue())
+ {
+ if (Fd_ == -1) {
+ ythrow TSystemError() << "kqueue failed";
+ }
+ }
+
inline ~TKqueuePoller() {
- close(Fd_);
- }
-
+ close(Fd_);
+ }
+
inline int Fd() const noexcept {
- return Fd_;
- }
-
- inline void SetImpl(void* data, int fd, int what) {
- TEvent e[2];
+ return Fd_;
+ }
+
+ inline void SetImpl(void* data, int fd, int what) {
+ TEvent e[2];
int flags = EV_ADD;
-
+
if (what & CONT_POLL_EDGE_TRIGGERED) {
if (what & CONT_POLL_BACKLOG_EMPTY) {
// When backlog is empty, edge-triggered does not need restart.
@@ -121,110 +121,110 @@ public:
flags |= EV_ONESHOT;
}
- Zero(e);
-
+ Zero(e);
+
EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
-
+
if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) {
- ythrow TSystemError() << "kevent add failed";
- }
- }
-
+ ythrow TSystemError() << "kevent add failed";
+ }
+ }
+
inline void Remove(int fd) noexcept {
- TEvent e[2];
-
- Zero(e);
-
- EV_SET(e + 0, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
- EV_SET(e + 1, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
-
+ TEvent e[2];
+
+ Zero(e);
+
+ EV_SET(e + 0, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ EV_SET(e + 1, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+
Y_VERIFY(!(Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1 && errno != ENOENT), "kevent remove failed: %s", LastSystemErrorText());
- }
-
+ }
+
inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
- struct timespec ts;
-
- ts.tv_sec = timeout / 1000000;
- ts.tv_nsec = (timeout % 1000000) * 1000;
-
+ struct timespec ts;
+
+ ts.tv_sec = timeout / 1000000;
+ ts.tv_nsec = (timeout % 1000000) * 1000;
+
const int ret = Kevent(Fd_, nullptr, 0, events, len, &ts);
-
+
Y_VERIFY(ret >= 0, "kevent failed: %s", LastSystemErrorText());
-
- return (size_t)ret;
- }
-
+
+ return (size_t)ret;
+ }
+
static inline void* ExtractEvent(const TEvent* event) noexcept {
- return event->udata;
- }
-
+ return event->udata;
+ }
+
static inline int ExtractStatus(const TEvent* event) noexcept {
if (event->flags & EV_ERROR) {
return EIO;
}
return event->fflags;
- }
-
+ }
+
static inline int ExtractFilterImpl(const TEvent* event) noexcept {
- if (event->filter == EVFILT_READ) {
+ if (event->filter == EVFILT_READ) {
return CONT_POLL_READ;
}
-
+
if (event->filter == EVFILT_WRITE) {
return CONT_POLL_WRITE;
}
- if (event->flags & EV_EOF) {
- return CONT_POLL_READ | CONT_POLL_WRITE;
- }
-
+ if (event->flags & EV_EOF) {
+ return CONT_POLL_READ | CONT_POLL_WRITE;
+ }
+
return 0;
}
-private:
- int Fd_;
-};
-#endif
-
-#if defined(HAVE_EPOLL_POLLER)
+private:
+ int Fd_;
+};
+#endif
+
+#if defined(HAVE_EPOLL_POLLER)
static inline int ContEpollWait(int epfd, struct epoll_event* events, int maxevents, int timeout) noexcept {
int ret;
-
+
do {
- ret = epoll_wait(epfd, events, maxevents, Min<int>(timeout, 35 * 60 * 1000));
+ ret = epoll_wait(epfd, events, maxevents, Min<int>(timeout, 35 * 60 * 1000));
} while (ret == -1 && errno == EINTR);
-
+
return ret;
}
-template <class TLockPolicy>
-class TEpollPoller {
-public:
- typedef struct ::epoll_event TEvent;
-
+template <class TLockPolicy>
+class TEpollPoller {
+public:
+ typedef struct ::epoll_event TEvent;
+
inline TEpollPoller(bool closeOnExec = false)
: Fd_(epoll_create1(closeOnExec ? EPOLL_CLOEXEC : 0))
- {
- if (Fd_ == -1) {
- ythrow TSystemError() << "epoll_create failed";
- }
- }
-
+ {
+ if (Fd_ == -1) {
+ ythrow TSystemError() << "epoll_create failed";
+ }
+ }
+
inline ~TEpollPoller() {
- close(Fd_);
- }
-
+ close(Fd_);
+ }
+
inline int Fd() const noexcept {
- return Fd_;
- }
-
- inline void SetImpl(void* data, int fd, int what) {
- TEvent e;
-
- Zero(e);
-
+ return Fd_;
+ }
+
+ inline void SetImpl(void* data, int fd, int what) {
+ TEvent e;
+
+ Zero(e);
+
if (what & CONT_POLL_EDGE_TRIGGERED) {
if (what & CONT_POLL_BACKLOG_EMPTY) {
// When backlog is empty, edge-triggered does not need restart.
@@ -249,50 +249,50 @@ public:
e.events |= EPOLLRDHUP;
}
- e.data.ptr = data;
-
+ e.data.ptr = data;
+
if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) {
- if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) {
- ythrow TSystemError() << "epoll add failed";
- }
- }
- }
-
+ if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) {
+ ythrow TSystemError() << "epoll add failed";
+ }
+ }
+ }
+
inline void Remove(int fd) noexcept {
- TEvent e;
-
- Zero(e);
-
- epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &e);
- }
-
+ TEvent e;
+
+ Zero(e);
+
+ epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &e);
+ }
+
inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
- const int ret = ContEpollWait(Fd_, events, len, MicroToMilli(timeout));
-
+ const int ret = ContEpollWait(Fd_, events, len, MicroToMilli(timeout));
+
Y_VERIFY(ret >= 0, "epoll wait error: %s", LastSystemErrorText());
-
- return (size_t)ret;
- }
-
+
+ return (size_t)ret;
+ }
+
static inline void* ExtractEvent(const TEvent* event) noexcept {
- return event->data.ptr;
- }
-
+ return event->data.ptr;
+ }
+
static inline int ExtractStatus(const TEvent* event) noexcept {
- if (event->events & (EPOLLERR | EPOLLHUP)) {
- return EIO;
- }
-
- return 0;
- }
-
+ if (event->events & (EPOLLERR | EPOLLHUP)) {
+ return EIO;
+ }
+
+ return 0;
+ }
+
static inline int ExtractFilterImpl(const TEvent* event) noexcept {
int ret = 0;
if (event->events & EPOLLIN) {
ret |= CONT_POLL_READ;
}
-
+
if (event->events & EPOLLOUT) {
ret |= CONT_POLL_WRITE;
}
@@ -304,177 +304,177 @@ public:
return ret;
}
-private:
- int Fd_;
-};
-#endif
-
-#if defined(HAVE_SELECT_POLLER)
- #include <util/memory/tempbuf.h>
- #include <util/generic/hash.h>
-
- #include "pair.h"
-
+private:
+ int Fd_;
+};
+#endif
+
+#if defined(HAVE_SELECT_POLLER)
+ #include <util/memory/tempbuf.h>
+ #include <util/generic/hash.h>
+
+ #include "pair.h"
+
static inline int ContSelect(int n, fd_set* r, fd_set* w, fd_set* e, struct timeval* t) noexcept {
- int ret;
-
- do {
- ret = select(n, r, w, e, t);
- } while (ret == -1 && errno == EINTR);
-
- return ret;
-}
-
-struct TSelectPollerNoTemplate {
- struct THandle {
- void* Data_;
- int Filter_;
-
- inline THandle()
+ int ret;
+
+ do {
+ ret = select(n, r, w, e, t);
+ } while (ret == -1 && errno == EINTR);
+
+ return ret;
+}
+
+struct TSelectPollerNoTemplate {
+ struct THandle {
+ void* Data_;
+ int Filter_;
+
+ inline THandle()
: Data_(nullptr)
- , Filter_(0)
- {
- }
-
+ , Filter_(0)
+ {
+ }
+
inline void* Data() const noexcept {
- return Data_;
- }
-
+ return Data_;
+ }
+
inline void Set(void* d, int s) noexcept {
- Data_ = d;
- Filter_ = s;
- }
-
+ Data_ = d;
+ Filter_ = s;
+ }
+
inline void Clear(int c) noexcept {
Filter_ &= ~c;
}
inline int Filter() const noexcept {
- return Filter_;
- }
- };
-
+ return Filter_;
+ }
+ };
+
class TFds: public THashMap<SOCKET, THandle> {
- public:
- inline void Set(SOCKET fd, void* data, int filter) {
- (*this)[fd].Set(data, filter);
- }
-
+ public:
+ inline void Set(SOCKET fd, void* data, int filter) {
+ (*this)[fd].Set(data, filter);
+ }
+
inline void Remove(SOCKET fd) {
- erase(fd);
- }
-
+ erase(fd);
+ }
+
inline SOCKET Build(fd_set* r, fd_set* w, fd_set* e) const noexcept {
- SOCKET ret = 0;
-
+ SOCKET ret = 0;
+
for (const auto& it : *this) {
const SOCKET fd = it.first;
const THandle& handle = it.second;
-
- FD_SET(fd, e);
-
- if (handle.Filter() & CONT_POLL_READ) {
- FD_SET(fd, r);
- }
-
- if (handle.Filter() & CONT_POLL_WRITE) {
- FD_SET(fd, w);
- }
-
- if (fd > ret) {
- ret = fd;
- }
- }
-
- return ret;
- }
- };
-
- struct TEvent: public THandle {
+
+ FD_SET(fd, e);
+
+ if (handle.Filter() & CONT_POLL_READ) {
+ FD_SET(fd, r);
+ }
+
+ if (handle.Filter() & CONT_POLL_WRITE) {
+ FD_SET(fd, w);
+ }
+
+ if (fd > ret) {
+ ret = fd;
+ }
+ }
+
+ return ret;
+ }
+ };
+
+ struct TEvent: public THandle {
inline int Status() const noexcept {
- return -Min(Filter(), 0);
- }
-
+ return -Min(Filter(), 0);
+ }
+
inline void Error(void* d, int err) noexcept {
- Set(d, -err);
- }
-
+ Set(d, -err);
+ }
+
inline void Success(void* d, int what) noexcept {
- Set(d, what);
- }
- };
-};
-
-template <class TLockPolicy>
-class TSelectPoller: public TSelectPollerNoTemplate {
+ Set(d, what);
+ }
+ };
+};
+
+template <class TLockPolicy>
+class TSelectPoller: public TSelectPollerNoTemplate {
using TMyMutex = typename TLockPolicy::TMyMutex;
-
-public:
- inline TSelectPoller()
+
+public:
+ inline TSelectPoller()
: Begin_(nullptr)
, End_(nullptr)
- {
- SocketPair(Signal_);
- SetNonBlock(WaitSock());
- SetNonBlock(SigSock());
- }
-
+ {
+ SocketPair(Signal_);
+ SetNonBlock(WaitSock());
+ SetNonBlock(SigSock());
+ }
+
inline ~TSelectPoller() {
- closesocket(Signal_[0]);
- closesocket(Signal_[1]);
- }
-
+ closesocket(Signal_[0]);
+ closesocket(Signal_[1]);
+ }
+
inline void SetImpl(void* data, SOCKET fd, int what) {
- with_lock (CommandLock_) {
+ with_lock (CommandLock_) {
Commands_.push_back(TCommand(fd, what, data));
- }
-
- Signal();
- }
-
+ }
+
+ Signal();
+ }
+
inline void Remove(SOCKET fd) noexcept {
- with_lock (CommandLock_) {
+ with_lock (CommandLock_) {
Commands_.push_back(TCommand(fd, 0));
- }
-
- Signal();
- }
-
+ }
+
+ Signal();
+ }
+
inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
- auto guard = Guard(Lock_);
-
- do {
- if (Begin_ != End_) {
- const size_t ret = Min<size_t>(End_ - Begin_, len);
-
- memcpy(events, Begin_, sizeof(*events) * ret);
- Begin_ += ret;
-
- return ret;
- }
-
- if (len >= EventNumberHint()) {
- return WaitBase(events, len, timeout);
- }
-
- Begin_ = SavedEvents();
- End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout);
- } while (Begin_ != End_);
-
- return 0;
- }
-
- inline TEvent* SavedEvents() {
- if (!SavedEvents_) {
- SavedEvents_.Reset(new TEvent[EventNumberHint()]);
- }
-
- return SavedEvents_.Get();
- }
-
+ auto guard = Guard(Lock_);
+
+ do {
+ if (Begin_ != End_) {
+ const size_t ret = Min<size_t>(End_ - Begin_, len);
+
+ memcpy(events, Begin_, sizeof(*events) * ret);
+ Begin_ += ret;
+
+ return ret;
+ }
+
+ if (len >= EventNumberHint()) {
+ return WaitBase(events, len, timeout);
+ }
+
+ Begin_ = SavedEvents();
+ End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout);
+ } while (Begin_ != End_);
+
+ return 0;
+ }
+
+ inline TEvent* SavedEvents() {
+ if (!SavedEvents_) {
+ SavedEvents_.Reset(new TEvent[EventNumberHint()]);
+ }
+
+ return SavedEvents_.Get();
+ }
+
inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept {
- with_lock (CommandLock_) {
- for (auto command = Commands_.begin(); command != Commands_.end(); ++command) {
+ with_lock (CommandLock_) {
+ for (auto command = Commands_.begin(); command != Commands_.end(); ++command) {
if (command->Filter_ != 0) {
Fds_.Set(command->Fd_, command->Cookie_, command->Filter_);
} else {
@@ -486,68 +486,68 @@ public:
}
TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET));
-
- fd_set* in = (fd_set*)tmpBuf.Data();
- fd_set* out = &in[1];
- fd_set* errFds = &in[2];
-
- SOCKET* keysToDeleteBegin = (SOCKET*)&in[3];
+
+ fd_set* in = (fd_set*)tmpBuf.Data();
+ fd_set* out = &in[1];
+ fd_set* errFds = &in[2];
+
+ SOCKET* keysToDeleteBegin = (SOCKET*)&in[3];
SOCKET* keysToDeleteEnd = keysToDeleteBegin;
- #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347
+ #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347
memset(in, 0, sizeof(*in));
memset(out, 0, sizeof(*out));
memset(errFds, 0, sizeof(*errFds));
- #endif
-
- FD_ZERO(in);
- FD_ZERO(out);
- FD_ZERO(errFds);
-
- FD_SET(WaitSock(), in);
-
- const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock());
- struct timeval tout;
-
- tout.tv_sec = timeout / 1000000;
- tout.tv_usec = timeout % 1000000;
-
+ #endif
+
+ FD_ZERO(in);
+ FD_ZERO(out);
+ FD_ZERO(errFds);
+
+ FD_SET(WaitSock(), in);
+
+ const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock());
+ struct timeval tout;
+
+ tout.tv_sec = timeout / 1000000;
+ tout.tv_usec = timeout % 1000000;
+
int ret = ContSelect(int(maxFdNum + 1), in, out, errFds, &tout);
-
- if (ret > 0 && FD_ISSET(WaitSock(), in)) {
- --ret;
- TryWait();
- }
-
+
+ if (ret > 0 && FD_ISSET(WaitSock(), in)) {
+ --ret;
+ TryWait();
+ }
+
Y_VERIFY(ret >= 0 && (size_t)ret <= len, "select error: %s", LastSystemErrorText());
-
- TEvent* eventsStart = events;
-
+
+ TEvent* eventsStart = events;
+
for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) {
const SOCKET fd = it->first;
THandle& handle = it->second;
-
- if (FD_ISSET(fd, errFds)) {
- (events++)->Error(handle.Data(), EIO);
+
+ if (FD_ISSET(fd, errFds)) {
+ (events++)->Error(handle.Data(), EIO);
if (handle.Filter() & CONT_POLL_ONE_SHOT) {
*keysToDeleteEnd = fd;
++keysToDeleteEnd;
}
- } else {
- int what = 0;
-
- if (FD_ISSET(fd, in)) {
- what |= CONT_POLL_READ;
- }
-
- if (FD_ISSET(fd, out)) {
- what |= CONT_POLL_WRITE;
- }
-
- if (what) {
- (events++)->Success(handle.Data(), what);
+ } else {
+ int what = 0;
+
+ if (FD_ISSET(fd, in)) {
+ what |= CONT_POLL_READ;
+ }
+
+ if (FD_ISSET(fd, out)) {
+ what |= CONT_POLL_WRITE;
+ }
+
+ if (what) {
+ (events++)->Success(handle.Data(), what);
if (handle.Filter() & CONT_POLL_ONE_SHOT) {
*keysToDeleteEnd = fd;
@@ -559,148 +559,148 @@ public:
// User must restart waiting this event when needed.
handle.Clear(what);
}
- }
- }
- }
-
+ }
+ }
+ }
+
while (keysToDeleteBegin != keysToDeleteEnd) {
Fds_.erase(*keysToDeleteBegin);
++keysToDeleteBegin;
}
- return events - eventsStart;
- }
-
+ return events - eventsStart;
+ }
+
inline size_t EventNumberHint() const noexcept {
- return sizeof(fd_set) * 8 * 2;
- }
-
+ return sizeof(fd_set) * 8 * 2;
+ }
+
static inline void* ExtractEvent(const TEvent* event) noexcept {
- return event->Data();
- }
-
+ return event->Data();
+ }
+
static inline int ExtractStatus(const TEvent* event) noexcept {
- return event->Status();
- }
-
+ return event->Status();
+ }
+
static inline int ExtractFilterImpl(const TEvent* event) noexcept {
- return event->Filter();
+ return event->Filter();
}
-private:
+private:
inline void Signal() noexcept {
- char ch = 13;
-
- send(SigSock(), &ch, 1, 0);
- }
-
- inline void TryWait() {
- char ch[32];
-
- while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) {
+ char ch = 13;
+
+ send(SigSock(), &ch, 1, 0);
+ }
+
+ inline void TryWait() {
+ char ch[32];
+
+ while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) {
Y_ASSERT(ch[0] == 13);
- }
- }
-
+ }
+ }
+
inline SOCKET WaitSock() const noexcept {
- return Signal_[1];
- }
-
+ return Signal_[1];
+ }
+
inline SOCKET SigSock() const noexcept {
- return Signal_[0];
- }
-
-private:
+ return Signal_[0];
+ }
+
+private:
struct TCommand {
SOCKET Fd_;
int Filter_; // 0 to remove
void* Cookie_;
TCommand(SOCKET fd, int filter, void* cookie)
- : Fd_(fd)
- , Filter_(filter)
- , Cookie_(cookie)
- {
- }
+ : Fd_(fd)
+ , Filter_(filter)
+ , Cookie_(cookie)
+ {
+ }
TCommand(SOCKET fd, int filter)
- : Fd_(fd)
- , Filter_(filter)
- {
- }
+ : Fd_(fd)
+ , Filter_(filter)
+ {
+ }
};
- TFds Fds_;
-
- TMyMutex Lock_;
- TArrayHolder<TEvent> SavedEvents_;
- TEvent* Begin_;
- TEvent* End_;
-
+ TFds Fds_;
+
+ TMyMutex Lock_;
+ TArrayHolder<TEvent> SavedEvents_;
+ TEvent* Begin_;
+ TEvent* End_;
+
TMyMutex CommandLock_;
TVector<TCommand> Commands_;
- SOCKET Signal_[2];
-};
-#endif
-
+ SOCKET Signal_[2];
+};
+#endif
+
static inline TDuration PollStep(const TInstant& deadLine, const TInstant& now) noexcept {
- if (deadLine < now) {
- return TDuration::Zero();
- }
-
- return Min(deadLine - now, TDuration::Seconds(1000));
-}
-
-template <class TBase>
-class TGenericPoller: public TBase {
-public:
+ if (deadLine < now) {
+ return TDuration::Zero();
+ }
+
+ return Min(deadLine - now, TDuration::Seconds(1000));
+}
+
+template <class TBase>
+class TGenericPoller: public TBase {
+public:
using TBase::TBase;
using TEvent = typename TBase::TEvent;
-
+
inline void Set(void* data, SOCKET fd, int what) {
- if (what) {
- this->SetImpl(data, fd, what);
- } else {
- this->Remove(fd);
- }
- }
-
+ if (what) {
+ this->SetImpl(data, fd, what);
+ } else {
+ this->Remove(fd);
+ }
+ }
+
static inline int ExtractFilter(const TEvent* event) noexcept {
- if (TBase::ExtractStatus(event)) {
+ if (TBase::ExtractStatus(event)) {
return CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
- }
-
- return TBase::ExtractFilterImpl(event);
- }
-
+ }
+
+ return TBase::ExtractFilterImpl(event);
+ }
+
inline size_t WaitD(TEvent* events, size_t len, TInstant deadLine, TInstant now = TInstant::Now()) noexcept {
- if (!len) {
- return 0;
- }
-
- size_t ret;
-
- do {
- ret = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds());
- } while (!ret && ((now = TInstant::Now()) < deadLine));
-
- return ret;
- }
-};
-
-#if defined(HAVE_KQUEUE_POLLER)
- #define TPollerImplBase TKqueuePoller
-#elif defined(HAVE_EPOLL_POLLER)
- #define TPollerImplBase TEpollPoller
-#elif defined(HAVE_SELECT_POLLER)
- #define TPollerImplBase TSelectPoller
-#else
- #error "unsupported platform"
-#endif
-
-template <class TLockPolicy>
-using TPollerImpl = TGenericPoller<TPollerImplBase<TLockPolicy>>;
-
-#undef TPollerImplBase
+ if (!len) {
+ return 0;
+ }
+
+ size_t ret;
+
+ do {
+ ret = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds());
+ } while (!ret && ((now = TInstant::Now()) < deadLine));
+
+ return ret;
+ }
+};
+
+#if defined(HAVE_KQUEUE_POLLER)
+ #define TPollerImplBase TKqueuePoller
+#elif defined(HAVE_EPOLL_POLLER)
+ #define TPollerImplBase TEpollPoller
+#elif defined(HAVE_SELECT_POLLER)
+ #define TPollerImplBase TSelectPoller
+#else
+ #error "unsupported platform"
+#endif
+
+template <class TLockPolicy>
+using TPollerImpl = TGenericPoller<TPollerImplBase<TLockPolicy>>;
+
+#undef TPollerImplBase
diff --git a/util/network/sock.cpp b/util/network/sock.cpp
index d4864a9c1c..91a1c753a0 100644
--- a/util/network/sock.cpp
+++ b/util/network/sock.cpp
@@ -1 +1 @@
-#include "sock.h"
+#include "sock.h"
diff --git a/util/network/sock.h b/util/network/sock.h
index b10be2f715..bc0a31af20 100644
--- a/util/network/sock.h
+++ b/util/network/sock.h
@@ -6,11 +6,11 @@
#include <util/stream/output.h>
#include <util/system/sysstat.h>
-#if defined(_win_) || defined(_cygwin_)
- #include <util/system/file.h>
+#if defined(_win_) || defined(_cygwin_)
+ #include <util/system/file.h>
#else
- #include <sys/un.h>
- #include <sys/stat.h>
+ #include <sys/un.h>
+ #include <sys/stat.h>
#endif //_win_
#include "init.h"
@@ -45,8 +45,8 @@ protected:
virtual int Bind(SOCKET s, ui16 mode) const = 0;
};
-#if defined(_win_) || defined(_cygwin_)
- #define YAF_LOCAL AF_INET
+#if defined(_win_) || defined(_cygwin_)
+ #define YAF_LOCAL AF_INET
struct TSockAddrLocal: public ISockAddr {
TSockAddrLocal() {
Clear();
@@ -112,29 +112,29 @@ struct TSockAddrLocal: public ISockAddr {
int Bind(SOCKET s, ui16 mode) const {
Y_UNUSED(mode);
- int ret = 0;
- // 1. open file
- TFileHandle f(Path, CreateAlways | WrOnly);
- if (!f.IsOpen())
- return -errno;
+ int ret = 0;
+ // 1. open file
+ TFileHandle f(Path, CreateAlways | WrOnly);
+ if (!f.IsOpen())
+ return -errno;
- // 2. find port and bind to it
- in.sin_port = 0;
- ret = bind(s, SockAddr(), Len());
- if (ret != 0)
- return -WSAGetLastError();
+ // 2. find port and bind to it
+ in.sin_port = 0;
+ ret = bind(s, SockAddr(), Len());
+ if (ret != 0)
+ return -WSAGetLastError();
- int size = Size();
- ret = getsockname(s, (struct sockaddr*)(&in), &size);
- if (ret != 0)
- return -WSAGetLastError();
+ int size = Size();
+ ret = getsockname(s, (struct sockaddr*)(&in), &size);
+ if (ret != 0)
+ return -WSAGetLastError();
- // 3. write port to file
- ret = f.Write(&(in.sin_port), sizeof(in.sin_port));
- if (ret != sizeof(in.sin_port))
- return -errno;
+ // 3. write port to file
+ ret = f.Write(&(in.sin_port), sizeof(in.sin_port));
+ if (ret != sizeof(in.sin_port))
+ return -errno;
- return 0;
+ return 0;
}
static constexpr size_t PathSize = 128;
@@ -142,7 +142,7 @@ struct TSockAddrLocal: public ISockAddr {
char Path[PathSize];
};
#else
- #define YAF_LOCAL AF_LOCAL
+ #define YAF_LOCAL AF_LOCAL
struct TSockAddrLocal: public sockaddr_un, public ISockAddr {
TSockAddrLocal() {
Clear();
@@ -171,11 +171,11 @@ struct TSockAddrLocal: public sockaddr_un, public ISockAddr {
}
sockaddr* SockAddr() override {
- return (struct sockaddr*)(struct sockaddr_un*)this;
+ return (struct sockaddr*)(struct sockaddr_un*)this;
}
const sockaddr* SockAddr() const override {
- return (const struct sockaddr*)(const struct sockaddr_un*)this;
+ return (const struct sockaddr*)(const struct sockaddr_un*)this;
}
TString ToString() const override {
@@ -234,11 +234,11 @@ struct TSockAddrInet: public sockaddr_in, public ISockAddr {
}
sockaddr* SockAddr() override {
- return (struct sockaddr*)(struct sockaddr_in*)this;
+ return (struct sockaddr*)(struct sockaddr_in*)this;
}
const sockaddr* SockAddr() const override {
- return (const struct sockaddr*)(const struct sockaddr_in*)this;
+ return (const struct sockaddr*)(const struct sockaddr_in*)this;
}
TString ToString() const override {
@@ -300,11 +300,11 @@ struct TSockAddrInet6: public sockaddr_in6, public ISockAddr {
}
sockaddr* SockAddr() override {
- return (struct sockaddr*)(struct sockaddr_in6*)this;
+ return (struct sockaddr*)(struct sockaddr_in6*)this;
}
const sockaddr* SockAddr() const override {
- return (const struct sockaddr*)(const struct sockaddr_in6*)this;
+ return (const struct sockaddr*)(const struct sockaddr_in6*)this;
}
TString ToString() const override {
@@ -346,7 +346,7 @@ using TSockAddrInetDgram = TSockAddrInet;
using TSockAddrInet6Stream = TSockAddrInet6;
using TSockAddrInet6Dgram = TSockAddrInet6;
-class TBaseSocket: public TSocketHolder {
+class TBaseSocket: public TSocketHolder {
protected:
TBaseSocket(SOCKET fd)
: TSocketHolder(fd)
@@ -355,11 +355,11 @@ protected:
public:
int Bind(const ISockAddr* addr, ui16 mode = DEF_LOCAL_SOCK_MODE) {
- return addr->Bind((SOCKET) * this, mode);
+ return addr->Bind((SOCKET) * this, mode);
}
void CheckSock() {
- if ((SOCKET) * this == INVALID_SOCKET)
+ if ((SOCKET) * this == INVALID_SOCKET)
ythrow TSystemError() << "no socket";
}
@@ -370,7 +370,7 @@ public:
}
};
-class TDgramSocket: public TBaseSocket {
+class TDgramSocket: public TBaseSocket {
protected:
TDgramSocket(SOCKET fd)
: TBaseSocket(fd)
@@ -384,7 +384,7 @@ public:
return -LastSystemError();
}
- ret = sendto((SOCKET) * this, (const char*)msg, (int)len, 0, toAddr->SockAddr(), toAddr->Len());
+ ret = sendto((SOCKET) * this, (const char*)msg, (int)len, 0, toAddr->SockAddr(), toAddr->Len());
if (ret < 0) {
return -LastSystemError();
}
@@ -394,7 +394,7 @@ public:
ssize_t RecvFrom(void* buf, size_t len, ISockAddr* fromAddr) {
socklen_t fromSize = fromAddr->Size();
- const ssize_t ret = recvfrom((SOCKET) * this, (char*)buf, (int)len, 0, fromAddr->SockAddr(), &fromSize);
+ const ssize_t ret = recvfrom((SOCKET) * this, (char*)buf, (int)len, 0, fromAddr->SockAddr(), &fromSize);
if (ret < 0) {
return -LastSystemError();
}
@@ -403,7 +403,7 @@ public:
}
};
-class TStreamSocket: public TBaseSocket {
+class TStreamSocket: public TBaseSocket {
protected:
explicit TStreamSocket(SOCKET fd)
: TBaseSocket(fd)
@@ -411,13 +411,13 @@ protected:
}
public:
- TStreamSocket()
- : TBaseSocket(INVALID_SOCKET)
+ TStreamSocket()
+ : TBaseSocket(INVALID_SOCKET)
{
}
ssize_t Send(const void* msg, size_t len, int flags = 0) {
- const ssize_t ret = send((SOCKET) * this, (const char*)msg, (int)len, flags);
+ const ssize_t ret = send((SOCKET) * this, (const char*)msg, (int)len, flags);
if (ret < 0)
return -errno;
@@ -425,7 +425,7 @@ public:
}
ssize_t Recv(void* buf, size_t len, int flags = 0) {
- const ssize_t ret = recv((SOCKET) * this, (char*)buf, (int)len, flags);
+ const ssize_t ret = recv((SOCKET) * this, (char*)buf, (int)len, flags);
if (ret < 0)
return -errno;
@@ -437,7 +437,7 @@ public:
if (ret < 0)
return -errno;
- ret = connect((SOCKET) * this, addr->SockAddr(), addr->Len());
+ ret = connect((SOCKET) * this, addr->SockAddr(), addr->Len());
if (ret < 0)
return -errno;
@@ -445,7 +445,7 @@ public:
}
int Listen(int backlog) {
- int ret = listen((SOCKET) * this, backlog);
+ int ret = listen((SOCKET) * this, backlog);
if (ret < 0)
return -errno;
@@ -470,7 +470,7 @@ public:
}
};
-class TLocalDgramSocket: public TDgramSocket {
+class TLocalDgramSocket: public TDgramSocket {
public:
TLocalDgramSocket(SOCKET fd)
: TDgramSocket(fd)
@@ -483,7 +483,7 @@ public:
}
};
-class TInetDgramSocket: public TDgramSocket {
+class TInetDgramSocket: public TDgramSocket {
public:
TInetDgramSocket(SOCKET fd)
: TDgramSocket(fd)
@@ -509,7 +509,7 @@ public:
}
};
-class TLocalStreamSocket: public TStreamSocket {
+class TLocalStreamSocket: public TStreamSocket {
public:
TLocalStreamSocket(SOCKET fd)
: TStreamSocket(fd)
@@ -522,7 +522,7 @@ public:
}
};
-class TInetStreamSocket: public TStreamSocket {
+class TInetStreamSocket: public TStreamSocket {
public:
TInetStreamSocket(SOCKET fd)
: TStreamSocket(fd)
@@ -550,16 +550,16 @@ public:
class TStreamSocketInput: public IInputStream {
public:
- TStreamSocketInput(TStreamSocket* socket)
+ TStreamSocketInput(TStreamSocket* socket)
: Socket(socket)
{
}
- void SetSocket(TStreamSocket* socket) {
+ void SetSocket(TStreamSocket* socket) {
Socket = socket;
}
protected:
- TStreamSocket* Socket;
+ TStreamSocket* Socket;
size_t DoRead(void* buf, size_t len) override {
Y_VERIFY(Socket, "TStreamSocketInput: socket isn't set");
@@ -575,11 +575,11 @@ protected:
class TStreamSocketOutput: public IOutputStream {
public:
- TStreamSocketOutput(TStreamSocket* socket)
+ TStreamSocketOutput(TStreamSocket* socket)
: Socket(socket)
{
}
- void SetSocket(TStreamSocket* socket) {
+ void SetSocket(TStreamSocket* socket) {
Socket = socket;
}
@@ -587,12 +587,12 @@ public:
TStreamSocketOutput& operator=(TStreamSocketOutput&&) noexcept = default;
protected:
- TStreamSocket* Socket;
+ TStreamSocket* Socket;
void DoWrite(const void* buf, size_t len) override {
Y_VERIFY(Socket, "TStreamSocketOutput: socket isn't set");
- const char* ptr = (const char*)buf;
+ const char* ptr = (const char*)buf;
while (len) {
const ssize_t ret = Socket->Send(ptr, len);
diff --git a/util/network/sock_ut.cpp b/util/network/sock_ut.cpp
index fd8c783747..9f42f1f1a4 100644
--- a/util/network/sock_ut.cpp
+++ b/util/network/sock_ut.cpp
@@ -1,5 +1,5 @@
-#include "sock.h"
-
+#include "sock.h"
+
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/threading/future/legacy_future.h>
@@ -7,109 +7,109 @@
Y_UNIT_TEST_SUITE(TSocketTest) {
Y_UNIT_TEST(InetDgramTest) {
- char buf[256];
- TSockAddrInetDgram servAddr(IpFromString("127.0.0.1"), 0);
- TSockAddrInetDgram cliAddr(IpFromString("127.0.0.1"), 0);
- TSockAddrInetDgram servFromAddr;
- TSockAddrInetDgram cliFromAddr;
- TInetDgramSocket cliSock;
- TInetDgramSocket servSock;
- cliSock.CheckSock();
- servSock.CheckSock();
-
- TBaseSocket::Check(cliSock.Bind(&cliAddr));
- TBaseSocket::Check(servSock.Bind(&servAddr));
-
- // client
- const char reqStr[] = "Hello, world!!!";
- TBaseSocket::Check(cliSock.SendTo(reqStr, sizeof(reqStr), &servAddr));
-
- // server
- TBaseSocket::Check(servSock.RecvFrom(buf, 256, &servFromAddr));
- UNIT_ASSERT(strcmp(reqStr, buf) == 0);
- const char repStr[] = "The World's greatings to you";
- TBaseSocket::Check(servSock.SendTo(repStr, sizeof(repStr), &servFromAddr));
-
- // client
- TBaseSocket::Check(cliSock.RecvFrom(buf, 256, &cliFromAddr));
- UNIT_ASSERT(strcmp(repStr, buf) == 0);
- }
-
- void RunLocalDgramTest(const char* localServerSockName, const char* localClientSockName) {
- char buf[256];
- TSockAddrLocalDgram servAddr(localServerSockName);
- TSockAddrLocalDgram cliAddr(localClientSockName);
- TSockAddrLocalDgram servFromAddr;
- TSockAddrLocalDgram cliFromAddr;
- TLocalDgramSocket cliSock;
- TLocalDgramSocket servSock;
- cliSock.CheckSock();
- servSock.CheckSock();
-
- TBaseSocket::Check(cliSock.Bind(&cliAddr), "bind client");
- TBaseSocket::Check(servSock.Bind(&servAddr), "bind server");
-
- // client
- const char reqStr[] = "Hello, world!!!";
- TBaseSocket::Check(cliSock.SendTo(reqStr, sizeof(reqStr), &servAddr), "send from client");
-
- // server
- TBaseSocket::Check(servSock.RecvFrom(buf, 256, &servFromAddr), "receive from client");
- UNIT_ASSERT(strcmp(reqStr, buf) == 0);
- const char repStr[] = "The World's greatings to you";
- TBaseSocket::Check(servSock.SendTo(repStr, sizeof(repStr), &servFromAddr), "send to client");
-
- // client
- TBaseSocket::Check(cliSock.RecvFrom(buf, 256, &cliFromAddr), "receive from server");
- UNIT_ASSERT(strcmp(repStr, buf) == 0);
- }
+ char buf[256];
+ TSockAddrInetDgram servAddr(IpFromString("127.0.0.1"), 0);
+ TSockAddrInetDgram cliAddr(IpFromString("127.0.0.1"), 0);
+ TSockAddrInetDgram servFromAddr;
+ TSockAddrInetDgram cliFromAddr;
+ TInetDgramSocket cliSock;
+ TInetDgramSocket servSock;
+ cliSock.CheckSock();
+ servSock.CheckSock();
+
+ TBaseSocket::Check(cliSock.Bind(&cliAddr));
+ TBaseSocket::Check(servSock.Bind(&servAddr));
+
+ // client
+ const char reqStr[] = "Hello, world!!!";
+ TBaseSocket::Check(cliSock.SendTo(reqStr, sizeof(reqStr), &servAddr));
+
+ // server
+ TBaseSocket::Check(servSock.RecvFrom(buf, 256, &servFromAddr));
+ UNIT_ASSERT(strcmp(reqStr, buf) == 0);
+ const char repStr[] = "The World's greatings to you";
+ TBaseSocket::Check(servSock.SendTo(repStr, sizeof(repStr), &servFromAddr));
+
+ // client
+ TBaseSocket::Check(cliSock.RecvFrom(buf, 256, &cliFromAddr));
+ UNIT_ASSERT(strcmp(repStr, buf) == 0);
+ }
+
+ void RunLocalDgramTest(const char* localServerSockName, const char* localClientSockName) {
+ char buf[256];
+ TSockAddrLocalDgram servAddr(localServerSockName);
+ TSockAddrLocalDgram cliAddr(localClientSockName);
+ TSockAddrLocalDgram servFromAddr;
+ TSockAddrLocalDgram cliFromAddr;
+ TLocalDgramSocket cliSock;
+ TLocalDgramSocket servSock;
+ cliSock.CheckSock();
+ servSock.CheckSock();
+
+ TBaseSocket::Check(cliSock.Bind(&cliAddr), "bind client");
+ TBaseSocket::Check(servSock.Bind(&servAddr), "bind server");
+
+ // client
+ const char reqStr[] = "Hello, world!!!";
+ TBaseSocket::Check(cliSock.SendTo(reqStr, sizeof(reqStr), &servAddr), "send from client");
+
+ // server
+ TBaseSocket::Check(servSock.RecvFrom(buf, 256, &servFromAddr), "receive from client");
+ UNIT_ASSERT(strcmp(reqStr, buf) == 0);
+ const char repStr[] = "The World's greatings to you";
+ TBaseSocket::Check(servSock.SendTo(repStr, sizeof(repStr), &servFromAddr), "send to client");
+
+ // client
+ TBaseSocket::Check(cliSock.RecvFrom(buf, 256, &cliFromAddr), "receive from server");
+ UNIT_ASSERT(strcmp(repStr, buf) == 0);
+ }
Y_UNIT_TEST(LocalDgramTest) {
- const char* localServerSockName = "./serv_sock";
- const char* localClientSockName = "./cli_sock";
- RunLocalDgramTest(localServerSockName, localClientSockName);
+ const char* localServerSockName = "./serv_sock";
+ const char* localClientSockName = "./cli_sock";
+ RunLocalDgramTest(localServerSockName, localClientSockName);
NFs::Remove(localServerSockName);
NFs::Remove(localClientSockName);
- }
+ }
- template <class A, class S>
+ template <class A, class S>
void RunInetStreamTest(const char* ip) {
- char buf[256];
+ char buf[256];
A servAddr(ip, 0);
A newAddr;
S cliSock;
S servSock;
S newSock;
- cliSock.CheckSock();
- servSock.CheckSock();
- newSock.CheckSock();
-
- // server
- int yes = 1;
- CheckedSetSockOpt(servSock, SOL_SOCKET, SO_REUSEADDR, yes, "servSock, SO_REUSEADDR");
- TBaseSocket::Check(servSock.Bind(&servAddr), "bind");
- TBaseSocket::Check(servSock.Listen(10), "listen");
-
- // client
- TBaseSocket::Check(cliSock.Connect(&servAddr), "connect");
-
- // server
- TBaseSocket::Check(servSock.Accept(&newSock, &newAddr), "accept");
-
- // client
- const char reqStr[] = "Hello, world!!!";
- TBaseSocket::Check(cliSock.Send(reqStr, sizeof(reqStr)), "send");
-
- // server - new
- TBaseSocket::Check(newSock.Recv(buf, 256), "recv");
- UNIT_ASSERT(strcmp(reqStr, buf) == 0);
- const char repStr[] = "The World's greatings to you";
- TBaseSocket::Check(newSock.Send(repStr, sizeof(repStr)), "send");
-
- // client
- TBaseSocket::Check(cliSock.Recv(buf, 256), "recv");
- UNIT_ASSERT(strcmp(repStr, buf) == 0);
- }
+ cliSock.CheckSock();
+ servSock.CheckSock();
+ newSock.CheckSock();
+
+ // server
+ int yes = 1;
+ CheckedSetSockOpt(servSock, SOL_SOCKET, SO_REUSEADDR, yes, "servSock, SO_REUSEADDR");
+ TBaseSocket::Check(servSock.Bind(&servAddr), "bind");
+ TBaseSocket::Check(servSock.Listen(10), "listen");
+
+ // client
+ TBaseSocket::Check(cliSock.Connect(&servAddr), "connect");
+
+ // server
+ TBaseSocket::Check(servSock.Accept(&newSock, &newAddr), "accept");
+
+ // client
+ const char reqStr[] = "Hello, world!!!";
+ TBaseSocket::Check(cliSock.Send(reqStr, sizeof(reqStr)), "send");
+
+ // server - new
+ TBaseSocket::Check(newSock.Recv(buf, 256), "recv");
+ UNIT_ASSERT(strcmp(reqStr, buf) == 0);
+ const char repStr[] = "The World's greatings to you";
+ TBaseSocket::Check(newSock.Send(repStr, sizeof(repStr)), "send");
+
+ // client
+ TBaseSocket::Check(cliSock.Recv(buf, 256), "recv");
+ UNIT_ASSERT(strcmp(repStr, buf) == 0);
+ }
Y_UNIT_TEST(InetStreamTest) {
RunInetStreamTest<TSockAddrInetStream, TInetStreamSocket>("127.0.0.1");
@@ -119,50 +119,50 @@ Y_UNIT_TEST_SUITE(TSocketTest) {
RunInetStreamTest<TSockAddrInet6Stream, TInet6StreamSocket>("::1");
}
- void RunLocalStreamTest(const char* localServerSockName) {
- char buf[256];
- TSockAddrLocalStream servAddr(localServerSockName);
- TSockAddrLocalStream newAddr;
- TLocalStreamSocket cliSock;
- TLocalStreamSocket servSock;
- TLocalStreamSocket newSock;
- cliSock.CheckSock();
- servSock.CheckSock();
- newSock.CheckSock();
-
- // server
- TBaseSocket::Check(servSock.Bind(&servAddr), "bind");
- TBaseSocket::Check(servSock.Listen(10), "listen");
-
- NThreading::TLegacyFuture<void> f([&]() {
- // server
- TBaseSocket::Check(servSock.Accept(&newSock, &newAddr), "accept");
- });
-
- // client
- TBaseSocket::Check(cliSock.Connect(&servAddr), "connect");
-
- f.Get();
-
- // client
- const char reqStr[] = "Hello, world!!!";
- TBaseSocket::Check(cliSock.Send(reqStr, sizeof(reqStr)), "send");
-
- // server - new
- TBaseSocket::Check(newSock.Recv(buf, 256), "recv");
- UNIT_ASSERT(strcmp(reqStr, buf) == 0);
- const char repStr[] = "The World's greatings to you";
- TBaseSocket::Check(newSock.Send(repStr, sizeof(repStr)), "send");
-
- // client
- TBaseSocket::Check(cliSock.Recv(buf, 256), "recv");
- UNIT_ASSERT(strcmp(repStr, buf) == 0);
- }
+ void RunLocalStreamTest(const char* localServerSockName) {
+ char buf[256];
+ TSockAddrLocalStream servAddr(localServerSockName);
+ TSockAddrLocalStream newAddr;
+ TLocalStreamSocket cliSock;
+ TLocalStreamSocket servSock;
+ TLocalStreamSocket newSock;
+ cliSock.CheckSock();
+ servSock.CheckSock();
+ newSock.CheckSock();
+
+ // server
+ TBaseSocket::Check(servSock.Bind(&servAddr), "bind");
+ TBaseSocket::Check(servSock.Listen(10), "listen");
+
+ NThreading::TLegacyFuture<void> f([&]() {
+ // server
+ TBaseSocket::Check(servSock.Accept(&newSock, &newAddr), "accept");
+ });
+
+ // client
+ TBaseSocket::Check(cliSock.Connect(&servAddr), "connect");
+
+ f.Get();
+
+ // client
+ const char reqStr[] = "Hello, world!!!";
+ TBaseSocket::Check(cliSock.Send(reqStr, sizeof(reqStr)), "send");
+
+ // server - new
+ TBaseSocket::Check(newSock.Recv(buf, 256), "recv");
+ UNIT_ASSERT(strcmp(reqStr, buf) == 0);
+ const char repStr[] = "The World's greatings to you";
+ TBaseSocket::Check(newSock.Send(repStr, sizeof(repStr)), "send");
+
+ // client
+ TBaseSocket::Check(cliSock.Recv(buf, 256), "recv");
+ UNIT_ASSERT(strcmp(repStr, buf) == 0);
+ }
Y_UNIT_TEST(LocalStreamTest) {
- const char* localServerSockName = "./serv_sock2";
- RunLocalStreamTest(localServerSockName);
+ const char* localServerSockName = "./serv_sock2";
+ RunLocalStreamTest(localServerSockName);
NFs::Remove(localServerSockName);
- }
+ }
-}
+}
diff --git a/util/network/socket.cpp b/util/network/socket.cpp
index 4f6e804346..c7a545c9c2 100644
--- a/util/network/socket.cpp
+++ b/util/network/socket.cpp
@@ -7,50 +7,50 @@
#include <util/system/defaults.h>
#include <util/system/byteorder.h>
-#if defined(_unix_)
- #include <netdb.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <sys/un.h>
- #include <sys/ioctl.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
- #include <arpa/inet.h>
-#endif
-
-#if defined(_freebsd_)
- #include <sys/module.h>
- #define ACCEPT_FILTER_MOD
- #include <sys/socketvar.h>
-#endif
-
-#if defined(_win_)
- #include <cerrno>
- #include <winsock2.h>
- #include <ws2tcpip.h>
- #include <wspiapi.h>
-
- #include <util/system/compat.h>
-#endif
-
+#if defined(_unix_)
+ #include <netdb.h>
+ #include <sys/types.h>
+ #include <sys/socket.h>
+ #include <sys/un.h>
+ #include <sys/ioctl.h>
+ #include <netinet/in.h>
+ #include <netinet/tcp.h>
+ #include <arpa/inet.h>
+#endif
+
+#if defined(_freebsd_)
+ #include <sys/module.h>
+ #define ACCEPT_FILTER_MOD
+ #include <sys/socketvar.h>
+#endif
+
+#if defined(_win_)
+ #include <cerrno>
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
+ #include <wspiapi.h>
+
+ #include <util/system/compat.h>
+#endif
+
#include <util/generic/ylimits.h>
-
-#include <util/string/cast.h>
+
+#include <util/string/cast.h>
#include <util/stream/mem.h>
-#include <util/system/datetime.h>
+#include <util/system/datetime.h>
#include <util/system/error.h>
-#include <util/memory/tempbuf.h>
-#include <util/generic/singleton.h>
-#include <util/generic/hash_set.h>
-
-#include <stddef.h>
+#include <util/memory/tempbuf.h>
+#include <util/generic/singleton.h>
+#include <util/generic/hash_set.h>
+
+#include <stddef.h>
#include <sys/uio.h>
+
+using namespace NAddr;
+
+#if defined(_win_)
-using namespace NAddr;
-
-#if defined(_win_)
-
-int inet_aton(const char* cp, struct in_addr* inp) {
+int inet_aton(const char* cp, struct in_addr* inp) {
sockaddr_in addr;
addr.sin_family = AF_INET;
int psz = sizeof(addr);
@@ -61,9 +61,9 @@ int inet_aton(const char* cp, struct in_addr* inp) {
return 0;
}
- #if (_WIN32_WINNT < 0x0600)
-const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
- if (af != AF_INET) {
+ #if (_WIN32_WINNT < 0x0600)
+const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
+ if (af != AF_INET) {
errno = EINVAL;
return 0;
}
@@ -90,7 +90,7 @@ static const evpair evpairs_to_win[] = {
{POLLWRBAND, -1},
{POLLERR, 0},
{POLLHUP, 0},
- {POLLNVAL, 0}};
+ {POLLNVAL, 0}};
static const size_t nevpairs_to_win = sizeof(evpairs_to_win) / sizeof(evpairs_to_win[0]);
@@ -117,7 +117,7 @@ static int convert_events(int events, const evpair* evpairs, size_t nevpairs, bo
result |= winEvent;
}
}
- if (events != 0 && !ignoreUnknown)
+ if (events != 0 && !ignoreUnknown)
return -1;
return result;
}
@@ -128,8 +128,8 @@ private:
public:
inline TWSAEventHolder(HANDLE event) noexcept
- : Event(event)
- {
+ : Event(event)
+ {
}
inline ~TWSAEventHolder() {
@@ -164,28 +164,28 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
if (error == WSAEINVAL || error == WSAENOTSOCK) {
fd->revents = POLLNVAL;
++checked_sockets;
- } else {
+ } else {
errno = EIO;
return -1;
}
}
fd_set readfds;
fd_set writefds;
- struct timeval timeout = {0, 0};
+ struct timeval timeout = {0, 0};
FD_ZERO(&readfds);
FD_ZERO(&writefds);
if (fd->events & POLLIN) {
- FD_SET(fd->fd, &readfds);
+ FD_SET(fd->fd, &readfds);
}
if (fd->events & POLLOUT) {
- FD_SET(fd->fd, &writefds);
+ FD_SET(fd->fd, &writefds);
}
int error = select(0, &readfds, &writefds, nullptr, &timeout);
if (error > 0) {
- if (FD_ISSET(fd->fd, &readfds)) {
+ if (FD_ISSET(fd->fd, &readfds)) {
fd->revents |= POLLIN;
}
- if (FD_ISSET(fd->fd, &writefds)) {
+ if (FD_ISSET(fd->fd, &writefds)) {
fd->revents |= POLLOUT;
}
++checked_sockets;
@@ -201,7 +201,7 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
DWORD wait_result = WSAWaitForMultipleEvents(1, events, TRUE, timeout, FALSE);
if (wait_result == WSA_WAIT_TIMEOUT)
return 0;
- else if (wait_result == WSA_WAIT_EVENT_0) {
+ else if (wait_result == WSA_WAIT_EVENT_0) {
for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
if (fd->revents == POLLNVAL)
continue;
@@ -212,7 +212,7 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
}
fd->revents = 0;
for (int i = 0; i < FD_MAX_EVENTS; ++i) {
- if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) {
+ if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) {
fd->revents = POLLERR;
break;
}
@@ -236,79 +236,79 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
return -1;
}
}
- #endif
+ #endif
#endif
-bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) {
- if (!size) {
+bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) {
+ if (!size) {
return false;
- }
+ }
- TOpaqueAddr addr;
+ TOpaqueAddr addr;
- if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) {
+ if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) {
return false;
}
-
- try {
- TMemoryOutput out(str, size - 1);
-
- PrintHost(out, addr);
- *out.Buf() = 0;
-
- return true;
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-
+
+ try {
+ TMemoryOutput out(str, size - 1);
+
+ PrintHost(out, addr);
+ *out.Buf() = 0;
+
+ return true;
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+
return false;
}
-void SetSocketTimeout(SOCKET s, long timeout) {
- SetSocketTimeout(s, timeout, 0);
+void SetSocketTimeout(SOCKET s, long timeout) {
+ SetSocketTimeout(s, timeout, 0);
}
-void SetSocketTimeout(SOCKET s, long sec, long msec) {
+void SetSocketTimeout(SOCKET s, long sec, long msec) {
#ifdef SO_SNDTIMEO
- #ifdef _darwin_
+ #ifdef _darwin_
const timeval timeout = {sec, (__darwin_suseconds_t)msec * 1000};
- #elif defined(_unix_)
- const timeval timeout = {sec, msec * 1000};
- #else
+ #elif defined(_unix_)
+ const timeval timeout = {sec, msec * 1000};
+ #else
const int timeout = sec * 1000 + msec;
- #endif
- CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout");
- CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout");
+ #endif
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout");
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout");
#endif
}
void SetLinger(SOCKET s, bool on, unsigned len) {
#ifdef SO_LINGER
struct linger l = {on, (u_short)len};
-
- CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger");
+
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger");
#endif
}
-void SetZeroLinger(SOCKET s) {
- SetLinger(s, 1, 0);
-}
-
-void SetKeepAlive(SOCKET s, bool value) {
- CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive");
-}
-
-void SetOutputBuffer(SOCKET s, unsigned value) {
- CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer");
-}
-
-void SetInputBuffer(SOCKET s, unsigned value) {
- CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer");
-}
-
+void SetZeroLinger(SOCKET s) {
+ SetLinger(s, 1, 0);
+}
+
+void SetKeepAlive(SOCKET s, bool value) {
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive");
+}
+
+void SetOutputBuffer(SOCKET s, unsigned value) {
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer");
+}
+
+void SetInputBuffer(SOCKET s, unsigned value) {
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer");
+}
+
#if defined(_linux_) && !defined(SO_REUSEPORT)
- #define SO_REUSEPORT 15
+ #define SO_REUSEPORT 15
#endif
void SetReusePort(SOCKET s, bool value) {
@@ -321,10 +321,10 @@ void SetReusePort(SOCKET s, bool value) {
#endif
}
-void SetNoDelay(SOCKET s, bool value) {
- CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay");
-}
-
+void SetNoDelay(SOCKET s, bool value) {
+ CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay");
+}
+
void SetCloseOnExec(SOCKET s, bool value) {
#if defined(_unix_)
int flags = fcntl(s, F_GETFD);
@@ -345,21 +345,21 @@ void SetCloseOnExec(SOCKET s, bool value) {
#endif
}
-size_t GetMaximumSegmentSize(SOCKET s) {
-#if defined(TCP_MAXSEG)
- int val;
-
- if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) {
- return (size_t)val;
- }
-#endif
-
- /*
- * probably a good guess...
- */
- return 8192;
-}
-
+size_t GetMaximumSegmentSize(SOCKET s) {
+#if defined(TCP_MAXSEG)
+ int val;
+
+ if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) {
+ return (size_t)val;
+ }
+#endif
+
+ /*
+ * probably a good guess...
+ */
+ return 8192;
+}
+
size_t GetMaximumTransferUnit(SOCKET /*s*/) {
// for someone who'll dare to write it
// Linux: there rummored to be IP_MTU getsockopt() request
@@ -371,8 +371,8 @@ size_t GetMaximumTransferUnit(SOCKET /*s*/) {
int GetSocketToS(SOCKET s) {
TOpaqueAddr addr;
- if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
- ythrow TSystemError() << "getsockname() failed";
+ if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
+ ythrow TSystemError() << "getsockname() failed";
}
return GetSocketToS(s, &addr);
@@ -382,15 +382,15 @@ int GetSocketToS(SOCKET s, const IRemoteAddr* addr) {
int result = 0;
switch (addr->Addr()->sa_family) {
- case AF_INET:
- CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos");
- break;
+ case AF_INET:
+ CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos");
+ break;
- case AF_INET6:
+ case AF_INET6:
#ifdef IPV6_TCLASS
- CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos");
+ CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos");
#endif
- break;
+ break;
}
return result;
@@ -398,16 +398,16 @@ int GetSocketToS(SOCKET s, const IRemoteAddr* addr) {
void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) {
switch (addr->Addr()->sa_family) {
- case AF_INET:
- CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos");
- return;
+ case AF_INET:
+ CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos");
+ return;
- case AF_INET6:
+ case AF_INET6:
#ifdef IPV6_TCLASS
- CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos");
- return;
+ CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos");
+ return;
#endif
- break;
+ break;
}
ythrow yexception() << "SetSocketToS unsupported for family " << addr->Addr()->sa_family;
@@ -416,8 +416,8 @@ void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) {
void SetSocketToS(SOCKET s, int tos) {
TOpaqueAddr addr;
- if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
- ythrow TSystemError() << "getsockname() failed";
+ if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
+ ythrow TSystemError() << "getsockname() failed";
}
SetSocketToS(s, &addr, tos);
@@ -450,9 +450,9 @@ bool HasLocalAddress(SOCKET socket) {
namespace {
#if defined(_linux_)
- #if !defined(TCP_FASTOPEN)
- #define TCP_FASTOPEN 23
- #endif
+ #if !defined(TCP_FASTOPEN)
+ #define TCP_FASTOPEN 23
+ #endif
#endif
#if defined(TCP_FASTOPEN)
@@ -511,7 +511,7 @@ struct TUnblockingGuard {
static int MsgPeek(SOCKET s) {
int flags = MSG_PEEK;
-#if defined(_win_)
+#if defined(_win_)
TUnblockingGuard unblocker(s);
Y_UNUSED(unblocker);
#else
@@ -552,20 +552,20 @@ static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
return sendmsg(sock, &message, MSG_NOSIGNAL);
}
#endif
-
+
void TSocketHolder::Close() noexcept {
if (Fd_ != INVALID_SOCKET) {
bool ok = (closesocket(Fd_) == 0);
if (!ok) {
-// Do not quietly close bad descriptor,
-// because often it means double close
-// that is disasterous
+// Do not quietly close bad descriptor,
+// because often it means double close
+// that is disasterous
#ifdef _win_
Y_VERIFY(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor");
#elif defined(_unix_)
Y_VERIFY(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_));
#else
- #error unsupported platform
+ #error unsupported platform
#endif
}
@@ -573,239 +573,239 @@ void TSocketHolder::Close() noexcept {
}
}
-class TSocket::TImpl: public TAtomicRefCount<TImpl> {
+class TSocket::TImpl: public TAtomicRefCount<TImpl> {
using TOps = TSocket::TOps;
-
-public:
- inline TImpl(SOCKET fd, TOps* ops)
- : Fd_(fd)
- , Ops_(ops)
- {
- }
-
+
+public:
+ inline TImpl(SOCKET fd, TOps* ops)
+ : Fd_(fd)
+ , Ops_(ops)
+ {
+ }
+
inline ~TImpl() = default;
-
+
inline SOCKET Fd() const noexcept {
- return Fd_;
- }
-
- inline ssize_t Send(const void* data, size_t len) {
- return Ops_->Send(Fd_, data, len);
- }
-
- inline ssize_t Recv(void* buf, size_t len) {
- return Ops_->Recv(Fd_, buf, len);
- }
-
- inline ssize_t SendV(const TPart* parts, size_t count) {
- return Ops_->SendV(Fd_, parts, count);
- }
-
+ return Fd_;
+ }
+
+ inline ssize_t Send(const void* data, size_t len) {
+ return Ops_->Send(Fd_, data, len);
+ }
+
+ inline ssize_t Recv(void* buf, size_t len) {
+ return Ops_->Recv(Fd_, buf, len);
+ }
+
+ inline ssize_t SendV(const TPart* parts, size_t count) {
+ return Ops_->SendV(Fd_, parts, count);
+ }
+
inline void Close() {
Fd_.Close();
}
-private:
- TSocketHolder Fd_;
- TOps* Ops_;
-};
-
+private:
+ TSocketHolder Fd_;
+ TOps* Ops_;
+};
+
template <>
void Out<const struct addrinfo*>(IOutputStream& os, const struct addrinfo* ai) {
- if (ai->ai_flags & AI_CANONNAME) {
+ if (ai->ai_flags & AI_CANONNAME) {
os << "`" << ai->ai_canonname << "' ";
- }
+ }
os << '[';
- for (int i = 0; ai; ++i, ai = ai->ai_next) {
- if (i > 0) {
+ for (int i = 0; ai; ++i, ai = ai->ai_next) {
+ if (i > 0) {
os << ", ";
- }
+ }
- os << (const IRemoteAddr&)TAddrInfo(ai);
+ os << (const IRemoteAddr&)TAddrInfo(ai);
}
os << ']';
}
template <>
void Out<struct addrinfo*>(IOutputStream& os, struct addrinfo* ai) {
- Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai));
+ Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai));
}
-template <>
+template <>
void Out<TNetworkAddress>(IOutputStream& os, const TNetworkAddress& addr) {
- os << &*addr.Begin();
-}
-
+ os << &*addr.Begin();
+}
+
static inline const struct addrinfo* Iterate(const struct addrinfo* addr, const struct addrinfo* addr0, const int sockerr) {
- if (addr->ai_next) {
- return addr->ai_next;
- }
-
+ if (addr->ai_next) {
+ return addr->ai_next;
+ }
+
ythrow TSystemError(sockerr) << "can not connect to " << addr0;
-}
-
-static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) {
- const struct addrinfo* addr0 = res;
-
- while (res) {
- TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
-
- if (s.Closed()) {
+}
+
+static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) {
+ const struct addrinfo* addr0 = res;
+
+ while (res) {
+ TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
+
+ if (s.Closed()) {
res = Iterate(res, addr0, LastSystemError());
-
- continue;
- }
-
- SetNonBlock(s, true);
-
+
+ continue;
+ }
+
+ SetNonBlock(s, true);
+
if (connect(s, res->ai_addr, (int)res->ai_addrlen)) {
int err = LastSystemError();
-
- if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
- /*
- * must wait
- */
- struct pollfd p = {
- (SOCKET)s,
- POLLOUT,
- 0};
-
+
+ if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
+ /*
+ * must wait
+ */
+ struct pollfd p = {
+ (SOCKET)s,
+ POLLOUT,
+ 0};
+
const ssize_t n = PollD(&p, 1, deadLine);
-
- /*
- * timeout occured
- */
- if (n < 0) {
+
+ /*
+ * timeout occured
+ */
+ if (n < 0) {
ythrow TSystemError(-(int)n) << "can not connect";
- }
-
+ }
+
CheckedGetSockOpt(s, SOL_SOCKET, SO_ERROR, err, "socket error");
- if (!err) {
- return s.Release();
- }
- }
-
+ if (!err) {
+ return s.Release();
+ }
+ }
+
res = Iterate(res, addr0, err);
-
- continue;
- }
-
- return s.Release();
- }
-
+
+ continue;
+ }
+
+ return s.Release();
+ }
+
ythrow yexception() << "something went wrong: nullptr at addrinfo";
-}
-
-static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) {
- TSocketHolder ret(DoConnectImpl(res, deadLine));
-
- SetNonBlock(ret, false);
-
- return ret.Release();
-}
-
-static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) {
+}
+
+static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) {
+ TSocketHolder ret(DoConnectImpl(res, deadLine));
+
+ SetNonBlock(ret, false);
+
+ return ret.Release();
+}
+
+static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) {
ssize_t ret = -1;
do {
ret = DoSendMsg(fd, iov, (int)count);
} while (ret == -1 && errno == EINTR);
-
- if (ret < 0) {
+
+ if (ret < 0) {
return -LastSystemError();
- }
-
- return ret;
-}
-
-template <bool isCompat>
-struct TSender {
+ }
+
+ return ret;
+}
+
+template <bool isCompat>
+struct TSender {
using TPart = TSocket::TPart;
-
- static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
- return DoSendV(fd, (const iovec*)parts, count);
- }
-};
-
-template <>
-struct TSender<false> {
+
+ static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
+ return DoSendV(fd, (const iovec*)parts, count);
+ }
+};
+
+template <>
+struct TSender<false> {
using TPart = TSocket::TPart;
-
- static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
- TTempBuf tempbuf(sizeof(struct iovec) * count);
- struct iovec* iov = (struct iovec*)tempbuf.Data();
-
- for (size_t i = 0; i < count; ++i) {
- struct iovec& io = iov[i];
- const TPart& part = parts[i];
-
- io.iov_base = (char*)part.buf;
- io.iov_len = part.len;
- }
-
- return DoSendV(fd, iov, count);
- }
-};
-
-class TCommonSockOps: public TSocket::TOps {
+
+ static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
+ TTempBuf tempbuf(sizeof(struct iovec) * count);
+ struct iovec* iov = (struct iovec*)tempbuf.Data();
+
+ for (size_t i = 0; i < count; ++i) {
+ struct iovec& io = iov[i];
+ const TPart& part = parts[i];
+
+ io.iov_base = (char*)part.buf;
+ io.iov_len = part.len;
+ }
+
+ return DoSendV(fd, iov, count);
+ }
+};
+
+class TCommonSockOps: public TSocket::TOps {
using TPart = TSocket::TPart;
-
-public:
+
+public:
inline TCommonSockOps() noexcept {
- }
-
+ }
+
~TCommonSockOps() override = default;
-
+
ssize_t Send(SOCKET fd, const void* data, size_t len) override {
- ssize_t ret = -1;
- do {
+ ssize_t ret = -1;
+ do {
ret = send(fd, (const char*)data, (int)len, MSG_NOSIGNAL);
- } while (ret == -1 && errno == EINTR);
-
- if (ret < 0) {
- return -LastSystemError();
- }
-
- return ret;
- }
-
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret < 0) {
+ return -LastSystemError();
+ }
+
+ return ret;
+ }
+
ssize_t Recv(SOCKET fd, void* buf, size_t len) override {
- ssize_t ret = -1;
- do {
- ret = recv(fd, (char*)buf, (int)len, 0);
- } while (ret == -1 && errno == EINTR);
-
- if (ret < 0) {
- return -LastSystemError();
- }
-
- return ret;
- }
+ ssize_t ret = -1;
+ do {
+ ret = recv(fd, (char*)buf, (int)len, 0);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret < 0) {
+ return -LastSystemError();
+ }
+
+ return ret;
+ }
ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) override {
- ssize_t ret = SendVImpl(fd, parts, count);
+ ssize_t ret = SendVImpl(fd, parts, count);
- if (ret < 0) {
- return ret;
- }
+ if (ret < 0) {
+ return ret;
+ }
- size_t len = TContIOVector::Bytes(parts, count);
+ size_t len = TContIOVector::Bytes(parts, count);
- if ((size_t)ret == len) {
- return ret;
- }
+ if ((size_t)ret == len) {
+ return ret;
+ }
- return SendVPartial(fd, parts, count, ret);
- }
-
- inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) {
- return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count);
- }
-
- ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written);
-};
+ return SendVPartial(fd, parts, count, ret);
+ }
+ inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) {
+ return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count);
+ }
+
+ ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written);
+};
+
ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written) {
TTempBuf tempbuf(sizeof(TPart) * count);
TPart* parts = (TPart*)tempbuf.Data();
@@ -820,9 +820,9 @@ ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t
while (!vec.Complete()) {
ssize_t ret = SendVImpl(fd, vec.Parts(), vec.Count());
- if (ret < 0) {
+ if (ret < 0) {
return ret;
- }
+ }
written += ret;
@@ -833,129 +833,129 @@ ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t
}
static inline TSocket::TOps* GetCommonSockOps() noexcept {
- return Singleton<TCommonSockOps>();
-}
-
-TSocket::TSocket()
- : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps()))
-{
-}
-
-TSocket::TSocket(SOCKET fd)
- : Impl_(new TImpl(fd, GetCommonSockOps()))
-{
-}
-
-TSocket::TSocket(SOCKET fd, TOps* ops)
- : Impl_(new TImpl(fd, ops))
-{
-}
-
-TSocket::TSocket(const TNetworkAddress& addr)
- : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps()))
-{
-}
-
-TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut)
- : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps()))
-{
-}
-
-TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine)
- : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps()))
-{
-}
-
+ return Singleton<TCommonSockOps>();
+}
+
+TSocket::TSocket()
+ : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps()))
+{
+}
+
+TSocket::TSocket(SOCKET fd)
+ : Impl_(new TImpl(fd, GetCommonSockOps()))
+{
+}
+
+TSocket::TSocket(SOCKET fd, TOps* ops)
+ : Impl_(new TImpl(fd, ops))
+{
+}
+
+TSocket::TSocket(const TNetworkAddress& addr)
+ : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps()))
+{
+}
+
+TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut)
+ : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps()))
+{
+}
+
+TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine)
+ : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps()))
+{
+}
+
TSocket::~TSocket() = default;
-
+
SOCKET TSocket::Fd() const noexcept {
- return Impl_->Fd();
-}
-
-ssize_t TSocket::Send(const void* data, size_t len) {
- return Impl_->Send(data, len);
-}
-
-ssize_t TSocket::Recv(void* buf, size_t len) {
- return Impl_->Recv(buf, len);
-}
-
-ssize_t TSocket::SendV(const TPart* parts, size_t count) {
- return Impl_->SendV(parts, count);
-}
-
+ return Impl_->Fd();
+}
+
+ssize_t TSocket::Send(const void* data, size_t len) {
+ return Impl_->Send(data, len);
+}
+
+ssize_t TSocket::Recv(void* buf, size_t len) {
+ return Impl_->Recv(buf, len);
+}
+
+ssize_t TSocket::SendV(const TPart* parts, size_t count) {
+ return Impl_->SendV(parts, count);
+}
+
void TSocket::Close() {
Impl_->Close();
}
TSocketInput::TSocketInput(const TSocket& s) noexcept
- : S_(s)
-{
-}
-
+ : S_(s)
+{
+}
+
TSocketInput::~TSocketInput() = default;
-
-size_t TSocketInput::DoRead(void* buf, size_t len) {
- const ssize_t ret = S_.Recv(buf, len);
-
- if (ret >= 0) {
- return (size_t)ret;
- }
-
+
+size_t TSocketInput::DoRead(void* buf, size_t len) {
+ const ssize_t ret = S_.Recv(buf, len);
+
+ if (ret >= 0) {
+ return (size_t)ret;
+ }
+
ythrow TSystemError(-(int)ret) << "can not read from socket input stream";
-}
-
+}
+
TSocketOutput::TSocketOutput(const TSocket& s) noexcept
- : S_(s)
-{
-}
-
+ : S_(s)
+{
+}
+
TSocketOutput::~TSocketOutput() {
- try {
- Finish();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-}
-
-void TSocketOutput::DoWrite(const void* buf, size_t len) {
+ try {
+ Finish();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+}
+
+void TSocketOutput::DoWrite(const void* buf, size_t len) {
size_t send = 0;
while (len) {
const ssize_t ret = S_.Send(buf, len);
-
+
if (ret < 0) {
ythrow TSystemError(-(int)ret) << "can not write to socket output stream; " << send << " bytes already send";
}
- buf = (const char*)buf + ret;
+ buf = (const char*)buf + ret;
len -= ret;
send += ret;
- }
-}
-
-void TSocketOutput::DoWriteV(const TPart* parts, size_t count) {
- const ssize_t ret = S_.SendV(parts, count);
-
- if (ret < 0) {
+ }
+}
+
+void TSocketOutput::DoWriteV(const TPart* parts, size_t count) {
+ const ssize_t ret = S_.SendV(parts, count);
+
+ if (ret < 0) {
ythrow TSystemError(-(int)ret) << "can not writev to socket output stream";
- }
-
- /*
- * todo for nonblocking sockets?
- */
-}
-
-namespace {
- //https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff
-
+ }
+
+ /*
+ * todo for nonblocking sockets?
+ */
+}
+
+namespace {
+ //https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff
+
struct TLocalNames: public THashSet<TStringBuf> {
- inline TLocalNames() {
- insert("localhost");
- insert("localhost.localdomain");
- insert("localhost6");
- insert("localhost6.localdomain6");
+ inline TLocalNames() {
+ insert("localhost");
+ insert("localhost.localdomain");
+ insert("localhost6");
+ insert("localhost6.localdomain6");
insert("::1");
- }
-
+ }
+
inline bool IsLocalName(const char* name) const noexcept {
struct sockaddr_in sa;
memset(&sa, 0, sizeof(sa));
@@ -965,18 +965,18 @@ namespace {
}
return contains(name);
- }
- };
-}
-
-class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> {
+ }
+ };
+}
+
+class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> {
private:
class TAddrInfoDeleter {
public:
TAddrInfoDeleter(bool useFreeAddrInfo = true)
: UseFreeAddrInfo_(useFreeAddrInfo)
- {
- }
+ {
+ }
void operator()(struct addrinfo* ai) noexcept {
if (!UseFreeAddrInfo_ && ai != NULL) {
@@ -984,7 +984,7 @@ private:
free(ai->ai_addr);
}
- struct addrinfo* p;
+ struct addrinfo* p;
while (ai != NULL) {
p = ai;
ai = ai->ai_next;
@@ -1000,38 +1000,38 @@ private:
bool UseFreeAddrInfo_ = true;
};
-public:
+public:
inline TImpl(const char* host, ui16 port, int flags)
: Info_(nullptr, TAddrInfoDeleter{})
- {
+ {
const TString port_st(ToString(port));
- struct addrinfo hints;
-
- memset(&hints, 0, sizeof(hints));
-
- hints.ai_flags = flags;
- hints.ai_family = PF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
-
- if (!host) {
- hints.ai_flags |= AI_PASSIVE;
- } else {
- if (!Singleton<TLocalNames>()->IsLocalName(host)) {
- hints.ai_flags |= AI_ADDRCONFIG;
- }
- }
-
+ struct addrinfo hints;
+
+ memset(&hints, 0, sizeof(hints));
+
+ hints.ai_flags = flags;
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+
+ if (!host) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ if (!Singleton<TLocalNames>()->IsLocalName(host)) {
+ hints.ai_flags |= AI_ADDRCONFIG;
+ }
+ }
+
struct addrinfo* pai = NULL;
const int error = getaddrinfo(host, port_st.data(), &hints, &pai);
-
- if (error) {
+
+ if (error) {
TAddrInfoDeleter()(pai);
- ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port;
- }
+ ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port;
+ }
Info_.reset(pai);
- }
-
+ }
+
inline TImpl(const char* path, int flags)
: Info_(nullptr, TAddrInfoDeleter{/* useFreeAddrInfo = */ false})
{
@@ -1052,18 +1052,18 @@ public:
hints->ai_addr = (struct sockaddr*)sockAddr.Release();
Info_.reset(hints.release());
- }
-
+ }
+
inline struct addrinfo* Info() const noexcept {
return Info_.get();
- }
-
-private:
+ }
+
+private:
using TAddrInfoPtr = std::unique_ptr<struct addrinfo, TAddrInfoDeleter>;
TAddrInfoPtr Info_;
-};
-
+};
+
TNetworkAddress::TNetworkAddress(const TUnixSocketPath& unixSocketPath, int flags)
: Impl_(new TImpl(unixSocketPath.Path.data(), flags))
{
@@ -1076,24 +1076,24 @@ TNetworkAddress::TNetworkAddress(const TString& host, ui16 port, int flags)
TNetworkAddress::TNetworkAddress(const TString& host, ui16 port)
: Impl_(new TImpl(host.data(), port, 0))
-{
-}
-
-TNetworkAddress::TNetworkAddress(ui16 port)
+{
+}
+
+TNetworkAddress::TNetworkAddress(ui16 port)
: Impl_(new TImpl(nullptr, port, 0))
-{
-}
-
+{
+}
+
TNetworkAddress::~TNetworkAddress() = default;
-
+
struct addrinfo* TNetworkAddress::Info() const noexcept {
- return Impl_->Info();
-}
-
+ return Impl_->Info();
+}
+
TNetworkResolutionError::TNetworkResolutionError(int error) {
const char* errMsg = nullptr;
#ifdef _win_
- errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows
+ errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows
#else
errMsg = gai_strerror(error);
#endif
@@ -1108,153 +1108,153 @@ TNetworkResolutionError::TNetworkResolutionError(int error) {
(*this) << "): ";
}
-#if defined(_unix_)
-static inline int GetFlags(int fd) {
- const int ret = fcntl(fd, F_GETFL);
-
- if (ret == -1) {
- ythrow TSystemError() << "can not get fd flags";
- }
-
- return ret;
-}
-
-static inline void SetFlags(int fd, int flags) {
- if (fcntl(fd, F_SETFL, flags) == -1) {
- ythrow TSystemError() << "can not set fd flags";
- }
-}
-
-static inline void EnableFlag(int fd, int flag) {
- const int oldf = GetFlags(fd);
- const int newf = oldf | flag;
-
- if (oldf != newf) {
- SetFlags(fd, newf);
- }
-}
-
-static inline void DisableFlag(int fd, int flag) {
- const int oldf = GetFlags(fd);
- const int newf = oldf & (~flag);
-
- if (oldf != newf) {
- SetFlags(fd, newf);
- }
-}
-
-static inline void SetFlag(int fd, int flag, bool value) {
- if (value) {
- EnableFlag(fd, flag);
- } else {
- DisableFlag(fd, flag);
- }
-}
-
-static inline bool FlagsAreEnabled(int fd, int flags) {
- return GetFlags(fd) & flags;
-}
-#endif
-
-#if defined(_win_)
+#if defined(_unix_)
+static inline int GetFlags(int fd) {
+ const int ret = fcntl(fd, F_GETFL);
+
+ if (ret == -1) {
+ ythrow TSystemError() << "can not get fd flags";
+ }
+
+ return ret;
+}
+
+static inline void SetFlags(int fd, int flags) {
+ if (fcntl(fd, F_SETFL, flags) == -1) {
+ ythrow TSystemError() << "can not set fd flags";
+ }
+}
+
+static inline void EnableFlag(int fd, int flag) {
+ const int oldf = GetFlags(fd);
+ const int newf = oldf | flag;
+
+ if (oldf != newf) {
+ SetFlags(fd, newf);
+ }
+}
+
+static inline void DisableFlag(int fd, int flag) {
+ const int oldf = GetFlags(fd);
+ const int newf = oldf & (~flag);
+
+ if (oldf != newf) {
+ SetFlags(fd, newf);
+ }
+}
+
+static inline void SetFlag(int fd, int flag, bool value) {
+ if (value) {
+ EnableFlag(fd, flag);
+ } else {
+ DisableFlag(fd, flag);
+ }
+}
+
+static inline bool FlagsAreEnabled(int fd, int flags) {
+ return GetFlags(fd) & flags;
+}
+#endif
+
+#if defined(_win_)
static inline void SetNonBlockSocket(SOCKET fd, int value) {
- unsigned long inbuf = value;
- unsigned long outbuf = 0;
- DWORD written = 0;
-
+ unsigned long inbuf = value;
+ unsigned long outbuf = 0;
+ DWORD written = 0;
+
if (!inbuf) {
WSAEventSelect(fd, nullptr, 0);
}
- if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
- ythrow TSystemError() << "can not set non block socket state";
- }
-}
-
+ if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
+ ythrow TSystemError() << "can not set non block socket state";
+ }
+}
+
static inline bool IsNonBlockSocket(SOCKET fd) {
- unsigned long buf = 0;
-
- if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) {
- ythrow TSystemError() << "can not get non block socket state";
- }
-
- return buf;
-}
-#endif
-
+ unsigned long buf = 0;
+
+ if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) {
+ ythrow TSystemError() << "can not get non block socket state";
+ }
+
+ return buf;
+}
+#endif
+
void SetNonBlock(SOCKET fd, bool value) {
-#if defined(_unix_)
- #if defined(FIONBIO)
+#if defined(_unix_)
+ #if defined(FIONBIO)
Y_UNUSED(SetFlag); // shut up clang about unused function
- int nb = value;
-
- if (ioctl(fd, FIONBIO, &nb) < 0) {
- ythrow TSystemError() << "ioctl failed";
- }
- #else
- SetFlag(fd, O_NONBLOCK, value);
- #endif
-#elif defined(_win_)
- SetNonBlockSocket(fd, value);
-#else
- #error todo
-#endif
-}
-
+ int nb = value;
+
+ if (ioctl(fd, FIONBIO, &nb) < 0) {
+ ythrow TSystemError() << "ioctl failed";
+ }
+ #else
+ SetFlag(fd, O_NONBLOCK, value);
+ #endif
+#elif defined(_win_)
+ SetNonBlockSocket(fd, value);
+#else
+ #error todo
+#endif
+}
+
bool IsNonBlock(SOCKET fd) {
-#if defined(_unix_)
- return FlagsAreEnabled(fd, O_NONBLOCK);
-#elif defined(_win_)
- return IsNonBlockSocket(fd);
-#else
- #error todo
-#endif
-}
-
-void SetDeferAccept(SOCKET s) {
- (void)s;
-
-#if defined(TCP_DEFER_ACCEPT)
- CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept");
-#endif
-
-#if defined(SO_ACCEPTFILTER)
- struct accept_filter_arg afa;
-
- Zero(afa);
- strcpy(afa.af_name, "dataready");
- SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa);
-#endif
-}
-
+#if defined(_unix_)
+ return FlagsAreEnabled(fd, O_NONBLOCK);
+#elif defined(_win_)
+ return IsNonBlockSocket(fd);
+#else
+ #error todo
+#endif
+}
+
+void SetDeferAccept(SOCKET s) {
+ (void)s;
+
+#if defined(TCP_DEFER_ACCEPT)
+ CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept");
+#endif
+
+#if defined(SO_ACCEPTFILTER)
+ struct accept_filter_arg afa;
+
+ Zero(afa);
+ strcpy(afa.af_name, "dataready");
+ SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa);
+#endif
+}
+
ssize_t PollD(struct pollfd fds[], nfds_t nfds, const TInstant& deadLine) noexcept {
- TInstant now = TInstant::Now();
-
- do {
- const TDuration toWait = PollStep(deadLine, now);
- const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds()));
-
- if (res > 0) {
- return res;
- }
-
- if (res < 0) {
- const int err = LastSystemError();
-
+ TInstant now = TInstant::Now();
+
+ do {
+ const TDuration toWait = PollStep(deadLine, now);
+ const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds()));
+
+ if (res > 0) {
+ return res;
+ }
+
+ if (res < 0) {
+ const int err = LastSystemError();
+
if (err != ETIMEDOUT && err != EINTR) {
- return -err;
- }
- }
- } while ((now = TInstant::Now()) < deadLine);
-
- return -ETIMEDOUT;
-}
-
-void ShutDown(SOCKET s, int mode) {
- if (shutdown(s, mode)) {
- ythrow TSystemError() << "shutdown socket error";
- }
-}
+ return -err;
+ }
+ }
+ } while ((now = TInstant::Now()) < deadLine);
+
+ return -ETIMEDOUT;
+}
+
+void ShutDown(SOCKET s, int mode) {
+ if (shutdown(s, mode)) {
+ ythrow TSystemError() << "shutdown socket error";
+ }
+}
extern "C" bool IsReusePortAvailable() {
// SO_REUSEPORT is always defined for linux builds, see SetReusePort() implementation above
diff --git a/util/network/socket.h b/util/network/socket.h
index 40c8648b40..3bf3f32f51 100644
--- a/util/network/socket.h
+++ b/util/network/socket.h
@@ -1,82 +1,82 @@
#pragma once
-#include "init.h"
-
-#include <util/system/yassert.h>
+#include "init.h"
+
+#include <util/system/yassert.h>
#include <util/system/defaults.h>
#include <util/system/error.h>
#include <util/stream/output.h>
#include <util/stream/input.h>
-#include <util/generic/ptr.h>
-#include <util/generic/yexception.h>
-#include <util/generic/noncopyable.h>
-#include <util/datetime/base.h>
-
-#include <cerrno>
+#include <util/generic/ptr.h>
+#include <util/generic/yexception.h>
+#include <util/generic/noncopyable.h>
+#include <util/datetime/base.h>
+#include <cerrno>
+
#ifndef INET_ADDRSTRLEN
- #define INET_ADDRSTRLEN 16
+ #define INET_ADDRSTRLEN 16
#endif
-#if defined(_unix_)
- #define get_host_error() h_errno
-#elif defined(_win_)
- #pragma comment(lib, "Ws2_32.lib")
-
- #if _WIN32_WINNT < 0x0600
-struct pollfd {
- SOCKET fd;
- short events;
- short revents;
-};
-
- #define POLLIN (1 << 0)
- #define POLLRDNORM (1 << 1)
- #define POLLRDBAND (1 << 2)
- #define POLLPRI (1 << 3)
- #define POLLOUT (1 << 4)
- #define POLLWRNORM (1 << 5)
- #define POLLWRBAND (1 << 6)
- #define POLLERR (1 << 7)
- #define POLLHUP (1 << 8)
- #define POLLNVAL (1 << 9)
-
-const char* inet_ntop(int af, const void* src, char* dst, socklen_t size);
+#if defined(_unix_)
+ #define get_host_error() h_errno
+#elif defined(_win_)
+ #pragma comment(lib, "Ws2_32.lib")
+
+ #if _WIN32_WINNT < 0x0600
+struct pollfd {
+ SOCKET fd;
+ short events;
+ short revents;
+};
+
+ #define POLLIN (1 << 0)
+ #define POLLRDNORM (1 << 1)
+ #define POLLRDBAND (1 << 2)
+ #define POLLPRI (1 << 3)
+ #define POLLOUT (1 << 4)
+ #define POLLWRNORM (1 << 5)
+ #define POLLWRBAND (1 << 6)
+ #define POLLERR (1 << 7)
+ #define POLLHUP (1 << 8)
+ #define POLLNVAL (1 << 9)
+
+const char* inet_ntop(int af, const void* src, char* dst, socklen_t size);
int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept;
- #else
- #define poll(fds, nfds, timeout) WSAPoll(fds, nfds, timeout)
- #endif
+ #else
+ #define poll(fds, nfds, timeout) WSAPoll(fds, nfds, timeout)
+ #endif
-int inet_aton(const char* cp, struct in_addr* inp);
+int inet_aton(const char* cp, struct in_addr* inp);
- #define get_host_error() WSAGetLastError()
+ #define get_host_error() WSAGetLastError()
- #define SHUT_RD SD_RECEIVE
- #define SHUT_WR SD_SEND
- #define SHUT_RDWR SD_BOTH
+ #define SHUT_RD SD_RECEIVE
+ #define SHUT_WR SD_SEND
+ #define SHUT_RDWR SD_BOTH
- #define INFTIM (-1)
+ #define INFTIM (-1)
#endif
-template <class T>
+template <class T>
static inline int SetSockOpt(SOCKET s, int level, int optname, T opt) noexcept {
- return setsockopt(s, level, optname, (const char*)&opt, sizeof(opt));
-}
-
-template <class T>
+ return setsockopt(s, level, optname, (const char*)&opt, sizeof(opt));
+}
+
+template <class T>
static inline int GetSockOpt(SOCKET s, int level, int optname, T& opt) noexcept {
- socklen_t len = sizeof(opt);
-
- return getsockopt(s, level, optname, (char*)&opt, &len);
-}
-
-template <class T>
-static inline void CheckedSetSockOpt(SOCKET s, int level, int optname, T opt, const char* err) {
- if (SetSockOpt<T>(s, level, optname, opt)) {
- ythrow TSystemError() << "setsockopt() failed for " << err;
- }
-}
-
+ socklen_t len = sizeof(opt);
+
+ return getsockopt(s, level, optname, (char*)&opt, &len);
+}
+
+template <class T>
+static inline void CheckedSetSockOpt(SOCKET s, int level, int optname, T opt, const char* err) {
+ if (SetSockOpt<T>(s, level, optname, opt)) {
+ ythrow TSystemError() << "setsockopt() failed for " << err;
+ }
+}
+
template <class T>
static inline void CheckedGetSockOpt(SOCKET s, int level, int optname, T& opt, const char* err) {
if (GetSockOpt<T>(s, level, optname, opt)) {
@@ -84,34 +84,34 @@ static inline void CheckedGetSockOpt(SOCKET s, int level, int optname, T& opt, c
}
}
-static inline void FixIPv6ListenSocket(SOCKET s) {
-#if defined(IPV6_V6ONLY)
- SetSockOpt(s, IPPROTO_IPV6, IPV6_V6ONLY, 1);
-#else
- (void)s;
-#endif
-}
-
+static inline void FixIPv6ListenSocket(SOCKET s) {
+#if defined(IPV6_V6ONLY)
+ SetSockOpt(s, IPPROTO_IPV6, IPV6_V6ONLY, 1);
+#else
+ (void)s;
+#endif
+}
+
namespace NAddr {
class IRemoteAddr;
}
-void SetSocketTimeout(SOCKET s, long timeout);
-void SetSocketTimeout(SOCKET s, long sec, long msec);
-void SetNoDelay(SOCKET s, bool value);
-void SetKeepAlive(SOCKET s);
-void SetLinger(SOCKET s, bool on, unsigned len);
-void SetZeroLinger(SOCKET s);
-void SetKeepAlive(SOCKET s, bool value);
+void SetSocketTimeout(SOCKET s, long timeout);
+void SetSocketTimeout(SOCKET s, long sec, long msec);
+void SetNoDelay(SOCKET s, bool value);
+void SetKeepAlive(SOCKET s);
+void SetLinger(SOCKET s, bool on, unsigned len);
+void SetZeroLinger(SOCKET s);
+void SetKeepAlive(SOCKET s, bool value);
void SetCloseOnExec(SOCKET s, bool value);
-void SetOutputBuffer(SOCKET s, unsigned value);
-void SetInputBuffer(SOCKET s, unsigned value);
+void SetOutputBuffer(SOCKET s, unsigned value);
+void SetInputBuffer(SOCKET s, unsigned value);
void SetReusePort(SOCKET s, bool value);
-void ShutDown(SOCKET s, int mode);
-bool GetRemoteAddr(SOCKET s, char* str, socklen_t size);
-size_t GetMaximumSegmentSize(SOCKET s);
-size_t GetMaximumTransferUnit(SOCKET s);
-void SetDeferAccept(SOCKET s);
+void ShutDown(SOCKET s, int mode);
+bool GetRemoteAddr(SOCKET s, char* str, socklen_t size);
+size_t GetMaximumSegmentSize(SOCKET s);
+size_t GetMaximumTransferUnit(SOCKET s);
+void SetDeferAccept(SOCKET s);
void SetSocketToS(SOCKET s, int tos);
void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos);
int GetSocketToS(SOCKET s);
@@ -135,7 +135,7 @@ ESocketReadStatus HasSocketDataToRead(SOCKET s);
* Determines whether connection on socket is local (same machine) or not.
**/
bool HasLocalAddress(SOCKET socket);
-
+
/**
* Runtime check if current kernel supports SO_REUSEPORT option.
**/
@@ -143,15 +143,15 @@ extern "C" bool IsReusePortAvailable();
bool IsNonBlock(SOCKET fd);
void SetNonBlock(SOCKET fd, bool nonBlock = true);
-
-struct addrinfo;
-
-class TNetworkResolutionError: public yexception {
-public:
+
+struct addrinfo;
+
+class TNetworkResolutionError: public yexception {
+public:
// @param error error code (EAI_XXX) returned by getaddrinfo or getnameinfo (not errno)
TNetworkResolutionError(int error);
-};
-
+};
+
struct TUnixSocketPath {
TString Path;
@@ -159,95 +159,95 @@ struct TUnixSocketPath {
// TUnixSocketPath("/tmp/unixsocket") -> "/tmp/unixsocket"
explicit TUnixSocketPath(const TString& path)
: Path(path)
- {
- }
+ {
+ }
};
-class TNetworkAddress {
- friend class TSocket;
-
-public:
- class TIterator {
- public:
- inline TIterator(struct addrinfo* begin)
- : C_(begin)
- {
- }
-
+class TNetworkAddress {
+ friend class TSocket;
+
+public:
+ class TIterator {
+ public:
+ inline TIterator(struct addrinfo* begin)
+ : C_(begin)
+ {
+ }
+
inline void Next() noexcept {
- C_ = C_->ai_next;
- }
-
+ C_ = C_->ai_next;
+ }
+
inline TIterator operator++(int) noexcept {
- TIterator old(*this);
-
- Next();
-
- return old;
- }
-
+ TIterator old(*this);
+
+ Next();
+
+ return old;
+ }
+
inline TIterator& operator++() noexcept {
- Next();
-
- return *this;
- }
-
+ Next();
+
+ return *this;
+ }
+
friend inline bool operator==(const TIterator& l, const TIterator& r) noexcept {
- return l.C_ == r.C_;
- }
-
+ return l.C_ == r.C_;
+ }
+
friend inline bool operator!=(const TIterator& l, const TIterator& r) noexcept {
- return !(l == r);
- }
-
+ return !(l == r);
+ }
+
inline struct addrinfo& operator*() const noexcept {
- return *C_;
- }
-
+ return *C_;
+ }
+
inline struct addrinfo* operator->() const noexcept {
- return C_;
- }
-
- private:
- struct addrinfo* C_;
- };
-
- TNetworkAddress(ui16 port);
+ return C_;
+ }
+
+ private:
+ struct addrinfo* C_;
+ };
+
+ TNetworkAddress(ui16 port);
TNetworkAddress(const TString& host, ui16 port);
TNetworkAddress(const TString& host, ui16 port, int flags);
TNetworkAddress(const TUnixSocketPath& unixSocketPath, int flags = 0);
~TNetworkAddress();
-
+
inline TIterator Begin() const noexcept {
- return TIterator(Info());
- }
-
+ return TIterator(Info());
+ }
+
inline TIterator End() const noexcept {
return TIterator(nullptr);
- }
-
-private:
+ }
+
+private:
struct addrinfo* Info() const noexcept;
-
-private:
- class TImpl;
- TSimpleIntrusivePtr<TImpl> Impl_;
-};
-
+
+private:
+ class TImpl;
+ TSimpleIntrusivePtr<TImpl> Impl_;
+};
+
class TSocket;
class TSocketHolder: public TMoveOnly {
-public:
- inline TSocketHolder()
- : Fd_(INVALID_SOCKET)
- {
- }
-
- inline TSocketHolder(SOCKET fd)
- : Fd_(fd)
- {
- }
-
+public:
+ inline TSocketHolder()
+ : Fd_(INVALID_SOCKET)
+ {
+ }
+
+ inline TSocketHolder(SOCKET fd)
+ : Fd_(fd)
+ {
+ }
+
inline TSocketHolder(TSocketHolder&& other) noexcept {
Fd_ = other.Fd_;
other.Fd_ = INVALID_SOCKET;
@@ -261,172 +261,172 @@ public:
}
inline ~TSocketHolder() {
- Close();
- }
-
+ Close();
+ }
+
inline SOCKET Release() noexcept {
- SOCKET ret = Fd_;
- Fd_ = INVALID_SOCKET;
- return ret;
- }
-
+ SOCKET ret = Fd_;
+ Fd_ = INVALID_SOCKET;
+ return ret;
+ }
+
void Close() noexcept;
-
- inline void ShutDown(int mode) const {
- ::ShutDown(Fd_, mode);
- }
-
+
+ inline void ShutDown(int mode) const {
+ ::ShutDown(Fd_, mode);
+ }
+
inline void Swap(TSocketHolder& r) noexcept {
- DoSwap(Fd_, r.Fd_);
- }
+ DoSwap(Fd_, r.Fd_);
+ }
inline bool Closed() const noexcept {
- return Fd_ == INVALID_SOCKET;
- }
-
+ return Fd_ == INVALID_SOCKET;
+ }
+
inline operator SOCKET() const noexcept {
- return Fd_;
- }
-
-private:
- SOCKET Fd_;
+ return Fd_;
+ }
+
+private:
+ SOCKET Fd_;
// do not allow construction of TSocketHolder from TSocket
TSocketHolder(const TSocket& fd);
-};
-
-class TSocket {
-public:
+};
+
+class TSocket {
+public:
using TPart = IOutputStream::TPart;
-
- class TOps {
- public:
+
+ class TOps {
+ public:
inline TOps() noexcept = default;
virtual ~TOps() = default;
-
- virtual ssize_t Send(SOCKET fd, const void* data, size_t len) = 0;
- virtual ssize_t Recv(SOCKET fd, void* buf, size_t len) = 0;
- virtual ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) = 0;
- };
-
- TSocket();
- TSocket(SOCKET fd);
- TSocket(SOCKET fd, TOps* ops);
- TSocket(const TNetworkAddress& addr);
- TSocket(const TNetworkAddress& addr, const TDuration& timeOut);
- TSocket(const TNetworkAddress& addr, const TInstant& deadLine);
-
+
+ virtual ssize_t Send(SOCKET fd, const void* data, size_t len) = 0;
+ virtual ssize_t Recv(SOCKET fd, void* buf, size_t len) = 0;
+ virtual ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) = 0;
+ };
+
+ TSocket();
+ TSocket(SOCKET fd);
+ TSocket(SOCKET fd, TOps* ops);
+ TSocket(const TNetworkAddress& addr);
+ TSocket(const TNetworkAddress& addr, const TDuration& timeOut);
+ TSocket(const TNetworkAddress& addr, const TInstant& deadLine);
+
~TSocket();
-
- template <class T>
- inline void SetSockOpt(int level, int optname, T opt) {
- CheckedSetSockOpt(Fd(), level, optname, opt, "TSocket");
- }
-
- inline void SetSocketTimeout(long timeout) {
- ::SetSocketTimeout(Fd(), timeout);
- }
-
- inline void SetSocketTimeout(long sec, long msec) {
- ::SetSocketTimeout(Fd(), sec, msec);
- }
-
- inline void SetNoDelay(bool value) {
- ::SetNoDelay(Fd(), value);
- }
-
- inline void SetLinger(bool on, unsigned len) {
- ::SetLinger(Fd(), on, len);
- }
-
- inline void SetZeroLinger() {
- ::SetZeroLinger(Fd());
- }
-
- inline void SetKeepAlive(bool value) {
- ::SetKeepAlive(Fd(), value);
- }
-
- inline void SetOutputBuffer(unsigned value) {
- ::SetOutputBuffer(Fd(), value);
- }
-
- inline void SetInputBuffer(unsigned value) {
- ::SetInputBuffer(Fd(), value);
- }
-
- inline size_t MaximumSegmentSize() const {
- return GetMaximumSegmentSize(Fd());
- }
-
- inline size_t MaximumTransferUnit() const {
- return GetMaximumTransferUnit(Fd());
- }
-
- inline void ShutDown(int mode) const {
- ::ShutDown(Fd(), mode);
- }
+
+ template <class T>
+ inline void SetSockOpt(int level, int optname, T opt) {
+ CheckedSetSockOpt(Fd(), level, optname, opt, "TSocket");
+ }
+
+ inline void SetSocketTimeout(long timeout) {
+ ::SetSocketTimeout(Fd(), timeout);
+ }
+
+ inline void SetSocketTimeout(long sec, long msec) {
+ ::SetSocketTimeout(Fd(), sec, msec);
+ }
+
+ inline void SetNoDelay(bool value) {
+ ::SetNoDelay(Fd(), value);
+ }
+
+ inline void SetLinger(bool on, unsigned len) {
+ ::SetLinger(Fd(), on, len);
+ }
+
+ inline void SetZeroLinger() {
+ ::SetZeroLinger(Fd());
+ }
+
+ inline void SetKeepAlive(bool value) {
+ ::SetKeepAlive(Fd(), value);
+ }
+
+ inline void SetOutputBuffer(unsigned value) {
+ ::SetOutputBuffer(Fd(), value);
+ }
+
+ inline void SetInputBuffer(unsigned value) {
+ ::SetInputBuffer(Fd(), value);
+ }
+
+ inline size_t MaximumSegmentSize() const {
+ return GetMaximumSegmentSize(Fd());
+ }
+
+ inline size_t MaximumTransferUnit() const {
+ return GetMaximumTransferUnit(Fd());
+ }
+
+ inline void ShutDown(int mode) const {
+ ::ShutDown(Fd(), mode);
+ }
void Close();
- ssize_t Send(const void* data, size_t len);
- ssize_t Recv(void* buf, size_t len);
-
- /*
- * scatter/gather io
- */
- ssize_t SendV(const TPart* parts, size_t count);
-
+ ssize_t Send(const void* data, size_t len);
+ ssize_t Recv(void* buf, size_t len);
+
+ /*
+ * scatter/gather io
+ */
+ ssize_t SendV(const TPart* parts, size_t count);
+
inline operator SOCKET() const noexcept {
- return Fd();
- }
-
-private:
+ return Fd();
+ }
+
+private:
SOCKET Fd() const noexcept;
-
-private:
- class TImpl;
- TSimpleIntrusivePtr<TImpl> Impl_;
-};
-
+
+private:
+ class TImpl;
+ TSimpleIntrusivePtr<TImpl> Impl_;
+};
+
class TSocketInput: public IInputStream {
-public:
+public:
TSocketInput(const TSocket& s) noexcept;
~TSocketInput() override;
-
+
TSocketInput(TSocketInput&&) noexcept = default;
TSocketInput& operator=(TSocketInput&&) noexcept = default;
const TSocket& GetSocket() const noexcept {
- return S_;
- }
+ return S_;
+ }
-private:
+private:
size_t DoRead(void* buf, size_t len) override;
-
-private:
- TSocket S_;
-};
-
+
+private:
+ TSocket S_;
+};
+
class TSocketOutput: public IOutputStream {
-public:
+public:
TSocketOutput(const TSocket& s) noexcept;
~TSocketOutput() override;
-
+
TSocketOutput(TSocketOutput&&) noexcept = default;
TSocketOutput& operator=(TSocketOutput&&) noexcept = default;
const TSocket& GetSocket() const noexcept {
- return S_;
- }
+ return S_;
+ }
-private:
+private:
void DoWrite(const void* buf, size_t len) override;
void DoWriteV(const TPart* parts, size_t count) override;
-
-private:
- TSocket S_;
-};
-
-//return -(error code) if error occured, or number of ready fds
+
+private:
+ TSocket S_;
+};
+
+//return -(error code) if error occured, or number of ready fds
ssize_t PollD(struct pollfd fds[], nfds_t nfds, const TInstant& deadLine) noexcept;
diff --git a/util/network/socket_ut.cpp b/util/network/socket_ut.cpp
index 6b20e11f70..b93d18e307 100644
--- a/util/network/socket_ut.cpp
+++ b/util/network/socket_ut.cpp
@@ -1,57 +1,57 @@
-#include "socket.h"
-
+#include "socket.h"
+
#include "pair.h"
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/string/builder.h>
+
+#include <util/string/builder.h>
#include <util/generic/vector.h>
-
-#include <ctime>
-
+
+#include <ctime>
+
#ifdef _linux_
- #include <linux/version.h>
- #include <sys/utsname.h>
+ #include <linux/version.h>
+ #include <sys/utsname.h>
#endif
-class TSockTest: public TTestBase {
+class TSockTest: public TTestBase {
UNIT_TEST_SUITE(TSockTest);
- UNIT_TEST(TestSock);
- UNIT_TEST(TestTimeout);
+ UNIT_TEST(TestSock);
+ UNIT_TEST(TestTimeout);
#ifndef _win_ // Test hangs on Windows
- UNIT_TEST_EXCEPTION(TestConnectionRefused, yexception);
+ UNIT_TEST_EXCEPTION(TestConnectionRefused, yexception);
#endif
- UNIT_TEST(TestNetworkResolutionError);
+ UNIT_TEST(TestNetworkResolutionError);
UNIT_TEST(TestNetworkResolutionErrorMessage);
UNIT_TEST(TestBrokenPipe);
UNIT_TEST(TestClose);
UNIT_TEST(TestReusePortAvailCheck);
UNIT_TEST_SUITE_END();
-
-public:
- void TestSock();
- void TestTimeout();
- void TestConnectionRefused();
- void TestNetworkResolutionError();
+
+public:
+ void TestSock();
+ void TestTimeout();
+ void TestConnectionRefused();
+ void TestNetworkResolutionError();
void TestNetworkResolutionErrorMessage();
void TestBrokenPipe();
void TestClose();
void TestReusePortAvailCheck();
-};
-
-UNIT_TEST_SUITE_REGISTRATION(TSockTest);
-
-void TSockTest::TestSock() {
- TNetworkAddress addr("yandex.ru", 80);
- TSocket s(addr);
- TSocketOutput so(s);
- TSocketInput si(s);
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TSockTest);
+
+void TSockTest::TestSock() {
+ TNetworkAddress addr("yandex.ru", 80);
+ TSocket s(addr);
+ TSocketOutput so(s);
+ TSocketInput si(s);
const TStringBuf req = "GET / HTTP/1.1\r\nHost: yandex.ru\r\n\r\n";
-
+
so.Write(req.data(), req.size());
-
- UNIT_ASSERT(!si.ReadLine().empty());
-}
+
+ UNIT_ASSERT(!si.ReadLine().empty());
+}
void TSockTest::TestTimeout() {
static const int timeout = 1000;
@@ -59,7 +59,7 @@ void TSockTest::TestTimeout() {
try {
TNetworkAddress addr("localhost", 1313);
TSocket s(addr, TDuration::MilliSeconds(timeout));
- } catch (const yexception&) {
+ } catch (const yexception&) {
}
int realTimeout = (int)(millisec() - startTime);
if (realTimeout > timeout + 2000) {
@@ -77,18 +77,18 @@ void TSockTest::TestNetworkResolutionError() {
TString errMsg;
try {
TNetworkAddress addr("", 0);
- } catch (const TNetworkResolutionError& e) {
+ } catch (const TNetworkResolutionError& e) {
errMsg = e.what();
}
- if (errMsg.empty()) {
+ if (errMsg.empty()) {
return; // on Windows getaddrinfo("", 0, ...) returns "OK"
- }
+ }
int expectedErr = EAI_NONAME;
TString expectedErrMsg = gai_strerror(expectedErr);
if (errMsg.find(expectedErrMsg) == TString::npos) {
- UNIT_FAIL("TNetworkResolutionError contains\nInvalid msg: " + errMsg + "\nExpected msg: " + expectedErrMsg + "\n");
+ UNIT_FAIL("TNetworkResolutionError contains\nInvalid msg: " + errMsg + "\nExpected msg: " + expectedErrMsg + "\n");
}
}
@@ -128,7 +128,7 @@ void TSockTest::TestNetworkResolutionErrorMessage() {
#endif
}
-class TTempEnableSigPipe {
+class TTempEnableSigPipe {
public:
TTempEnableSigPipe() {
OriginalSigHandler_ = signal(SIGPIPE, SIG_DFL);
@@ -144,7 +144,7 @@ private:
void (*OriginalSigHandler_)(int);
};
-void TSockTest::TestBrokenPipe() {
+void TSockTest::TestBrokenPipe() {
TTempEnableSigPipe guard;
SOCKET socks[2];
@@ -185,7 +185,7 @@ void TSockTest::TestClose() {
UNIT_ASSERT_EQUAL(static_cast<SOCKET>(receiver), INVALID_SOCKET);
}
-void TSockTest::TestReusePortAvailCheck() {
+void TSockTest::TestReusePortAvailCheck() {
#if defined _linux_
utsname sysInfo;
Y_VERIFY(!uname(&sysInfo), "Error while call uname: %s", LastSystemErrorText());
@@ -212,27 +212,27 @@ void TSockTest::TestReusePortAvailCheck() {
#endif
}
-class TPollTest: public TTestBase {
- UNIT_TEST_SUITE(TPollTest);
- UNIT_TEST(TestPollInOut);
- UNIT_TEST_SUITE_END();
-
-public:
- inline TPollTest() {
+class TPollTest: public TTestBase {
+ UNIT_TEST_SUITE(TPollTest);
+ UNIT_TEST(TestPollInOut);
+ UNIT_TEST_SUITE_END();
+
+public:
+ inline TPollTest() {
srand(static_cast<unsigned int>(time(nullptr)));
- }
-
- void TestPollInOut();
-
-private:
- sockaddr_in GetAddress(ui32 ip, ui16 port);
- SOCKET CreateSocket();
- SOCKET StartServerSocket(ui16 port, int backlog);
- SOCKET StartClientSocket(ui32 ip, ui16 port);
- SOCKET AcceptConnection(SOCKET serverSocket);
+ }
+
+ void TestPollInOut();
+
+private:
+ sockaddr_in GetAddress(ui32 ip, ui16 port);
+ SOCKET CreateSocket();
+ SOCKET StartServerSocket(ui16 port, int backlog);
+ SOCKET StartClientSocket(ui32 ip, ui16 port);
+ SOCKET AcceptConnection(SOCKET serverSocket);
};
-UNIT_TEST_SUITE_REGISTRATION(TPollTest);
+UNIT_TEST_SUITE_REGISTRATION(TPollTest);
sockaddr_in TPollTest::GetAddress(ui32 ip, ui16 port) {
struct sockaddr_in addr;
@@ -245,38 +245,38 @@ sockaddr_in TPollTest::GetAddress(ui32 ip, ui16 port) {
SOCKET TPollTest::CreateSocket() {
SOCKET s = socket(AF_INET, SOCK_STREAM, 0);
- if (s == INVALID_SOCKET) {
+ if (s == INVALID_SOCKET) {
ythrow yexception() << "Can not create socket (" << LastSystemErrorText() << ")";
- }
+ }
return s;
}
SOCKET TPollTest::StartServerSocket(ui16 port, int backlog) {
TSocketHolder s(CreateSocket());
sockaddr_in addr = GetAddress(ntohl(INADDR_ANY), port);
- if (bind(s, (sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) {
+ if (bind(s, (sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) {
ythrow yexception() << "Can not bind server socket (" << LastSystemErrorText() << ")";
- }
- if (listen(s, backlog) == SOCKET_ERROR) {
+ }
+ if (listen(s, backlog) == SOCKET_ERROR) {
ythrow yexception() << "Can not listen on server socket (" << LastSystemErrorText() << ")";
- }
+ }
return s.Release();
}
SOCKET TPollTest::StartClientSocket(ui32 ip, ui16 port) {
TSocketHolder s(CreateSocket());
sockaddr_in addr = GetAddress(ip, port);
- if (connect(s, (sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) {
+ if (connect(s, (sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) {
ythrow yexception() << "Can not connect client socket (" << LastSystemErrorText() << ")";
- }
+ }
return s.Release();
}
SOCKET TPollTest::AcceptConnection(SOCKET serverSocket) {
SOCKET connectedSocket = accept(serverSocket, nullptr, nullptr);
- if (connectedSocket == INVALID_SOCKET) {
+ if (connectedSocket == INVALID_SOCKET) {
ythrow yexception() << "Can not accept connection on server socket (" << LastSystemErrorText() << ")";
- }
+ }
return connectedSocket;
}
@@ -294,7 +294,7 @@ void TPollTest::TestPollInOut() {
TVector<pollfd> fds;
for (size_t i = 0; i < socketCount; ++i) {
- TSimpleSharedPtr<TSocketHolder> clientSocket(new TSocketHolder(StartClientSocket(localIp, port)));
+ TSimpleSharedPtr<TSocketHolder> clientSocket(new TSocketHolder(StartClientSocket(localIp, port)));
clientSockets.push_back(clientSocket);
if (i % 5 == 0 || i % 5 == 2) {
@@ -303,7 +303,7 @@ void TPollTest::TestPollInOut() {
ythrow yexception() << "Can not send (" << LastSystemErrorText() << ")";
}
- TSimpleSharedPtr<TSocketHolder> connectedSocket(new TSocketHolder(AcceptConnection(serverSocket)));
+ TSimpleSharedPtr<TSocketHolder> connectedSocket(new TSocketHolder(AcceptConnection(serverSocket)));
connectedSockets.push_back(connectedSocket);
if (i % 5 == 2 || i % 5 == 3) {
@@ -313,7 +313,7 @@ void TPollTest::TestPollInOut() {
}
int expectedCount = 0;
- for (size_t i = 0; i < connectedSockets.size(); ++i) {
+ for (size_t i = 0; i < connectedSockets.size(); ++i) {
pollfd fd = {(i % 5 == 4) ? INVALID_SOCKET : static_cast<SOCKET>(*connectedSockets[i]), POLLIN | POLLOUT, 0};
fds.push_back(fd);
if (i % 5 != 4)
@@ -338,4 +338,4 @@ void TPollTest::TestPollInOut() {
}
}
#endif
-}
+}
diff --git a/util/network/ut/ya.make b/util/network/ut/ya.make
index 1ba03e167c..f7a20bb722 100644
--- a/util/network/ut/ya.make
+++ b/util/network/ut/ya.make
@@ -1,21 +1,21 @@
-UNITTEST_FOR(util)
+UNITTEST_FOR(util)
REQUIREMENTS(network:full)
OWNER(g:util)
SUBSCRIBER(g:util-subscribers)
-PEERDIR(
+PEERDIR(
library/cpp/threading/future
-)
-
+)
+
SRCS(
- network/address_ut.cpp
- network/endpoint_ut.cpp
- network/ip_ut.cpp
- network/poller_ut.cpp
- network/sock_ut.cpp
- network/socket_ut.cpp
+ network/address_ut.cpp
+ network/endpoint_ut.cpp
+ network/ip_ut.cpp
+ network/poller_ut.cpp
+ network/sock_ut.cpp
+ network/socket_ut.cpp
)
INCLUDE(${ARCADIA_ROOT}/util/tests/ya_util_tests.inc)