diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/network/socket.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/network/socket.cpp')
-rw-r--r-- | util/network/socket.cpp | 1230 |
1 files changed, 615 insertions, 615 deletions
diff --git a/util/network/socket.cpp b/util/network/socket.cpp index 4f6e804346..c7a545c9c2 100644 --- a/util/network/socket.cpp +++ b/util/network/socket.cpp @@ -7,50 +7,50 @@ #include <util/system/defaults.h> #include <util/system/byteorder.h> -#if defined(_unix_) - #include <netdb.h> - #include <sys/types.h> - #include <sys/socket.h> - #include <sys/un.h> - #include <sys/ioctl.h> - #include <netinet/in.h> - #include <netinet/tcp.h> - #include <arpa/inet.h> -#endif - -#if defined(_freebsd_) - #include <sys/module.h> - #define ACCEPT_FILTER_MOD - #include <sys/socketvar.h> -#endif - -#if defined(_win_) - #include <cerrno> - #include <winsock2.h> - #include <ws2tcpip.h> - #include <wspiapi.h> - - #include <util/system/compat.h> -#endif - +#if defined(_unix_) + #include <netdb.h> + #include <sys/types.h> + #include <sys/socket.h> + #include <sys/un.h> + #include <sys/ioctl.h> + #include <netinet/in.h> + #include <netinet/tcp.h> + #include <arpa/inet.h> +#endif + +#if defined(_freebsd_) + #include <sys/module.h> + #define ACCEPT_FILTER_MOD + #include <sys/socketvar.h> +#endif + +#if defined(_win_) + #include <cerrno> + #include <winsock2.h> + #include <ws2tcpip.h> + #include <wspiapi.h> + + #include <util/system/compat.h> +#endif + #include <util/generic/ylimits.h> - -#include <util/string/cast.h> + +#include <util/string/cast.h> #include <util/stream/mem.h> -#include <util/system/datetime.h> +#include <util/system/datetime.h> #include <util/system/error.h> -#include <util/memory/tempbuf.h> -#include <util/generic/singleton.h> -#include <util/generic/hash_set.h> - -#include <stddef.h> +#include <util/memory/tempbuf.h> +#include <util/generic/singleton.h> +#include <util/generic/hash_set.h> + +#include <stddef.h> #include <sys/uio.h> + +using namespace NAddr; + +#if defined(_win_) -using namespace NAddr; - -#if defined(_win_) - -int inet_aton(const char* cp, struct in_addr* inp) { +int inet_aton(const char* cp, struct in_addr* inp) { sockaddr_in addr; addr.sin_family = AF_INET; int psz = sizeof(addr); @@ -61,9 +61,9 @@ int inet_aton(const char* cp, struct in_addr* inp) { return 0; } - #if (_WIN32_WINNT < 0x0600) -const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) { - if (af != AF_INET) { + #if (_WIN32_WINNT < 0x0600) +const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) { + if (af != AF_INET) { errno = EINVAL; return 0; } @@ -90,7 +90,7 @@ static const evpair evpairs_to_win[] = { {POLLWRBAND, -1}, {POLLERR, 0}, {POLLHUP, 0}, - {POLLNVAL, 0}}; + {POLLNVAL, 0}}; static const size_t nevpairs_to_win = sizeof(evpairs_to_win) / sizeof(evpairs_to_win[0]); @@ -117,7 +117,7 @@ static int convert_events(int events, const evpair* evpairs, size_t nevpairs, bo result |= winEvent; } } - if (events != 0 && !ignoreUnknown) + if (events != 0 && !ignoreUnknown) return -1; return result; } @@ -128,8 +128,8 @@ private: public: inline TWSAEventHolder(HANDLE event) noexcept - : Event(event) - { + : Event(event) + { } inline ~TWSAEventHolder() { @@ -164,28 +164,28 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept { if (error == WSAEINVAL || error == WSAENOTSOCK) { fd->revents = POLLNVAL; ++checked_sockets; - } else { + } else { errno = EIO; return -1; } } fd_set readfds; fd_set writefds; - struct timeval timeout = {0, 0}; + struct timeval timeout = {0, 0}; FD_ZERO(&readfds); FD_ZERO(&writefds); if (fd->events & POLLIN) { - FD_SET(fd->fd, &readfds); + FD_SET(fd->fd, &readfds); } if (fd->events & POLLOUT) { - FD_SET(fd->fd, &writefds); + FD_SET(fd->fd, &writefds); } int error = select(0, &readfds, &writefds, nullptr, &timeout); if (error > 0) { - if (FD_ISSET(fd->fd, &readfds)) { + if (FD_ISSET(fd->fd, &readfds)) { fd->revents |= POLLIN; } - if (FD_ISSET(fd->fd, &writefds)) { + if (FD_ISSET(fd->fd, &writefds)) { fd->revents |= POLLOUT; } ++checked_sockets; @@ -201,7 +201,7 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept { DWORD wait_result = WSAWaitForMultipleEvents(1, events, TRUE, timeout, FALSE); if (wait_result == WSA_WAIT_TIMEOUT) return 0; - else if (wait_result == WSA_WAIT_EVENT_0) { + else if (wait_result == WSA_WAIT_EVENT_0) { for (pollfd* fd = fds; fd < fds + nfds; ++fd) { if (fd->revents == POLLNVAL) continue; @@ -212,7 +212,7 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept { } fd->revents = 0; for (int i = 0; i < FD_MAX_EVENTS; ++i) { - if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) { + if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) { fd->revents = POLLERR; break; } @@ -236,79 +236,79 @@ int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept { return -1; } } - #endif + #endif #endif -bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) { - if (!size) { +bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) { + if (!size) { return false; - } + } - TOpaqueAddr addr; + TOpaqueAddr addr; - if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) { + if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) { return false; } - - try { - TMemoryOutput out(str, size - 1); - - PrintHost(out, addr); - *out.Buf() = 0; - - return true; - } catch (...) { - // ¯\_(ツ)_/¯ - } - + + try { + TMemoryOutput out(str, size - 1); + + PrintHost(out, addr); + *out.Buf() = 0; + + return true; + } catch (...) { + // ¯\_(ツ)_/¯ + } + return false; } -void SetSocketTimeout(SOCKET s, long timeout) { - SetSocketTimeout(s, timeout, 0); +void SetSocketTimeout(SOCKET s, long timeout) { + SetSocketTimeout(s, timeout, 0); } -void SetSocketTimeout(SOCKET s, long sec, long msec) { +void SetSocketTimeout(SOCKET s, long sec, long msec) { #ifdef SO_SNDTIMEO - #ifdef _darwin_ + #ifdef _darwin_ const timeval timeout = {sec, (__darwin_suseconds_t)msec * 1000}; - #elif defined(_unix_) - const timeval timeout = {sec, msec * 1000}; - #else + #elif defined(_unix_) + const timeval timeout = {sec, msec * 1000}; + #else const int timeout = sec * 1000 + msec; - #endif - CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout"); - CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout"); + #endif + CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout"); + CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout"); #endif } void SetLinger(SOCKET s, bool on, unsigned len) { #ifdef SO_LINGER struct linger l = {on, (u_short)len}; - - CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger"); + + CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger"); #endif } -void SetZeroLinger(SOCKET s) { - SetLinger(s, 1, 0); -} - -void SetKeepAlive(SOCKET s, bool value) { - CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive"); -} - -void SetOutputBuffer(SOCKET s, unsigned value) { - CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer"); -} - -void SetInputBuffer(SOCKET s, unsigned value) { - CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer"); -} - +void SetZeroLinger(SOCKET s) { + SetLinger(s, 1, 0); +} + +void SetKeepAlive(SOCKET s, bool value) { + CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive"); +} + +void SetOutputBuffer(SOCKET s, unsigned value) { + CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer"); +} + +void SetInputBuffer(SOCKET s, unsigned value) { + CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer"); +} + #if defined(_linux_) && !defined(SO_REUSEPORT) - #define SO_REUSEPORT 15 + #define SO_REUSEPORT 15 #endif void SetReusePort(SOCKET s, bool value) { @@ -321,10 +321,10 @@ void SetReusePort(SOCKET s, bool value) { #endif } -void SetNoDelay(SOCKET s, bool value) { - CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay"); -} - +void SetNoDelay(SOCKET s, bool value) { + CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay"); +} + void SetCloseOnExec(SOCKET s, bool value) { #if defined(_unix_) int flags = fcntl(s, F_GETFD); @@ -345,21 +345,21 @@ void SetCloseOnExec(SOCKET s, bool value) { #endif } -size_t GetMaximumSegmentSize(SOCKET s) { -#if defined(TCP_MAXSEG) - int val; - - if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) { - return (size_t)val; - } -#endif - - /* - * probably a good guess... - */ - return 8192; -} - +size_t GetMaximumSegmentSize(SOCKET s) { +#if defined(TCP_MAXSEG) + int val; + + if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) { + return (size_t)val; + } +#endif + + /* + * probably a good guess... + */ + return 8192; +} + size_t GetMaximumTransferUnit(SOCKET /*s*/) { // for someone who'll dare to write it // Linux: there rummored to be IP_MTU getsockopt() request @@ -371,8 +371,8 @@ size_t GetMaximumTransferUnit(SOCKET /*s*/) { int GetSocketToS(SOCKET s) { TOpaqueAddr addr; - if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) { - ythrow TSystemError() << "getsockname() failed"; + if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) { + ythrow TSystemError() << "getsockname() failed"; } return GetSocketToS(s, &addr); @@ -382,15 +382,15 @@ int GetSocketToS(SOCKET s, const IRemoteAddr* addr) { int result = 0; switch (addr->Addr()->sa_family) { - case AF_INET: - CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos"); - break; + case AF_INET: + CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos"); + break; - case AF_INET6: + case AF_INET6: #ifdef IPV6_TCLASS - CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos"); + CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos"); #endif - break; + break; } return result; @@ -398,16 +398,16 @@ int GetSocketToS(SOCKET s, const IRemoteAddr* addr) { void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) { switch (addr->Addr()->sa_family) { - case AF_INET: - CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos"); - return; + case AF_INET: + CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos"); + return; - case AF_INET6: + case AF_INET6: #ifdef IPV6_TCLASS - CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos"); - return; + CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos"); + return; #endif - break; + break; } ythrow yexception() << "SetSocketToS unsupported for family " << addr->Addr()->sa_family; @@ -416,8 +416,8 @@ void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) { void SetSocketToS(SOCKET s, int tos) { TOpaqueAddr addr; - if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) { - ythrow TSystemError() << "getsockname() failed"; + if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) { + ythrow TSystemError() << "getsockname() failed"; } SetSocketToS(s, &addr, tos); @@ -450,9 +450,9 @@ bool HasLocalAddress(SOCKET socket) { namespace { #if defined(_linux_) - #if !defined(TCP_FASTOPEN) - #define TCP_FASTOPEN 23 - #endif + #if !defined(TCP_FASTOPEN) + #define TCP_FASTOPEN 23 + #endif #endif #if defined(TCP_FASTOPEN) @@ -511,7 +511,7 @@ struct TUnblockingGuard { static int MsgPeek(SOCKET s) { int flags = MSG_PEEK; -#if defined(_win_) +#if defined(_win_) TUnblockingGuard unblocker(s); Y_UNUSED(unblocker); #else @@ -552,20 +552,20 @@ static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) { return sendmsg(sock, &message, MSG_NOSIGNAL); } #endif - + void TSocketHolder::Close() noexcept { if (Fd_ != INVALID_SOCKET) { bool ok = (closesocket(Fd_) == 0); if (!ok) { -// Do not quietly close bad descriptor, -// because often it means double close -// that is disasterous +// Do not quietly close bad descriptor, +// because often it means double close +// that is disasterous #ifdef _win_ Y_VERIFY(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor"); #elif defined(_unix_) Y_VERIFY(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_)); #else - #error unsupported platform + #error unsupported platform #endif } @@ -573,239 +573,239 @@ void TSocketHolder::Close() noexcept { } } -class TSocket::TImpl: public TAtomicRefCount<TImpl> { +class TSocket::TImpl: public TAtomicRefCount<TImpl> { using TOps = TSocket::TOps; - -public: - inline TImpl(SOCKET fd, TOps* ops) - : Fd_(fd) - , Ops_(ops) - { - } - + +public: + inline TImpl(SOCKET fd, TOps* ops) + : Fd_(fd) + , Ops_(ops) + { + } + inline ~TImpl() = default; - + inline SOCKET Fd() const noexcept { - return Fd_; - } - - inline ssize_t Send(const void* data, size_t len) { - return Ops_->Send(Fd_, data, len); - } - - inline ssize_t Recv(void* buf, size_t len) { - return Ops_->Recv(Fd_, buf, len); - } - - inline ssize_t SendV(const TPart* parts, size_t count) { - return Ops_->SendV(Fd_, parts, count); - } - + return Fd_; + } + + inline ssize_t Send(const void* data, size_t len) { + return Ops_->Send(Fd_, data, len); + } + + inline ssize_t Recv(void* buf, size_t len) { + return Ops_->Recv(Fd_, buf, len); + } + + inline ssize_t SendV(const TPart* parts, size_t count) { + return Ops_->SendV(Fd_, parts, count); + } + inline void Close() { Fd_.Close(); } -private: - TSocketHolder Fd_; - TOps* Ops_; -}; - +private: + TSocketHolder Fd_; + TOps* Ops_; +}; + template <> void Out<const struct addrinfo*>(IOutputStream& os, const struct addrinfo* ai) { - if (ai->ai_flags & AI_CANONNAME) { + if (ai->ai_flags & AI_CANONNAME) { os << "`" << ai->ai_canonname << "' "; - } + } os << '['; - for (int i = 0; ai; ++i, ai = ai->ai_next) { - if (i > 0) { + for (int i = 0; ai; ++i, ai = ai->ai_next) { + if (i > 0) { os << ", "; - } + } - os << (const IRemoteAddr&)TAddrInfo(ai); + os << (const IRemoteAddr&)TAddrInfo(ai); } os << ']'; } template <> void Out<struct addrinfo*>(IOutputStream& os, struct addrinfo* ai) { - Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai)); + Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai)); } -template <> +template <> void Out<TNetworkAddress>(IOutputStream& os, const TNetworkAddress& addr) { - os << &*addr.Begin(); -} - + os << &*addr.Begin(); +} + static inline const struct addrinfo* Iterate(const struct addrinfo* addr, const struct addrinfo* addr0, const int sockerr) { - if (addr->ai_next) { - return addr->ai_next; - } - + if (addr->ai_next) { + return addr->ai_next; + } + ythrow TSystemError(sockerr) << "can not connect to " << addr0; -} - -static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) { - const struct addrinfo* addr0 = res; - - while (res) { - TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol)); - - if (s.Closed()) { +} + +static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) { + const struct addrinfo* addr0 = res; + + while (res) { + TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol)); + + if (s.Closed()) { res = Iterate(res, addr0, LastSystemError()); - - continue; - } - - SetNonBlock(s, true); - + + continue; + } + + SetNonBlock(s, true); + if (connect(s, res->ai_addr, (int)res->ai_addrlen)) { int err = LastSystemError(); - - if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) { - /* - * must wait - */ - struct pollfd p = { - (SOCKET)s, - POLLOUT, - 0}; - + + if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) { + /* + * must wait + */ + struct pollfd p = { + (SOCKET)s, + POLLOUT, + 0}; + const ssize_t n = PollD(&p, 1, deadLine); - - /* - * timeout occured - */ - if (n < 0) { + + /* + * timeout occured + */ + if (n < 0) { ythrow TSystemError(-(int)n) << "can not connect"; - } - + } + CheckedGetSockOpt(s, SOL_SOCKET, SO_ERROR, err, "socket error"); - if (!err) { - return s.Release(); - } - } - + if (!err) { + return s.Release(); + } + } + res = Iterate(res, addr0, err); - - continue; - } - - return s.Release(); - } - + + continue; + } + + return s.Release(); + } + ythrow yexception() << "something went wrong: nullptr at addrinfo"; -} - -static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) { - TSocketHolder ret(DoConnectImpl(res, deadLine)); - - SetNonBlock(ret, false); - - return ret.Release(); -} - -static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) { +} + +static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) { + TSocketHolder ret(DoConnectImpl(res, deadLine)); + + SetNonBlock(ret, false); + + return ret.Release(); +} + +static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) { ssize_t ret = -1; do { ret = DoSendMsg(fd, iov, (int)count); } while (ret == -1 && errno == EINTR); - - if (ret < 0) { + + if (ret < 0) { return -LastSystemError(); - } - - return ret; -} - -template <bool isCompat> -struct TSender { + } + + return ret; +} + +template <bool isCompat> +struct TSender { using TPart = TSocket::TPart; - - static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) { - return DoSendV(fd, (const iovec*)parts, count); - } -}; - -template <> -struct TSender<false> { + + static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) { + return DoSendV(fd, (const iovec*)parts, count); + } +}; + +template <> +struct TSender<false> { using TPart = TSocket::TPart; - - static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) { - TTempBuf tempbuf(sizeof(struct iovec) * count); - struct iovec* iov = (struct iovec*)tempbuf.Data(); - - for (size_t i = 0; i < count; ++i) { - struct iovec& io = iov[i]; - const TPart& part = parts[i]; - - io.iov_base = (char*)part.buf; - io.iov_len = part.len; - } - - return DoSendV(fd, iov, count); - } -}; - -class TCommonSockOps: public TSocket::TOps { + + static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) { + TTempBuf tempbuf(sizeof(struct iovec) * count); + struct iovec* iov = (struct iovec*)tempbuf.Data(); + + for (size_t i = 0; i < count; ++i) { + struct iovec& io = iov[i]; + const TPart& part = parts[i]; + + io.iov_base = (char*)part.buf; + io.iov_len = part.len; + } + + return DoSendV(fd, iov, count); + } +}; + +class TCommonSockOps: public TSocket::TOps { using TPart = TSocket::TPart; - -public: + +public: inline TCommonSockOps() noexcept { - } - + } + ~TCommonSockOps() override = default; - + ssize_t Send(SOCKET fd, const void* data, size_t len) override { - ssize_t ret = -1; - do { + ssize_t ret = -1; + do { ret = send(fd, (const char*)data, (int)len, MSG_NOSIGNAL); - } while (ret == -1 && errno == EINTR); - - if (ret < 0) { - return -LastSystemError(); - } - - return ret; - } - + } while (ret == -1 && errno == EINTR); + + if (ret < 0) { + return -LastSystemError(); + } + + return ret; + } + ssize_t Recv(SOCKET fd, void* buf, size_t len) override { - ssize_t ret = -1; - do { - ret = recv(fd, (char*)buf, (int)len, 0); - } while (ret == -1 && errno == EINTR); - - if (ret < 0) { - return -LastSystemError(); - } - - return ret; - } + ssize_t ret = -1; + do { + ret = recv(fd, (char*)buf, (int)len, 0); + } while (ret == -1 && errno == EINTR); + + if (ret < 0) { + return -LastSystemError(); + } + + return ret; + } ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) override { - ssize_t ret = SendVImpl(fd, parts, count); + ssize_t ret = SendVImpl(fd, parts, count); - if (ret < 0) { - return ret; - } + if (ret < 0) { + return ret; + } - size_t len = TContIOVector::Bytes(parts, count); + size_t len = TContIOVector::Bytes(parts, count); - if ((size_t)ret == len) { - return ret; - } + if ((size_t)ret == len) { + return ret; + } - return SendVPartial(fd, parts, count, ret); - } - - inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) { - return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count); - } - - ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written); -}; + return SendVPartial(fd, parts, count, ret); + } + inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) { + return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count); + } + + ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written); +}; + ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written) { TTempBuf tempbuf(sizeof(TPart) * count); TPart* parts = (TPart*)tempbuf.Data(); @@ -820,9 +820,9 @@ ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t while (!vec.Complete()) { ssize_t ret = SendVImpl(fd, vec.Parts(), vec.Count()); - if (ret < 0) { + if (ret < 0) { return ret; - } + } written += ret; @@ -833,129 +833,129 @@ ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t } static inline TSocket::TOps* GetCommonSockOps() noexcept { - return Singleton<TCommonSockOps>(); -} - -TSocket::TSocket() - : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps())) -{ -} - -TSocket::TSocket(SOCKET fd) - : Impl_(new TImpl(fd, GetCommonSockOps())) -{ -} - -TSocket::TSocket(SOCKET fd, TOps* ops) - : Impl_(new TImpl(fd, ops)) -{ -} - -TSocket::TSocket(const TNetworkAddress& addr) - : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps())) -{ -} - -TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut) - : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps())) -{ -} - -TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine) - : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps())) -{ -} - + return Singleton<TCommonSockOps>(); +} + +TSocket::TSocket() + : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps())) +{ +} + +TSocket::TSocket(SOCKET fd) + : Impl_(new TImpl(fd, GetCommonSockOps())) +{ +} + +TSocket::TSocket(SOCKET fd, TOps* ops) + : Impl_(new TImpl(fd, ops)) +{ +} + +TSocket::TSocket(const TNetworkAddress& addr) + : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps())) +{ +} + +TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut) + : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps())) +{ +} + +TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine) + : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps())) +{ +} + TSocket::~TSocket() = default; - + SOCKET TSocket::Fd() const noexcept { - return Impl_->Fd(); -} - -ssize_t TSocket::Send(const void* data, size_t len) { - return Impl_->Send(data, len); -} - -ssize_t TSocket::Recv(void* buf, size_t len) { - return Impl_->Recv(buf, len); -} - -ssize_t TSocket::SendV(const TPart* parts, size_t count) { - return Impl_->SendV(parts, count); -} - + return Impl_->Fd(); +} + +ssize_t TSocket::Send(const void* data, size_t len) { + return Impl_->Send(data, len); +} + +ssize_t TSocket::Recv(void* buf, size_t len) { + return Impl_->Recv(buf, len); +} + +ssize_t TSocket::SendV(const TPart* parts, size_t count) { + return Impl_->SendV(parts, count); +} + void TSocket::Close() { Impl_->Close(); } TSocketInput::TSocketInput(const TSocket& s) noexcept - : S_(s) -{ -} - + : S_(s) +{ +} + TSocketInput::~TSocketInput() = default; - -size_t TSocketInput::DoRead(void* buf, size_t len) { - const ssize_t ret = S_.Recv(buf, len); - - if (ret >= 0) { - return (size_t)ret; - } - + +size_t TSocketInput::DoRead(void* buf, size_t len) { + const ssize_t ret = S_.Recv(buf, len); + + if (ret >= 0) { + return (size_t)ret; + } + ythrow TSystemError(-(int)ret) << "can not read from socket input stream"; -} - +} + TSocketOutput::TSocketOutput(const TSocket& s) noexcept - : S_(s) -{ -} - + : S_(s) +{ +} + TSocketOutput::~TSocketOutput() { - try { - Finish(); - } catch (...) { - // ¯\_(ツ)_/¯ - } -} - -void TSocketOutput::DoWrite(const void* buf, size_t len) { + try { + Finish(); + } catch (...) { + // ¯\_(ツ)_/¯ + } +} + +void TSocketOutput::DoWrite(const void* buf, size_t len) { size_t send = 0; while (len) { const ssize_t ret = S_.Send(buf, len); - + if (ret < 0) { ythrow TSystemError(-(int)ret) << "can not write to socket output stream; " << send << " bytes already send"; } - buf = (const char*)buf + ret; + buf = (const char*)buf + ret; len -= ret; send += ret; - } -} - -void TSocketOutput::DoWriteV(const TPart* parts, size_t count) { - const ssize_t ret = S_.SendV(parts, count); - - if (ret < 0) { + } +} + +void TSocketOutput::DoWriteV(const TPart* parts, size_t count) { + const ssize_t ret = S_.SendV(parts, count); + + if (ret < 0) { ythrow TSystemError(-(int)ret) << "can not writev to socket output stream"; - } - - /* - * todo for nonblocking sockets? - */ -} - -namespace { - //https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff - + } + + /* + * todo for nonblocking sockets? + */ +} + +namespace { + //https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff + struct TLocalNames: public THashSet<TStringBuf> { - inline TLocalNames() { - insert("localhost"); - insert("localhost.localdomain"); - insert("localhost6"); - insert("localhost6.localdomain6"); + inline TLocalNames() { + insert("localhost"); + insert("localhost.localdomain"); + insert("localhost6"); + insert("localhost6.localdomain6"); insert("::1"); - } - + } + inline bool IsLocalName(const char* name) const noexcept { struct sockaddr_in sa; memset(&sa, 0, sizeof(sa)); @@ -965,18 +965,18 @@ namespace { } return contains(name); - } - }; -} - -class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> { + } + }; +} + +class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> { private: class TAddrInfoDeleter { public: TAddrInfoDeleter(bool useFreeAddrInfo = true) : UseFreeAddrInfo_(useFreeAddrInfo) - { - } + { + } void operator()(struct addrinfo* ai) noexcept { if (!UseFreeAddrInfo_ && ai != NULL) { @@ -984,7 +984,7 @@ private: free(ai->ai_addr); } - struct addrinfo* p; + struct addrinfo* p; while (ai != NULL) { p = ai; ai = ai->ai_next; @@ -1000,38 +1000,38 @@ private: bool UseFreeAddrInfo_ = true; }; -public: +public: inline TImpl(const char* host, ui16 port, int flags) : Info_(nullptr, TAddrInfoDeleter{}) - { + { const TString port_st(ToString(port)); - struct addrinfo hints; - - memset(&hints, 0, sizeof(hints)); - - hints.ai_flags = flags; - hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - - if (!host) { - hints.ai_flags |= AI_PASSIVE; - } else { - if (!Singleton<TLocalNames>()->IsLocalName(host)) { - hints.ai_flags |= AI_ADDRCONFIG; - } - } - + struct addrinfo hints; + + memset(&hints, 0, sizeof(hints)); + + hints.ai_flags = flags; + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + if (!host) { + hints.ai_flags |= AI_PASSIVE; + } else { + if (!Singleton<TLocalNames>()->IsLocalName(host)) { + hints.ai_flags |= AI_ADDRCONFIG; + } + } + struct addrinfo* pai = NULL; const int error = getaddrinfo(host, port_st.data(), &hints, &pai); - - if (error) { + + if (error) { TAddrInfoDeleter()(pai); - ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port; - } + ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port; + } Info_.reset(pai); - } - + } + inline TImpl(const char* path, int flags) : Info_(nullptr, TAddrInfoDeleter{/* useFreeAddrInfo = */ false}) { @@ -1052,18 +1052,18 @@ public: hints->ai_addr = (struct sockaddr*)sockAddr.Release(); Info_.reset(hints.release()); - } - + } + inline struct addrinfo* Info() const noexcept { return Info_.get(); - } - -private: + } + +private: using TAddrInfoPtr = std::unique_ptr<struct addrinfo, TAddrInfoDeleter>; TAddrInfoPtr Info_; -}; - +}; + TNetworkAddress::TNetworkAddress(const TUnixSocketPath& unixSocketPath, int flags) : Impl_(new TImpl(unixSocketPath.Path.data(), flags)) { @@ -1076,24 +1076,24 @@ TNetworkAddress::TNetworkAddress(const TString& host, ui16 port, int flags) TNetworkAddress::TNetworkAddress(const TString& host, ui16 port) : Impl_(new TImpl(host.data(), port, 0)) -{ -} - -TNetworkAddress::TNetworkAddress(ui16 port) +{ +} + +TNetworkAddress::TNetworkAddress(ui16 port) : Impl_(new TImpl(nullptr, port, 0)) -{ -} - +{ +} + TNetworkAddress::~TNetworkAddress() = default; - + struct addrinfo* TNetworkAddress::Info() const noexcept { - return Impl_->Info(); -} - + return Impl_->Info(); +} + TNetworkResolutionError::TNetworkResolutionError(int error) { const char* errMsg = nullptr; #ifdef _win_ - errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows + errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows #else errMsg = gai_strerror(error); #endif @@ -1108,153 +1108,153 @@ TNetworkResolutionError::TNetworkResolutionError(int error) { (*this) << "): "; } -#if defined(_unix_) -static inline int GetFlags(int fd) { - const int ret = fcntl(fd, F_GETFL); - - if (ret == -1) { - ythrow TSystemError() << "can not get fd flags"; - } - - return ret; -} - -static inline void SetFlags(int fd, int flags) { - if (fcntl(fd, F_SETFL, flags) == -1) { - ythrow TSystemError() << "can not set fd flags"; - } -} - -static inline void EnableFlag(int fd, int flag) { - const int oldf = GetFlags(fd); - const int newf = oldf | flag; - - if (oldf != newf) { - SetFlags(fd, newf); - } -} - -static inline void DisableFlag(int fd, int flag) { - const int oldf = GetFlags(fd); - const int newf = oldf & (~flag); - - if (oldf != newf) { - SetFlags(fd, newf); - } -} - -static inline void SetFlag(int fd, int flag, bool value) { - if (value) { - EnableFlag(fd, flag); - } else { - DisableFlag(fd, flag); - } -} - -static inline bool FlagsAreEnabled(int fd, int flags) { - return GetFlags(fd) & flags; -} -#endif - -#if defined(_win_) +#if defined(_unix_) +static inline int GetFlags(int fd) { + const int ret = fcntl(fd, F_GETFL); + + if (ret == -1) { + ythrow TSystemError() << "can not get fd flags"; + } + + return ret; +} + +static inline void SetFlags(int fd, int flags) { + if (fcntl(fd, F_SETFL, flags) == -1) { + ythrow TSystemError() << "can not set fd flags"; + } +} + +static inline void EnableFlag(int fd, int flag) { + const int oldf = GetFlags(fd); + const int newf = oldf | flag; + + if (oldf != newf) { + SetFlags(fd, newf); + } +} + +static inline void DisableFlag(int fd, int flag) { + const int oldf = GetFlags(fd); + const int newf = oldf & (~flag); + + if (oldf != newf) { + SetFlags(fd, newf); + } +} + +static inline void SetFlag(int fd, int flag, bool value) { + if (value) { + EnableFlag(fd, flag); + } else { + DisableFlag(fd, flag); + } +} + +static inline bool FlagsAreEnabled(int fd, int flags) { + return GetFlags(fd) & flags; +} +#endif + +#if defined(_win_) static inline void SetNonBlockSocket(SOCKET fd, int value) { - unsigned long inbuf = value; - unsigned long outbuf = 0; - DWORD written = 0; - + unsigned long inbuf = value; + unsigned long outbuf = 0; + DWORD written = 0; + if (!inbuf) { WSAEventSelect(fd, nullptr, 0); } - if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) { - ythrow TSystemError() << "can not set non block socket state"; - } -} - + if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) { + ythrow TSystemError() << "can not set non block socket state"; + } +} + static inline bool IsNonBlockSocket(SOCKET fd) { - unsigned long buf = 0; - - if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) { - ythrow TSystemError() << "can not get non block socket state"; - } - - return buf; -} -#endif - + unsigned long buf = 0; + + if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) { + ythrow TSystemError() << "can not get non block socket state"; + } + + return buf; +} +#endif + void SetNonBlock(SOCKET fd, bool value) { -#if defined(_unix_) - #if defined(FIONBIO) +#if defined(_unix_) + #if defined(FIONBIO) Y_UNUSED(SetFlag); // shut up clang about unused function - int nb = value; - - if (ioctl(fd, FIONBIO, &nb) < 0) { - ythrow TSystemError() << "ioctl failed"; - } - #else - SetFlag(fd, O_NONBLOCK, value); - #endif -#elif defined(_win_) - SetNonBlockSocket(fd, value); -#else - #error todo -#endif -} - + int nb = value; + + if (ioctl(fd, FIONBIO, &nb) < 0) { + ythrow TSystemError() << "ioctl failed"; + } + #else + SetFlag(fd, O_NONBLOCK, value); + #endif +#elif defined(_win_) + SetNonBlockSocket(fd, value); +#else + #error todo +#endif +} + bool IsNonBlock(SOCKET fd) { -#if defined(_unix_) - return FlagsAreEnabled(fd, O_NONBLOCK); -#elif defined(_win_) - return IsNonBlockSocket(fd); -#else - #error todo -#endif -} - -void SetDeferAccept(SOCKET s) { - (void)s; - -#if defined(TCP_DEFER_ACCEPT) - CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept"); -#endif - -#if defined(SO_ACCEPTFILTER) - struct accept_filter_arg afa; - - Zero(afa); - strcpy(afa.af_name, "dataready"); - SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa); -#endif -} - +#if defined(_unix_) + return FlagsAreEnabled(fd, O_NONBLOCK); +#elif defined(_win_) + return IsNonBlockSocket(fd); +#else + #error todo +#endif +} + +void SetDeferAccept(SOCKET s) { + (void)s; + +#if defined(TCP_DEFER_ACCEPT) + CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept"); +#endif + +#if defined(SO_ACCEPTFILTER) + struct accept_filter_arg afa; + + Zero(afa); + strcpy(afa.af_name, "dataready"); + SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa); +#endif +} + ssize_t PollD(struct pollfd fds[], nfds_t nfds, const TInstant& deadLine) noexcept { - TInstant now = TInstant::Now(); - - do { - const TDuration toWait = PollStep(deadLine, now); - const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds())); - - if (res > 0) { - return res; - } - - if (res < 0) { - const int err = LastSystemError(); - + TInstant now = TInstant::Now(); + + do { + const TDuration toWait = PollStep(deadLine, now); + const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds())); + + if (res > 0) { + return res; + } + + if (res < 0) { + const int err = LastSystemError(); + if (err != ETIMEDOUT && err != EINTR) { - return -err; - } - } - } while ((now = TInstant::Now()) < deadLine); - - return -ETIMEDOUT; -} - -void ShutDown(SOCKET s, int mode) { - if (shutdown(s, mode)) { - ythrow TSystemError() << "shutdown socket error"; - } -} + return -err; + } + } + } while ((now = TInstant::Now()) < deadLine); + + return -ETIMEDOUT; +} + +void ShutDown(SOCKET s, int mode) { + if (shutdown(s, mode)) { + ythrow TSystemError() << "shutdown socket error"; + } +} extern "C" bool IsReusePortAvailable() { // SO_REUSEPORT is always defined for linux builds, see SetReusePort() implementation above |