aboutsummaryrefslogtreecommitdiffstats
path: root/util/network/socket.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/network/socket.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/network/socket.cpp')
-rw-r--r--util/network/socket.cpp1230
1 files changed, 615 insertions, 615 deletions
diff --git a/util/network/socket.cpp b/util/network/socket.cpp
index 4f6e804346..c7a545c9c2 100644
--- a/util/network/socket.cpp
+++ b/util/network/socket.cpp
@@ -7,50 +7,50 @@
#include <util/system/defaults.h>
#include <util/system/byteorder.h>
-#if defined(_unix_)
- #include <netdb.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <sys/un.h>
- #include <sys/ioctl.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
- #include <arpa/inet.h>
-#endif
-
-#if defined(_freebsd_)
- #include <sys/module.h>
- #define ACCEPT_FILTER_MOD
- #include <sys/socketvar.h>
-#endif
-
-#if defined(_win_)
- #include <cerrno>
- #include <winsock2.h>
- #include <ws2tcpip.h>
- #include <wspiapi.h>
-
- #include <util/system/compat.h>
-#endif
-
+#if defined(_unix_)
+ #include <netdb.h>
+ #include <sys/types.h>
+ #include <sys/socket.h>
+ #include <sys/un.h>
+ #include <sys/ioctl.h>
+ #include <netinet/in.h>
+ #include <netinet/tcp.h>
+ #include <arpa/inet.h>
+#endif
+
+#if defined(_freebsd_)
+ #include <sys/module.h>
+ #define ACCEPT_FILTER_MOD
+ #include <sys/socketvar.h>
+#endif
+
+#if defined(_win_)
+ #include <cerrno>
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
+ #include <wspiapi.h>
+
+ #include <util/system/compat.h>
+#endif
+
#include <util/generic/ylimits.h>
-
-#include <util/string/cast.h>
+
+#include <util/string/cast.h>
#include <util/stream/mem.h>
-#include <util/system/datetime.h>
+#include <util/system/datetime.h>
#include <util/system/error.h>
-#include <util/memory/tempbuf.h>
-#include <util/generic/singleton.h>
-#include <util/generic/hash_set.h>
-
-#include <stddef.h>
+#include <util/memory/tempbuf.h>
+#include <util/generic/singleton.h>
+#include <util/generic/hash_set.h>
+
+#include <stddef.h>
#include <sys/uio.h>
+
+using namespace NAddr;
+
+#if defined(_win_)
-using namespace NAddr;
-
-#if defined(_win_)
-
-int inet_aton(const char* cp, struct in_addr* inp) {
+int inet_aton(const char* cp, struct in_addr* inp) {
sockaddr_in addr;
addr.sin_family = AF_INET;
int psz = sizeof(addr);
@@ -61,9 +61,9 @@ int inet_aton(const char* cp, struct in_addr* inp) {
return 0;
}
- #if (_WIN32_WINNT < 0x0600)
-const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
- if (af != AF_INET) {
+ #if (_WIN32_WINNT < 0x0600)
+const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
+ if (af != AF_INET) {
errno = EINVAL;
return 0;
}
@@ -90,7 +90,7 @@ static const evpair evpairs_to_win[] = {
{POLLWRBAND, -1},
{POLLERR, 0},
{POLLHUP, 0},
- {POLLNVAL, 0}};
+ {POLLNVAL, 0}};
static const size_t nevpairs_to_win = sizeof(evpairs_to_win) / sizeof(evpairs_to_win[0]);
@@ -117,7 +117,7 @@ static int convert_events(int events, const evpair* evpairs, size_t nevpairs, bo
result |= winEvent;
}
}
- if (events != 0 && !ignoreUnknown)
+ if (events != 0 && !ignoreUnknown)
return -1;
return result;
}
@@ -128,8 +128,8 @@ private:
public:
inline TWSAEventHolder(HANDLE event) noexcept
- : Event(event)
- {
+ : Event(event)
+ {
}
inline ~TWSAEventHolder() {
@@ -164,28 +164,28 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
if (error == WSAEINVAL || error == WSAENOTSOCK) {
fd->revents = POLLNVAL;
++checked_sockets;
- } else {
+ } else {
errno = EIO;
return -1;
}
}
fd_set readfds;
fd_set writefds;
- struct timeval timeout = {0, 0};
+ struct timeval timeout = {0, 0};
FD_ZERO(&readfds);
FD_ZERO(&writefds);
if (fd->events & POLLIN) {
- FD_SET(fd->fd, &readfds);
+ FD_SET(fd->fd, &readfds);
}
if (fd->events & POLLOUT) {
- FD_SET(fd->fd, &writefds);
+ FD_SET(fd->fd, &writefds);
}
int error = select(0, &readfds, &writefds, nullptr, &timeout);
if (error > 0) {
- if (FD_ISSET(fd->fd, &readfds)) {
+ if (FD_ISSET(fd->fd, &readfds)) {
fd->revents |= POLLIN;
}
- if (FD_ISSET(fd->fd, &writefds)) {
+ if (FD_ISSET(fd->fd, &writefds)) {
fd->revents |= POLLOUT;
}
++checked_sockets;
@@ -201,7 +201,7 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
DWORD wait_result = WSAWaitForMultipleEvents(1, events, TRUE, timeout, FALSE);
if (wait_result == WSA_WAIT_TIMEOUT)
return 0;
- else if (wait_result == WSA_WAIT_EVENT_0) {
+ else if (wait_result == WSA_WAIT_EVENT_0) {
for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
if (fd->revents == POLLNVAL)
continue;
@@ -212,7 +212,7 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
}
fd->revents = 0;
for (int i = 0; i < FD_MAX_EVENTS; ++i) {
- if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) {
+ if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) {
fd->revents = POLLERR;
break;
}
@@ -236,79 +236,79 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
return -1;
}
}
- #endif
+ #endif
#endif
-bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) {
- if (!size) {
+bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) {
+ if (!size) {
return false;
- }
+ }
- TOpaqueAddr addr;
+ TOpaqueAddr addr;
- if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) {
+ if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) {
return false;
}
-
- try {
- TMemoryOutput out(str, size - 1);
-
- PrintHost(out, addr);
- *out.Buf() = 0;
-
- return true;
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-
+
+ try {
+ TMemoryOutput out(str, size - 1);
+
+ PrintHost(out, addr);
+ *out.Buf() = 0;
+
+ return true;
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+
return false;
}
-void SetSocketTimeout(SOCKET s, long timeout) {
- SetSocketTimeout(s, timeout, 0);
+void SetSocketTimeout(SOCKET s, long timeout) {
+ SetSocketTimeout(s, timeout, 0);
}
-void SetSocketTimeout(SOCKET s, long sec, long msec) {
+void SetSocketTimeout(SOCKET s, long sec, long msec) {
#ifdef SO_SNDTIMEO
- #ifdef _darwin_
+ #ifdef _darwin_
const timeval timeout = {sec, (__darwin_suseconds_t)msec * 1000};
- #elif defined(_unix_)
- const timeval timeout = {sec, msec * 1000};
- #else
+ #elif defined(_unix_)
+ const timeval timeout = {sec, msec * 1000};
+ #else
const int timeout = sec * 1000 + msec;
- #endif
- CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout");
- CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout");
+ #endif
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout");
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout");
#endif
}
void SetLinger(SOCKET s, bool on, unsigned len) {
#ifdef SO_LINGER
struct linger l = {on, (u_short)len};
-
- CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger");
+
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger");
#endif
}
-void SetZeroLinger(SOCKET s) {
- SetLinger(s, 1, 0);
-}
-
-void SetKeepAlive(SOCKET s, bool value) {
- CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive");
-}
-
-void SetOutputBuffer(SOCKET s, unsigned value) {
- CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer");
-}
-
-void SetInputBuffer(SOCKET s, unsigned value) {
- CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer");
-}
-
+void SetZeroLinger(SOCKET s) {
+ SetLinger(s, 1, 0);
+}
+
+void SetKeepAlive(SOCKET s, bool value) {
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive");
+}
+
+void SetOutputBuffer(SOCKET s, unsigned value) {
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer");
+}
+
+void SetInputBuffer(SOCKET s, unsigned value) {
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer");
+}
+
#if defined(_linux_) && !defined(SO_REUSEPORT)
- #define SO_REUSEPORT 15
+ #define SO_REUSEPORT 15
#endif
void SetReusePort(SOCKET s, bool value) {
@@ -321,10 +321,10 @@ void SetReusePort(SOCKET s, bool value) {
#endif
}
-void SetNoDelay(SOCKET s, bool value) {
- CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay");
-}
-
+void SetNoDelay(SOCKET s, bool value) {
+ CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay");
+}
+
void SetCloseOnExec(SOCKET s, bool value) {
#if defined(_unix_)
int flags = fcntl(s, F_GETFD);
@@ -345,21 +345,21 @@ void SetCloseOnExec(SOCKET s, bool value) {
#endif
}
-size_t GetMaximumSegmentSize(SOCKET s) {
-#if defined(TCP_MAXSEG)
- int val;
-
- if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) {
- return (size_t)val;
- }
-#endif
-
- /*
- * probably a good guess...
- */
- return 8192;
-}
-
+size_t GetMaximumSegmentSize(SOCKET s) {
+#if defined(TCP_MAXSEG)
+ int val;
+
+ if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) {
+ return (size_t)val;
+ }
+#endif
+
+ /*
+ * probably a good guess...
+ */
+ return 8192;
+}
+
size_t GetMaximumTransferUnit(SOCKET /*s*/) {
// for someone who'll dare to write it
// Linux: there rummored to be IP_MTU getsockopt() request
@@ -371,8 +371,8 @@ size_t GetMaximumTransferUnit(SOCKET /*s*/) {
int GetSocketToS(SOCKET s) {
TOpaqueAddr addr;
- if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
- ythrow TSystemError() << "getsockname() failed";
+ if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
+ ythrow TSystemError() << "getsockname() failed";
}
return GetSocketToS(s, &addr);
@@ -382,15 +382,15 @@ int GetSocketToS(SOCKET s, const IRemoteAddr* addr) {
int result = 0;
switch (addr->Addr()->sa_family) {
- case AF_INET:
- CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos");
- break;
+ case AF_INET:
+ CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos");
+ break;
- case AF_INET6:
+ case AF_INET6:
#ifdef IPV6_TCLASS
- CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos");
+ CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos");
#endif
- break;
+ break;
}
return result;
@@ -398,16 +398,16 @@ int GetSocketToS(SOCKET s, const IRemoteAddr* addr) {
void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) {
switch (addr->Addr()->sa_family) {
- case AF_INET:
- CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos");
- return;
+ case AF_INET:
+ CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos");
+ return;
- case AF_INET6:
+ case AF_INET6:
#ifdef IPV6_TCLASS
- CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos");
- return;
+ CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos");
+ return;
#endif
- break;
+ break;
}
ythrow yexception() << "SetSocketToS unsupported for family " << addr->Addr()->sa_family;
@@ -416,8 +416,8 @@ void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) {
void SetSocketToS(SOCKET s, int tos) {
TOpaqueAddr addr;
- if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
- ythrow TSystemError() << "getsockname() failed";
+ if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
+ ythrow TSystemError() << "getsockname() failed";
}
SetSocketToS(s, &addr, tos);
@@ -450,9 +450,9 @@ bool HasLocalAddress(SOCKET socket) {
namespace {
#if defined(_linux_)
- #if !defined(TCP_FASTOPEN)
- #define TCP_FASTOPEN 23
- #endif
+ #if !defined(TCP_FASTOPEN)
+ #define TCP_FASTOPEN 23
+ #endif
#endif
#if defined(TCP_FASTOPEN)
@@ -511,7 +511,7 @@ struct TUnblockingGuard {
static int MsgPeek(SOCKET s) {
int flags = MSG_PEEK;
-#if defined(_win_)
+#if defined(_win_)
TUnblockingGuard unblocker(s);
Y_UNUSED(unblocker);
#else
@@ -552,20 +552,20 @@ static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
return sendmsg(sock, &message, MSG_NOSIGNAL);
}
#endif
-
+
void TSocketHolder::Close() noexcept {
if (Fd_ != INVALID_SOCKET) {
bool ok = (closesocket(Fd_) == 0);
if (!ok) {
-// Do not quietly close bad descriptor,
-// because often it means double close
-// that is disasterous
+// Do not quietly close bad descriptor,
+// because often it means double close
+// that is disasterous
#ifdef _win_
Y_VERIFY(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor");
#elif defined(_unix_)
Y_VERIFY(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_));
#else
- #error unsupported platform
+ #error unsupported platform
#endif
}
@@ -573,239 +573,239 @@ void TSocketHolder::Close() noexcept {
}
}
-class TSocket::TImpl: public TAtomicRefCount<TImpl> {
+class TSocket::TImpl: public TAtomicRefCount<TImpl> {
using TOps = TSocket::TOps;
-
-public:
- inline TImpl(SOCKET fd, TOps* ops)
- : Fd_(fd)
- , Ops_(ops)
- {
- }
-
+
+public:
+ inline TImpl(SOCKET fd, TOps* ops)
+ : Fd_(fd)
+ , Ops_(ops)
+ {
+ }
+
inline ~TImpl() = default;
-
+
inline SOCKET Fd() const noexcept {
- return Fd_;
- }
-
- inline ssize_t Send(const void* data, size_t len) {
- return Ops_->Send(Fd_, data, len);
- }
-
- inline ssize_t Recv(void* buf, size_t len) {
- return Ops_->Recv(Fd_, buf, len);
- }
-
- inline ssize_t SendV(const TPart* parts, size_t count) {
- return Ops_->SendV(Fd_, parts, count);
- }
-
+ return Fd_;
+ }
+
+ inline ssize_t Send(const void* data, size_t len) {
+ return Ops_->Send(Fd_, data, len);
+ }
+
+ inline ssize_t Recv(void* buf, size_t len) {
+ return Ops_->Recv(Fd_, buf, len);
+ }
+
+ inline ssize_t SendV(const TPart* parts, size_t count) {
+ return Ops_->SendV(Fd_, parts, count);
+ }
+
inline void Close() {
Fd_.Close();
}
-private:
- TSocketHolder Fd_;
- TOps* Ops_;
-};
-
+private:
+ TSocketHolder Fd_;
+ TOps* Ops_;
+};
+
template <>
void Out<const struct addrinfo*>(IOutputStream& os, const struct addrinfo* ai) {
- if (ai->ai_flags & AI_CANONNAME) {
+ if (ai->ai_flags & AI_CANONNAME) {
os << "`" << ai->ai_canonname << "' ";
- }
+ }
os << '[';
- for (int i = 0; ai; ++i, ai = ai->ai_next) {
- if (i > 0) {
+ for (int i = 0; ai; ++i, ai = ai->ai_next) {
+ if (i > 0) {
os << ", ";
- }
+ }
- os << (const IRemoteAddr&)TAddrInfo(ai);
+ os << (const IRemoteAddr&)TAddrInfo(ai);
}
os << ']';
}
template <>
void Out<struct addrinfo*>(IOutputStream& os, struct addrinfo* ai) {
- Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai));
+ Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai));
}
-template <>
+template <>
void Out<TNetworkAddress>(IOutputStream& os, const TNetworkAddress& addr) {
- os << &*addr.Begin();
-}
-
+ os << &*addr.Begin();
+}
+
static inline const struct addrinfo* Iterate(const struct addrinfo* addr, const struct addrinfo* addr0, const int sockerr) {
- if (addr->ai_next) {
- return addr->ai_next;
- }
-
+ if (addr->ai_next) {
+ return addr->ai_next;
+ }
+
ythrow TSystemError(sockerr) << "can not connect to " << addr0;
-}
-
-static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) {
- const struct addrinfo* addr0 = res;
-
- while (res) {
- TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
-
- if (s.Closed()) {
+}
+
+static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) {
+ const struct addrinfo* addr0 = res;
+
+ while (res) {
+ TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
+
+ if (s.Closed()) {
res = Iterate(res, addr0, LastSystemError());
-
- continue;
- }
-
- SetNonBlock(s, true);
-
+
+ continue;
+ }
+
+ SetNonBlock(s, true);
+
if (connect(s, res->ai_addr, (int)res->ai_addrlen)) {
int err = LastSystemError();
-
- if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
- /*
- * must wait
- */
- struct pollfd p = {
- (SOCKET)s,
- POLLOUT,
- 0};
-
+
+ if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
+ /*
+ * must wait
+ */
+ struct pollfd p = {
+ (SOCKET)s,
+ POLLOUT,
+ 0};
+
const ssize_t n = PollD(&p, 1, deadLine);
-
- /*
- * timeout occured
- */
- if (n < 0) {
+
+ /*
+ * timeout occured
+ */
+ if (n < 0) {
ythrow TSystemError(-(int)n) << "can not connect";
- }
-
+ }
+
CheckedGetSockOpt(s, SOL_SOCKET, SO_ERROR, err, "socket error");
- if (!err) {
- return s.Release();
- }
- }
-
+ if (!err) {
+ return s.Release();
+ }
+ }
+
res = Iterate(res, addr0, err);
-
- continue;
- }
-
- return s.Release();
- }
-
+
+ continue;
+ }
+
+ return s.Release();
+ }
+
ythrow yexception() << "something went wrong: nullptr at addrinfo";
-}
-
-static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) {
- TSocketHolder ret(DoConnectImpl(res, deadLine));
-
- SetNonBlock(ret, false);
-
- return ret.Release();
-}
-
-static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) {
+}
+
+static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) {
+ TSocketHolder ret(DoConnectImpl(res, deadLine));
+
+ SetNonBlock(ret, false);
+
+ return ret.Release();
+}
+
+static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) {
ssize_t ret = -1;
do {
ret = DoSendMsg(fd, iov, (int)count);
} while (ret == -1 && errno == EINTR);
-
- if (ret < 0) {
+
+ if (ret < 0) {
return -LastSystemError();
- }
-
- return ret;
-}
-
-template <bool isCompat>
-struct TSender {
+ }
+
+ return ret;
+}
+
+template <bool isCompat>
+struct TSender {
using TPart = TSocket::TPart;
-
- static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
- return DoSendV(fd, (const iovec*)parts, count);
- }
-};
-
-template <>
-struct TSender<false> {
+
+ static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
+ return DoSendV(fd, (const iovec*)parts, count);
+ }
+};
+
+template <>
+struct TSender<false> {
using TPart = TSocket::TPart;
-
- static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
- TTempBuf tempbuf(sizeof(struct iovec) * count);
- struct iovec* iov = (struct iovec*)tempbuf.Data();
-
- for (size_t i = 0; i < count; ++i) {
- struct iovec& io = iov[i];
- const TPart& part = parts[i];
-
- io.iov_base = (char*)part.buf;
- io.iov_len = part.len;
- }
-
- return DoSendV(fd, iov, count);
- }
-};
-
-class TCommonSockOps: public TSocket::TOps {
+
+ static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
+ TTempBuf tempbuf(sizeof(struct iovec) * count);
+ struct iovec* iov = (struct iovec*)tempbuf.Data();
+
+ for (size_t i = 0; i < count; ++i) {
+ struct iovec& io = iov[i];
+ const TPart& part = parts[i];
+
+ io.iov_base = (char*)part.buf;
+ io.iov_len = part.len;
+ }
+
+ return DoSendV(fd, iov, count);
+ }
+};
+
+class TCommonSockOps: public TSocket::TOps {
using TPart = TSocket::TPart;
-
-public:
+
+public:
inline TCommonSockOps() noexcept {
- }
-
+ }
+
~TCommonSockOps() override = default;
-
+
ssize_t Send(SOCKET fd, const void* data, size_t len) override {
- ssize_t ret = -1;
- do {
+ ssize_t ret = -1;
+ do {
ret = send(fd, (const char*)data, (int)len, MSG_NOSIGNAL);
- } while (ret == -1 && errno == EINTR);
-
- if (ret < 0) {
- return -LastSystemError();
- }
-
- return ret;
- }
-
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret < 0) {
+ return -LastSystemError();
+ }
+
+ return ret;
+ }
+
ssize_t Recv(SOCKET fd, void* buf, size_t len) override {
- ssize_t ret = -1;
- do {
- ret = recv(fd, (char*)buf, (int)len, 0);
- } while (ret == -1 && errno == EINTR);
-
- if (ret < 0) {
- return -LastSystemError();
- }
-
- return ret;
- }
+ ssize_t ret = -1;
+ do {
+ ret = recv(fd, (char*)buf, (int)len, 0);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret < 0) {
+ return -LastSystemError();
+ }
+
+ return ret;
+ }
ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) override {
- ssize_t ret = SendVImpl(fd, parts, count);
+ ssize_t ret = SendVImpl(fd, parts, count);
- if (ret < 0) {
- return ret;
- }
+ if (ret < 0) {
+ return ret;
+ }
- size_t len = TContIOVector::Bytes(parts, count);
+ size_t len = TContIOVector::Bytes(parts, count);
- if ((size_t)ret == len) {
- return ret;
- }
+ if ((size_t)ret == len) {
+ return ret;
+ }
- return SendVPartial(fd, parts, count, ret);
- }
-
- inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) {
- return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count);
- }
-
- ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written);
-};
+ return SendVPartial(fd, parts, count, ret);
+ }
+ inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) {
+ return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count);
+ }
+
+ ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written);
+};
+
ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written) {
TTempBuf tempbuf(sizeof(TPart) * count);
TPart* parts = (TPart*)tempbuf.Data();
@@ -820,9 +820,9 @@ ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t
while (!vec.Complete()) {
ssize_t ret = SendVImpl(fd, vec.Parts(), vec.Count());
- if (ret < 0) {
+ if (ret < 0) {
return ret;
- }
+ }
written += ret;
@@ -833,129 +833,129 @@ ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t
}
static inline TSocket::TOps* GetCommonSockOps() noexcept {
- return Singleton<TCommonSockOps>();
-}
-
-TSocket::TSocket()
- : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps()))
-{
-}
-
-TSocket::TSocket(SOCKET fd)
- : Impl_(new TImpl(fd, GetCommonSockOps()))
-{
-}
-
-TSocket::TSocket(SOCKET fd, TOps* ops)
- : Impl_(new TImpl(fd, ops))
-{
-}
-
-TSocket::TSocket(const TNetworkAddress& addr)
- : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps()))
-{
-}
-
-TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut)
- : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps()))
-{
-}
-
-TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine)
- : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps()))
-{
-}
-
+ return Singleton<TCommonSockOps>();
+}
+
+TSocket::TSocket()
+ : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps()))
+{
+}
+
+TSocket::TSocket(SOCKET fd)
+ : Impl_(new TImpl(fd, GetCommonSockOps()))
+{
+}
+
+TSocket::TSocket(SOCKET fd, TOps* ops)
+ : Impl_(new TImpl(fd, ops))
+{
+}
+
+TSocket::TSocket(const TNetworkAddress& addr)
+ : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps()))
+{
+}
+
+TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut)
+ : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps()))
+{
+}
+
+TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine)
+ : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps()))
+{
+}
+
TSocket::~TSocket() = default;
-
+
SOCKET TSocket::Fd() const noexcept {
- return Impl_->Fd();
-}
-
-ssize_t TSocket::Send(const void* data, size_t len) {
- return Impl_->Send(data, len);
-}
-
-ssize_t TSocket::Recv(void* buf, size_t len) {
- return Impl_->Recv(buf, len);
-}
-
-ssize_t TSocket::SendV(const TPart* parts, size_t count) {
- return Impl_->SendV(parts, count);
-}
-
+ return Impl_->Fd();
+}
+
+ssize_t TSocket::Send(const void* data, size_t len) {
+ return Impl_->Send(data, len);
+}
+
+ssize_t TSocket::Recv(void* buf, size_t len) {
+ return Impl_->Recv(buf, len);
+}
+
+ssize_t TSocket::SendV(const TPart* parts, size_t count) {
+ return Impl_->SendV(parts, count);
+}
+
void TSocket::Close() {
Impl_->Close();
}
TSocketInput::TSocketInput(const TSocket& s) noexcept
- : S_(s)
-{
-}
-
+ : S_(s)
+{
+}
+
TSocketInput::~TSocketInput() = default;
-
-size_t TSocketInput::DoRead(void* buf, size_t len) {
- const ssize_t ret = S_.Recv(buf, len);
-
- if (ret >= 0) {
- return (size_t)ret;
- }
-
+
+size_t TSocketInput::DoRead(void* buf, size_t len) {
+ const ssize_t ret = S_.Recv(buf, len);
+
+ if (ret >= 0) {
+ return (size_t)ret;
+ }
+
ythrow TSystemError(-(int)ret) << "can not read from socket input stream";
-}
-
+}
+
TSocketOutput::TSocketOutput(const TSocket& s) noexcept
- : S_(s)
-{
-}
-
+ : S_(s)
+{
+}
+
TSocketOutput::~TSocketOutput() {
- try {
- Finish();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-}
-
-void TSocketOutput::DoWrite(const void* buf, size_t len) {
+ try {
+ Finish();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+}
+
+void TSocketOutput::DoWrite(const void* buf, size_t len) {
size_t send = 0;
while (len) {
const ssize_t ret = S_.Send(buf, len);
-
+
if (ret < 0) {
ythrow TSystemError(-(int)ret) << "can not write to socket output stream; " << send << " bytes already send";
}
- buf = (const char*)buf + ret;
+ buf = (const char*)buf + ret;
len -= ret;
send += ret;
- }
-}
-
-void TSocketOutput::DoWriteV(const TPart* parts, size_t count) {
- const ssize_t ret = S_.SendV(parts, count);
-
- if (ret < 0) {
+ }
+}
+
+void TSocketOutput::DoWriteV(const TPart* parts, size_t count) {
+ const ssize_t ret = S_.SendV(parts, count);
+
+ if (ret < 0) {
ythrow TSystemError(-(int)ret) << "can not writev to socket output stream";
- }
-
- /*
- * todo for nonblocking sockets?
- */
-}
-
-namespace {
- //https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff
-
+ }
+
+ /*
+ * todo for nonblocking sockets?
+ */
+}
+
+namespace {
+ //https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff
+
struct TLocalNames: public THashSet<TStringBuf> {
- inline TLocalNames() {
- insert("localhost");
- insert("localhost.localdomain");
- insert("localhost6");
- insert("localhost6.localdomain6");
+ inline TLocalNames() {
+ insert("localhost");
+ insert("localhost.localdomain");
+ insert("localhost6");
+ insert("localhost6.localdomain6");
insert("::1");
- }
-
+ }
+
inline bool IsLocalName(const char* name) const noexcept {
struct sockaddr_in sa;
memset(&sa, 0, sizeof(sa));
@@ -965,18 +965,18 @@ namespace {
}
return contains(name);
- }
- };
-}
-
-class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> {
+ }
+ };
+}
+
+class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> {
private:
class TAddrInfoDeleter {
public:
TAddrInfoDeleter(bool useFreeAddrInfo = true)
: UseFreeAddrInfo_(useFreeAddrInfo)
- {
- }
+ {
+ }
void operator()(struct addrinfo* ai) noexcept {
if (!UseFreeAddrInfo_ && ai != NULL) {
@@ -984,7 +984,7 @@ private:
free(ai->ai_addr);
}
- struct addrinfo* p;
+ struct addrinfo* p;
while (ai != NULL) {
p = ai;
ai = ai->ai_next;
@@ -1000,38 +1000,38 @@ private:
bool UseFreeAddrInfo_ = true;
};
-public:
+public:
inline TImpl(const char* host, ui16 port, int flags)
: Info_(nullptr, TAddrInfoDeleter{})
- {
+ {
const TString port_st(ToString(port));
- struct addrinfo hints;
-
- memset(&hints, 0, sizeof(hints));
-
- hints.ai_flags = flags;
- hints.ai_family = PF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
-
- if (!host) {
- hints.ai_flags |= AI_PASSIVE;
- } else {
- if (!Singleton<TLocalNames>()->IsLocalName(host)) {
- hints.ai_flags |= AI_ADDRCONFIG;
- }
- }
-
+ struct addrinfo hints;
+
+ memset(&hints, 0, sizeof(hints));
+
+ hints.ai_flags = flags;
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+
+ if (!host) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ if (!Singleton<TLocalNames>()->IsLocalName(host)) {
+ hints.ai_flags |= AI_ADDRCONFIG;
+ }
+ }
+
struct addrinfo* pai = NULL;
const int error = getaddrinfo(host, port_st.data(), &hints, &pai);
-
- if (error) {
+
+ if (error) {
TAddrInfoDeleter()(pai);
- ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port;
- }
+ ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port;
+ }
Info_.reset(pai);
- }
-
+ }
+
inline TImpl(const char* path, int flags)
: Info_(nullptr, TAddrInfoDeleter{/* useFreeAddrInfo = */ false})
{
@@ -1052,18 +1052,18 @@ public:
hints->ai_addr = (struct sockaddr*)sockAddr.Release();
Info_.reset(hints.release());
- }
-
+ }
+
inline struct addrinfo* Info() const noexcept {
return Info_.get();
- }
-
-private:
+ }
+
+private:
using TAddrInfoPtr = std::unique_ptr<struct addrinfo, TAddrInfoDeleter>;
TAddrInfoPtr Info_;
-};
-
+};
+
TNetworkAddress::TNetworkAddress(const TUnixSocketPath& unixSocketPath, int flags)
: Impl_(new TImpl(unixSocketPath.Path.data(), flags))
{
@@ -1076,24 +1076,24 @@ TNetworkAddress::TNetworkAddress(const TString& host, ui16 port, int flags)
TNetworkAddress::TNetworkAddress(const TString& host, ui16 port)
: Impl_(new TImpl(host.data(), port, 0))
-{
-}
-
-TNetworkAddress::TNetworkAddress(ui16 port)
+{
+}
+
+TNetworkAddress::TNetworkAddress(ui16 port)
: Impl_(new TImpl(nullptr, port, 0))
-{
-}
-
+{
+}
+
TNetworkAddress::~TNetworkAddress() = default;
-
+
struct addrinfo* TNetworkAddress::Info() const noexcept {
- return Impl_->Info();
-}
-
+ return Impl_->Info();
+}
+
TNetworkResolutionError::TNetworkResolutionError(int error) {
const char* errMsg = nullptr;
#ifdef _win_
- errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows
+ errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows
#else
errMsg = gai_strerror(error);
#endif
@@ -1108,153 +1108,153 @@ TNetworkResolutionError::TNetworkResolutionError(int error) {
(*this) << "): ";
}
-#if defined(_unix_)
-static inline int GetFlags(int fd) {
- const int ret = fcntl(fd, F_GETFL);
-
- if (ret == -1) {
- ythrow TSystemError() << "can not get fd flags";
- }
-
- return ret;
-}
-
-static inline void SetFlags(int fd, int flags) {
- if (fcntl(fd, F_SETFL, flags) == -1) {
- ythrow TSystemError() << "can not set fd flags";
- }
-}
-
-static inline void EnableFlag(int fd, int flag) {
- const int oldf = GetFlags(fd);
- const int newf = oldf | flag;
-
- if (oldf != newf) {
- SetFlags(fd, newf);
- }
-}
-
-static inline void DisableFlag(int fd, int flag) {
- const int oldf = GetFlags(fd);
- const int newf = oldf & (~flag);
-
- if (oldf != newf) {
- SetFlags(fd, newf);
- }
-}
-
-static inline void SetFlag(int fd, int flag, bool value) {
- if (value) {
- EnableFlag(fd, flag);
- } else {
- DisableFlag(fd, flag);
- }
-}
-
-static inline bool FlagsAreEnabled(int fd, int flags) {
- return GetFlags(fd) & flags;
-}
-#endif
-
-#if defined(_win_)
+#if defined(_unix_)
+static inline int GetFlags(int fd) {
+ const int ret = fcntl(fd, F_GETFL);
+
+ if (ret == -1) {
+ ythrow TSystemError() << "can not get fd flags";
+ }
+
+ return ret;
+}
+
+static inline void SetFlags(int fd, int flags) {
+ if (fcntl(fd, F_SETFL, flags) == -1) {
+ ythrow TSystemError() << "can not set fd flags";
+ }
+}
+
+static inline void EnableFlag(int fd, int flag) {
+ const int oldf = GetFlags(fd);
+ const int newf = oldf | flag;
+
+ if (oldf != newf) {
+ SetFlags(fd, newf);
+ }
+}
+
+static inline void DisableFlag(int fd, int flag) {
+ const int oldf = GetFlags(fd);
+ const int newf = oldf & (~flag);
+
+ if (oldf != newf) {
+ SetFlags(fd, newf);
+ }
+}
+
+static inline void SetFlag(int fd, int flag, bool value) {
+ if (value) {
+ EnableFlag(fd, flag);
+ } else {
+ DisableFlag(fd, flag);
+ }
+}
+
+static inline bool FlagsAreEnabled(int fd, int flags) {
+ return GetFlags(fd) & flags;
+}
+#endif
+
+#if defined(_win_)
static inline void SetNonBlockSocket(SOCKET fd, int value) {
- unsigned long inbuf = value;
- unsigned long outbuf = 0;
- DWORD written = 0;
-
+ unsigned long inbuf = value;
+ unsigned long outbuf = 0;
+ DWORD written = 0;
+
if (!inbuf) {
WSAEventSelect(fd, nullptr, 0);
}
- if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
- ythrow TSystemError() << "can not set non block socket state";
- }
-}
-
+ if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
+ ythrow TSystemError() << "can not set non block socket state";
+ }
+}
+
static inline bool IsNonBlockSocket(SOCKET fd) {
- unsigned long buf = 0;
-
- if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) {
- ythrow TSystemError() << "can not get non block socket state";
- }
-
- return buf;
-}
-#endif
-
+ unsigned long buf = 0;
+
+ if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) {
+ ythrow TSystemError() << "can not get non block socket state";
+ }
+
+ return buf;
+}
+#endif
+
void SetNonBlock(SOCKET fd, bool value) {
-#if defined(_unix_)
- #if defined(FIONBIO)
+#if defined(_unix_)
+ #if defined(FIONBIO)
Y_UNUSED(SetFlag); // shut up clang about unused function
- int nb = value;
-
- if (ioctl(fd, FIONBIO, &nb) < 0) {
- ythrow TSystemError() << "ioctl failed";
- }
- #else
- SetFlag(fd, O_NONBLOCK, value);
- #endif
-#elif defined(_win_)
- SetNonBlockSocket(fd, value);
-#else
- #error todo
-#endif
-}
-
+ int nb = value;
+
+ if (ioctl(fd, FIONBIO, &nb) < 0) {
+ ythrow TSystemError() << "ioctl failed";
+ }
+ #else
+ SetFlag(fd, O_NONBLOCK, value);
+ #endif
+#elif defined(_win_)
+ SetNonBlockSocket(fd, value);
+#else
+ #error todo
+#endif
+}
+
bool IsNonBlock(SOCKET fd) {
-#if defined(_unix_)
- return FlagsAreEnabled(fd, O_NONBLOCK);
-#elif defined(_win_)
- return IsNonBlockSocket(fd);
-#else
- #error todo
-#endif
-}
-
-void SetDeferAccept(SOCKET s) {
- (void)s;
-
-#if defined(TCP_DEFER_ACCEPT)
- CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept");
-#endif
-
-#if defined(SO_ACCEPTFILTER)
- struct accept_filter_arg afa;
-
- Zero(afa);
- strcpy(afa.af_name, "dataready");
- SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa);
-#endif
-}
-
+#if defined(_unix_)
+ return FlagsAreEnabled(fd, O_NONBLOCK);
+#elif defined(_win_)
+ return IsNonBlockSocket(fd);
+#else
+ #error todo
+#endif
+}
+
+void SetDeferAccept(SOCKET s) {
+ (void)s;
+
+#if defined(TCP_DEFER_ACCEPT)
+ CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept");
+#endif
+
+#if defined(SO_ACCEPTFILTER)
+ struct accept_filter_arg afa;
+
+ Zero(afa);
+ strcpy(afa.af_name, "dataready");
+ SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa);
+#endif
+}
+
ssize_t PollD(struct pollfd fds[], nfds_t nfds, const TInstant& deadLine) noexcept {
- TInstant now = TInstant::Now();
-
- do {
- const TDuration toWait = PollStep(deadLine, now);
- const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds()));
-
- if (res > 0) {
- return res;
- }
-
- if (res < 0) {
- const int err = LastSystemError();
-
+ TInstant now = TInstant::Now();
+
+ do {
+ const TDuration toWait = PollStep(deadLine, now);
+ const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds()));
+
+ if (res > 0) {
+ return res;
+ }
+
+ if (res < 0) {
+ const int err = LastSystemError();
+
if (err != ETIMEDOUT && err != EINTR) {
- return -err;
- }
- }
- } while ((now = TInstant::Now()) < deadLine);
-
- return -ETIMEDOUT;
-}
-
-void ShutDown(SOCKET s, int mode) {
- if (shutdown(s, mode)) {
- ythrow TSystemError() << "shutdown socket error";
- }
-}
+ return -err;
+ }
+ }
+ } while ((now = TInstant::Now()) < deadLine);
+
+ return -ETIMEDOUT;
+}
+
+void ShutDown(SOCKET s, int mode) {
+ if (shutdown(s, mode)) {
+ ythrow TSystemError() << "shutdown socket error";
+ }
+}
extern "C" bool IsReusePortAvailable() {
// SO_REUSEPORT is always defined for linux builds, see SetReusePort() implementation above