aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/socket
diff options
context:
space:
mode:
author monster <monster@ydb.tech> 2022-07-07 14:41:37 +0300
committer monster <monster@ydb.tech> 2022-07-07 14:41:37 +0300
commit 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/socket
parent 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
download ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/netliba/socket')
-rw-r--r-- library/cpp/netliba/socket/allocator.h 14
-rw-r--r-- library/cpp/netliba/socket/creators.cpp 141
-rw-r--r-- library/cpp/netliba/socket/packet_queue.h 97
-rw-r--r-- library/cpp/netliba/socket/protocols.h 48
-rw-r--r-- library/cpp/netliba/socket/socket.cpp 1086
-rw-r--r-- library/cpp/netliba/socket/socket.h 126
-rw-r--r-- library/cpp/netliba/socket/stdafx.cpp 1
-rw-r--r-- library/cpp/netliba/socket/stdafx.h 16
-rw-r--r-- library/cpp/netliba/socket/udp_recv_packet.h 79
9 files changed, 1608 insertions, 0 deletions
diff --git a/library/cpp/netliba/socket/allocator.h b/library/cpp/netliba/socket/allocator.h
new file mode 100644
index 0000000000..f09b0dabcf
--- /dev/null
+++ b/library/cpp/netliba/socket/allocator.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#ifdef NETLIBA_WITH_NALF // build-time switch: take the allocator types from the nalf helpers
+#include <library/cpp/malloc/nalf/alloc_helpers.h>
+using TWithCustomAllocator = TWithNalfIncrementalAlloc; // alias of the type provided by alloc_helpers.h
+template <typename T>
+using TCustomAllocator = TNalfIncrementalAllocator<T>; // allocator template alias, counterpart of std::allocator below
+#else // default build: no custom allocator
+#include <memory>
+typedef struct {
+} TWithCustomAllocator; // empty placeholder so code naming TWithCustomAllocator still compiles
+template <typename T>
+using TCustomAllocator = std::allocator<T>; // plain standard allocator
+#endif
diff --git a/library/cpp/netliba/socket/creators.cpp b/library/cpp/netliba/socket/creators.cpp
new file mode 100644
index 0000000000..3821bf55b9
--- /dev/null
+++ b/library/cpp/netliba/socket/creators.cpp
@@ -0,0 +1,141 @@
+#include "stdafx.h"
+#include <string.h>
+#include <util/generic/utility.h>
+#include <util/network/init.h>
+#include <util/system/defaults.h>
+#include <util/system/yassert.h>
+#include "socket.h"
+
+namespace NNetlibaSocket {
+ // Prepares the ancillary data that sets the traffic class (TOS) of an outgoing packet.
+ // On Windows the value is simply stored in buffer as an int; on other platforms buffer is
+ // filled with one IPPROTO_IPV6 / IPV6_TCLASS control message.
+ // buffer must provide at least TOS_BUFFER_SIZE bytes. Returns buffer.
+ void* CreateTos(const ui8 tos, void* buffer) {
+#ifdef _win_
+     *(int*)buffer = (int)tos;
+#else
+     // glibc bug: http://sourceware.org/bugzilla/show_bug.cgi?id=13500
+     memset(buffer, 0, TOS_BUFFER_SIZE);
+
+     msghdr dummy;
+     Zero(dummy);
+     dummy.msg_control = buffer;
+     dummy.msg_controllen = TOS_BUFFER_SIZE;
+
+     // TODO: in FreeBSD setting TOS for dual stack sockets does not affect ipv4 frames
+     cmsghdr* cmsg = CMSG_FIRSTHDR(&dummy);
+     cmsg->cmsg_level = IPPROTO_IPV6;
+     cmsg->cmsg_type = IPV6_TCLASS;
+     cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+     // cmsg_len declares an int payload, so store a complete int: copying only the single
+     // ui8 byte yields the right value on little-endian hosts only.
+     // memcpy (instead of a pointer cast) also avoids the aliasing warning.
+     const int trafficClass = tos;
+     memcpy(CMSG_DATA(cmsg), &trafficClass, sizeof(trafficClass));
+
+     Y_ASSERT(CMSG_NXTHDR(&dummy, cmsg) == nullptr);
+#endif
+     return buffer;
+ }
+
+ // Attaches ancillary (control) data to an outgoing message header.
+ //   header     - message header to update (msg_control / msg_controllen are overwritten)
+ //   tos        - traffic class to request through an IPV6_TCLASS record (ignored on Cygwin)
+ //   myAddr     - source address to request through an IPV6_PKTINFO record; the record is
+ //                added only when the first 8 bytes of the address are not all zero
+ //   buffer     - storage for the control records, bufferSize bytes long
+ // Returns header on success. Returns nullptr on Windows, or when buffer is too small for a
+ // record; in the latter case the header keeps only the records that did fit.
+ TMsgHdr* AddSockAuxData(TMsgHdr* header, const ui8 tos, const sockaddr_in6& myAddr, void* buffer, size_t bufferSize) {
+#ifdef _win_
+     Y_UNUSED(header);
+     Y_UNUSED(tos);
+     Y_UNUSED(myAddr);
+     Y_UNUSED(buffer);
+     Y_UNUSED(bufferSize);
+     return nullptr;
+#else
+     // Start from a zeroed buffer: glibc's CMSG_NXTHDR inspects the not-yet-written next
+     // record (http://sourceware.org/bugzilla/show_bug.cgi?id=13500), and the payload
+     // padding must not carry garbage to the kernel.
+     if (buffer != nullptr && bufferSize != 0) {
+         memset(buffer, 0, bufferSize);
+     }
+     header->msg_control = buffer;
+     header->msg_controllen = bufferSize;
+
+     size_t totalLen = 0;
+     // Cygwin does not support IPV6_TCLASS, so we ignore it there
+#ifdef _cygwin_
+     Y_UNUSED(tos);
+#else
+     cmsghdr* cmsgTos = CMSG_FIRSTHDR(header);
+     if (cmsgTos == nullptr || bufferSize < CMSG_SPACE(sizeof(int))) {
+         header->msg_control = nullptr;
+         header->msg_controllen = 0;
+         return nullptr;
+     }
+     cmsgTos->cmsg_level = IPPROTO_IPV6;
+     cmsgTos->cmsg_type = IPV6_TCLASS;
+     cmsgTos->cmsg_len = CMSG_LEN(sizeof(int));
+     totalLen = CMSG_SPACE(sizeof(int));
+     // the payload is declared as int, so write the whole int rather than one byte of it
+     const int trafficClass = tos;
+     memcpy(CMSG_DATA(cmsgTos), &trafficClass, sizeof(trafficClass));
+#endif
+
+     // Same test as before (first 8 address bytes non-zero), but read through memcpy so that
+     // no unaligned / aliasing ui64 access is performed on the byte array.
+     ui64 addrHead = 0;
+     memcpy(&addrHead, myAddr.sin6_addr.s6_addr, sizeof(addrHead));
+     if (addrHead != 0u) {
+         in6_pktinfo* pktInfo;
+#ifdef _cygwin_
+         cmsghdr* cmsgAddr = CMSG_FIRSTHDR(header);
+#else
+         cmsghdr* cmsgAddr = CMSG_NXTHDR(header, cmsgTos);
+#endif
+         if (cmsgAddr == nullptr || totalLen + CMSG_SPACE(sizeof(*pktInfo)) > bufferSize) {
+             // leave only previous record
+             header->msg_controllen = totalLen;
+             return nullptr;
+         }
+         cmsgAddr->cmsg_level = IPPROTO_IPV6;
+         cmsgAddr->cmsg_type = IPV6_PKTINFO;
+         cmsgAddr->cmsg_len = CMSG_LEN(sizeof(*pktInfo));
+         totalLen += CMSG_SPACE(sizeof(*pktInfo));
+         pktInfo = (in6_pktinfo*)CMSG_DATA(cmsgAddr);
+
+         pktInfo->ipi6_addr = myAddr.sin6_addr;
+         pktInfo->ipi6_ifindex = 0; /* 0 = use interface specified in routing table */
+     }
+     header->msg_controllen = totalLen; // report only the bytes actually occupied by records
+
+     return header;
+#endif
+ }
+
+ // Wraps a (pointer, length) pair into a zero-initialised TIoVec.
+ TIoVec CreateIoVec(char* data, const size_t dataSize) {
+     TIoVec vec;
+     Zero(vec);
+     vec.iov_len = dataSize;
+     vec.iov_base = data;
+     return vec;
+ }
+
+ // Builds a message header for sending the single buffer iov to addr.
+ // The header stores pointers to addr and iov, so both must outlive it.
+ // tosBuffer may be null; otherwise it is the buffer prepared by CreateTos().
+ TMsgHdr CreateSendMsgHdr(const sockaddr_in6& addr, const TIoVec& iov, void* tosBuffer) {
+     TMsgHdr hdr;
+     Zero(hdr);
+
+     hdr.msg_iov = (TIoVec*)&iov;
+     hdr.msg_iovlen = 1;
+     hdr.msg_name = (void*)&addr;
+     hdr.msg_namelen = sizeof(addr);
+
+     if (!tosBuffer) {
+         return hdr;
+     }
+#ifdef _win_
+     hdr.Tos = *(int*)tosBuffer;
+#else
+     hdr.msg_control = tosBuffer;
+     hdr.msg_controllen = TOS_BUFFER_SIZE;
+#endif
+     return hdr;
+ }
+
+ // Builds a message header for receiving one datagram into iov.
+ // addrBuf is cleared and registered as the place for the sender address.
+ // controllBuffer may be null; otherwise (non-Windows) it is cleared and registered as a
+ // CTRL_BUFFER_SIZE-byte area for control messages.
+ TMsgHdr CreateRecvMsgHdr(sockaddr_in6* addrBuf, const TIoVec& iov, void* controllBuffer) {
+     TMsgHdr hdr;
+     Zero(hdr);
+     Zero(*addrBuf);
+
+     hdr.msg_iovlen = 1;
+     hdr.msg_iov = (TIoVec*)&iov;
+
+     hdr.msg_namelen = sizeof(*addrBuf);
+     hdr.msg_name = addrBuf;
+#ifndef _win_
+     if (controllBuffer != nullptr) {
+         memset(controllBuffer, 0, CTRL_BUFFER_SIZE);
+         hdr.msg_controllen = CTRL_BUFFER_SIZE;
+         hdr.msg_control = controllBuffer;
+     }
+#endif
+     return hdr;
+ }
+}
diff --git a/library/cpp/netliba/socket/packet_queue.h b/library/cpp/netliba/socket/packet_queue.h
new file mode 100644
index 0000000000..58a84709c2
--- /dev/null
+++ b/library/cpp/netliba/socket/packet_queue.h
@@ -0,0 +1,97 @@
+#pragma once
+
+#include "udp_recv_packet.h"
+
+#include <library/cpp/threading/chunk_queue/queue.h>
+
+#include <util/network/init.h>
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <util/system/event.h>
+#include <util/system/yassert.h>
+#include <library/cpp/deprecated/atomic/atomic_ops.h>
+#include <utility>
+
+namespace NNetlibaSocket {
+ struct TPacketMeta { // addressing info stored in the queue next to each packet
+ sockaddr_in6 RemoteAddr; // returned by Pop() through its srcAddr argument
+ sockaddr_in6 MyAddr; // returned by Pop() through its dstAddr argument
+ };
+
+ // Bounded queue of received packets with TTNumWriterThreads producers and one consumer.
+ // Besides the packets it tracks their count and total payload size, and owns an event
+ // that is signalled while the queue is believed to be non-empty.
+ template <size_t TTNumWriterThreads>
+ class TLockFreePacketQueue {
+ private:
+     enum { MAX_PACKETS_IN_QUEUE = INT_MAX,
+            CMD_QUEUE_RESERVE = 1 << 20,
+            MAX_DATA_IN_QUEUE = 32 << 20 };
+
+     typedef std::pair<TUdpRecvPacket*, TPacketMeta> TPacket;
+     typedef std::conditional_t<TTNumWriterThreads == 1, NThreading::TOneOneQueue<TPacket>, NThreading::TManyOneQueue<TPacket, TTNumWriterThreads>> TImpl;
+
+     mutable TImpl Queue;
+     mutable TSystemEvent QueueEvent;
+
+     mutable TAtomic NumPackets;
+     TAtomic DataSize;
+
+ public:
+     TLockFreePacketQueue()
+         : NumPackets(0)
+         , DataSize(0)
+     {
+     }
+
+     // Packets still queued at destruction time are owned by the queue and freed here.
+     ~TLockFreePacketQueue() {
+         for (TPacket leftover; Queue.Dequeue(leftover);) {
+             delete leftover.first;
+         }
+     }
+
+     // True when the queue is full once CMD_QUEUE_RESERVE bytes are held back from the data limit.
+     bool IsDataPartFull() const {
+         if (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE) {
+             return true;
+         }
+         return AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE - CMD_QUEUE_RESERVE;
+     }
+
+     // Enqueues packet together with meta. Returns false (packet not taken) when the queue is full.
+     bool Push(TUdpRecvPacket* packet, const TPacketMeta& meta) {
+         // simulate OS behavior on buffer overflow - drop packets.
+         // the limit check is racy (a few extra packets may slip in), which is acceptable
+         const bool overflow = AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE;
+         if (overflow) {
+             return false;
+         }
+         AtomicAdd(NumPackets, 1);
+         AtomicAdd(DataSize, packet->DataSize);
+         Y_ASSERT(packet->DataStart == 0);
+
+         Queue.Enqueue(TPacket(packet, meta));
+         QueueEvent.Signal();
+         return true;
+     }
+
+     // Dequeues one packet and its addresses. Returns false when the queue is empty.
+     bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) {
+         TPacket item;
+         bool fetched = Queue.Dequeue(item);
+         if (!fetched) {
+             // looks empty: clear the event, then look again in case a producer raced with us
+             QueueEvent.Reset();
+             fetched = Queue.Dequeue(item);
+             if (fetched) {
+                 QueueEvent.Signal();
+             }
+         }
+         if (!fetched) {
+             return false;
+         }
+
+         *packet = item.first;
+         *srcAddr = item.second.RemoteAddr;
+         *dstAddr = item.second.MyAddr;
+
+         AtomicSub(NumPackets, 1);
+         AtomicSub(DataSize, (*packet)->DataSize);
+         Y_ASSERT(AtomicGet(NumPackets) >= 0 && AtomicGet(DataSize) >= 0);
+
+         return true;
+     }
+
+     bool IsEmpty() const {
+         const auto queued = AtomicAdd(NumPackets, 0);
+         return queued == 0;
+     }
+
+     TSystemEvent& GetEvent() const {
+         return QueueEvent;
+     }
+ };
+}
diff --git a/library/cpp/netliba/socket/protocols.h b/library/cpp/netliba/socket/protocols.h
new file mode 100644
index 0000000000..ec6896ab9b
--- /dev/null
+++ b/library/cpp/netliba/socket/protocols.h
@@ -0,0 +1,48 @@
+#pragma once
+
+namespace NNetlibaSocket {
+ namespace NNetliba_v12 { // constants of the "v12" protocol flavour; separate from NNetliba below
+ const ui8 CMD_POS = 11; // NOTE(review): presumably the byte offset of the command code in a packet - confirm against the parser
+ enum EUdpCmd {
+ CMD_BEGIN = 1, // first valid command value
+
+ DATA = CMD_BEGIN,
+ DATA_SMALL, // no jumbo-packets
+ DO_NOT_USE_1, // just reserved
+ DO_NOT_USE_2, // just reserved
+
+ CANCEL_TRANSFER,
+
+ ACK,
+ ACK_COMPLETE,
+ ACK_CANCELED,
+ ACK_RESEND_NOSHMEM,
+
+ PING,
+ PONG,
+ PONG_IB,
+
+ KILL,
+
+ CMD_END, // one past the last valid command value
+ };
+ }
+
+ namespace NNetliba { // constants of the other protocol flavour; values differ from NNetliba_v12
+ const ui8 CMD_POS = 8; // NOTE(review): presumably the byte offset of the command code in a packet - confirm against the parser
+ enum EUdpCmd {
+ DATA, // numbering starts at 0 here (NNetliba_v12 starts at 1)
+ ACK,
+ ACK_COMPLETE,
+ ACK_RESEND,
+ DATA_SMALL, // no jumbo-packets
+ PING,
+ PONG,
+ DATA_SHMEM,
+ DATA_SMALL_SHMEM,
+ KILL,
+ ACK_RESEND_NOSHMEM,
+ };
+ }
+
+}
diff --git a/library/cpp/netliba/socket/socket.cpp b/library/cpp/netliba/socket/socket.cpp
new file mode 100644
index 0000000000..c10236229b
--- /dev/null
+++ b/library/cpp/netliba/socket/socket.cpp
@@ -0,0 +1,1086 @@
+#include "stdafx.h"
+#include <util/datetime/cputimer.h>
+#include <util/draft/holder_vector.h>
+#include <util/generic/utility.h>
+#include <util/generic/vector.h>
+#include <util/network/init.h>
+#include <util/network/poller.h>
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <util/system/byteorder.h>
+#include <util/system/defaults.h>
+#include <util/system/error.h>
+#include <util/system/event.h>
+#include <util/system/thread.h>
+#include <util/system/yassert.h>
+#include <util/system/rwlock.h>
+#include <util/system/env.h>
+
+#include "socket.h"
+#include "packet_queue.h"
+#include "udp_recv_packet.h"
+
+#include <array>
+#include <stdlib.h>
+
+///////////////////////////////////////////////////////////////////////////////
+
+#ifndef _win_
+#include <netinet/in.h>
+#endif
+
+#ifdef _linux_
+#include <dlfcn.h> // dlsym
+#endif
+
+// Resolves the symbol `name` in the running process and returns it cast to T.
+// Yields nullptr on non-Linux builds, when the DISABLE_MMSG environment variable is set
+// to a non-empty value, or when dlsym() cannot find the symbol.
+template <class T>
+static T GetAddressOf(const char* name) {
+    T resolved = nullptr;
+#ifdef _linux_
+    if (!GetEnv("DISABLE_MMSG")) {
+        resolved = (T)dlsym(RTLD_DEFAULT, name);
+    }
+#endif
+    Y_UNUSED(name);
+    return resolved;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+namespace NNetlibaSocket {
+ ///////////////////////////////////////////////////////////////////////////////
+
+ struct timespec; // we use it only as NULL pointer
+ typedef int (*TSendMMsgFunc)(SOCKET, TMMsgHdr*, unsigned int, unsigned int); // signature used to call sendmmsg
+ typedef int (*TRecvMMsgFunc)(SOCKET, TMMsgHdr*, unsigned int, unsigned int, timespec*); // signature used to call recvmmsg
+
+ static const TSendMMsgFunc SendMMsgFunc = GetAddressOf<TSendMMsgFunc>("sendmmsg"); // nullptr when the lookup is disabled or fails
+ static const TRecvMMsgFunc RecvMMsgFunc = GetAddressOf<TRecvMMsgFunc>("recvmmsg"); // nullptr when the lookup is disabled or fails
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ // Extracts the traffic class from the first control message of msgHdr into *tos.
+ // Returns false on Windows, when there is no control message, or when the first control
+ // message does not carry an int-sized payload.
+ bool ReadTos(const TMsgHdr& msgHdr, ui8* tos) {
+#ifdef _win_
+     Y_UNUSED(msgHdr);
+     Y_UNUSED(tos);
+     return false;
+#else
+     cmsghdr* cmsg = CMSG_FIRSTHDR(&msgHdr);
+     if (!cmsg)
+         return false;
+     //Y_ASSERT(cmsg->cmsg_level == IPPROTO_IPV6);
+     //Y_ASSERT(cmsg->cmsg_type == IPV6_TCLASS);
+     if (cmsg->cmsg_len != CMSG_LEN(sizeof(int)))
+         return false;
+     // The payload is an int (checked above): read all of it and narrow afterwards.
+     // Reading just the first byte gives the right answer on little-endian hosts only.
+     int trafficClass = 0;
+     memcpy(&trafficClass, CMSG_DATA(cmsg), sizeof(trafficClass));
+     *tos = (ui8)trafficClass;
+     return true;
+#endif
+ }
+
+ // Scans the control messages of msgHdr for an IPV6_PKTINFO record and copies the address
+ // it carries into *addrBuf (family set to AF_INET6). *addrBuf is always cleared first.
+ // Returns true when such a record was found; always false on Windows.
+ bool ExtractDestinationAddress(TMsgHdr& msgHdr, sockaddr_in6* addrBuf) {
+     Zero(*addrBuf);
+#ifdef _win_
+     Y_UNUSED(msgHdr);
+     Y_UNUSED(addrBuf);
+     return false;
+#else
+     cmsghdr* record = CMSG_FIRSTHDR(&msgHdr);
+     while (record != nullptr) {
+         const bool isPktInfo = (record->cmsg_level == IPPROTO_IPV6) && (record->cmsg_type == IPV6_PKTINFO);
+         if (isPktInfo) {
+             const in6_pktinfo* info = (in6_pktinfo*)CMSG_DATA(record);
+             addrBuf->sin6_family = AF_INET6;
+             addrBuf->sin6_addr = info->ipi6_addr;
+             return true;
+         }
+         record = CMSG_NXTHDR(&msgHdr, record);
+     }
+     return false;
+#endif
+ }
+
+ // all send and recv methods are thread safe!
+ class TAbstractSocket: public ISocket { // common UDP/IPv6 socket plumbing shared by the concrete socket classes below
+ private:
+ SOCKET S; // OS socket handle; INVALID_SOCKET while closed
+ mutable TSocketPoller Poller; // S is registered here for read readiness; mutable because WaitImpl is const
+ sockaddr_in6 SelfAddress; // filled by DetectSelfAddress(): bound port, address forced to loopback
+
+ int SendSysSocketSize; // SO_SNDBUF value last read or set
+ int SendSysSocketSizePrev; // value written by the previous successful IncreaseSendBuff()
+
+ int CreateSocket(int netPort); // netPort is in network byte order; 0 on success, -1 on failure
+ int DetectSelfAddress(); // 0 on success, -1 when getsockname fails
+
+ protected:
+ int SetSockOpt(int level, int option_name, const void* option_value, socklen_t option_len); // thin setsockopt wrapper
+
+ int OpenImpl(int port); // port in host byte order, 0 = any; 0 on success, -1 on failure
+ void CloseImpl(); // safe to call on an already closed socket
+
+ void WaitImpl(float timeoutSec) const; // blocks in the poller for at most timeoutSec
+ void CancelWaitImpl(const sockaddr_in6* address = nullptr); // NULL means "self"
+
+ ssize_t RecvMsgImpl(TMsgHdr* hdr, int flags);
+ TUdpRecvPacket* RecvImpl(TUdpHostRecvBufAlloc* buf, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr); // nullptr when nothing was received
+ int RecvMMsgImpl(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags, timespec* timeout); // requires RecvMMsgFunc != nullptr
+
+ bool IsFragmentationForbiden();
+ void ForbidFragmentation();
+ void EnableFragmentation();
+
+ //Shared state for setsockopt. Forbid simultaneous transfer while sender asking for specific options (i.e. DONOT_FRAG)
+ TRWMutex Mutex; // ordinary sends take it shared, sends that toggle socket options take it exclusive
+ TAtomic RecvLag = 0; // written by SetRecvLagTime()
+
+ public:
+ TAbstractSocket();
+ ~TAbstractSocket() override;
+#ifdef _unix_
+ void Reset(const TAbstractSocket& rhv); // closes this socket and takes a dup() of rhv's descriptor
+#endif
+
+ bool IsValid() const override;
+
+ const sockaddr_in6& GetSelfAddress() const override;
+ int GetNetworkOrderPort() const override;
+ int GetPort() const override; // host byte order
+
+ int GetSockOpt(int level, int option_name, void* option_value, socklen_t* option_len) override;
+
+ // send all packets to this and only this address by default
+ int Connect(const struct sockaddr* address, socklen_t address_len) override;
+
+ void CancelWaitHost(const sockaddr_in6 addr) override; // sends an empty datagram to addr
+
+ bool IsSendMMsgSupported() const override;
+ int SendMMsg(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) override;
+ ssize_t SendMsg(const TMsgHdr* hdr, int flags, const EFragFlag frag) override;
+ bool IncreaseSendBuff() override;
+ int GetSendSysSocketSize() override;
+ void SetRecvLagTime(NHPTimer::STime time) override;
+ };
+
+ TAbstractSocket::TAbstractSocket() // starts closed: no handle, zero buffer sizes, cleared self address
+ : S(INVALID_SOCKET)
+ , SendSysSocketSize(0)
+ , SendSysSocketSizePrev(0)
+ {
+ Zero(SelfAddress);
+ }
+
+ TAbstractSocket::~TAbstractSocket() { // closes the handle if it is still open
+ CloseImpl();
+ }
+
+#ifdef _unix_
+ // Closes this socket and makes it refer to a duplicate of rhv's descriptor, copying rhv's
+ // self address as well. If dup() fails the object is left closed (IsValid() == false).
+ void TAbstractSocket::Reset(const TAbstractSocket& rhv) {
+     Close();
+     S = dup(rhv.S);
+     SelfAddress = rhv.SelfAddress;
+     if (IsValid()) {
+         // dup() does not carry FD_CLOEXEC over; restore what CreateSocket() sets (best effort)
+         fcntl(S, F_SETFD, FD_CLOEXEC);
+         // our own poller has never seen the new descriptor: without this WaitImpl() would
+         // not wake up on incoming data and CloseImpl() would unregister an unknown fd
+         Poller.WaitRead(S, nullptr);
+     }
+ }
+#endif
+
+ int TAbstractSocket::CreateSocket(int netPort) { // creates, configures and binds the UDP socket; netPort is in network byte order; 0 on success, -1 on failure
+ if (IsValid()) {
+ Y_ASSERT(0); // opening an already open socket is a caller bug; release builds just report success
+ return 0;
+ }
+ S = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
+ if (S == INVALID_SOCKET) {
+ return -1;
+ }
+ {
+ int flag = 0; // IPV6_V6ONLY switched off
+ Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&flag, sizeof(flag)) == 0, "IPV6_V6ONLY failed");
+ }
+ {
+ int flag = 1;
+ Y_VERIFY(SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (const char*)&flag, sizeof(flag)) == 0, "SO_REUSEADDR failed");
+ }
+#if defined(_win_)
+ unsigned long dummy = 1; // non-zero argument: switch the socket to non-blocking mode
+ ioctlsocket(S, FIONBIO, &dummy);
+#else
+ Y_VERIFY(fcntl(S, F_SETFL, O_NONBLOCK) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
+ Y_VERIFY(fcntl(S, F_SETFD, FD_CLOEXEC) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
+ {
+ int flag = 1; // ask for IPV6_PKTINFO control messages on receive (read by ExtractDestinationAddress)
+#ifndef IPV6_RECVPKTINFO /* Darwin platforms require this */
+ Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_PKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_PKTINFO failed");
+#else
+ Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_RECVPKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_RECVPKTINFO failed");
+#endif
+ }
+#endif
+
+ Poller.WaitRead(S, nullptr); // from now on WaitImpl() wakes up when S becomes readable
+
+ {
+ // bind socket
+ sockaddr_in6 name;
+ Zero(name);
+ name.sin6_family = AF_INET6;
+ name.sin6_addr = in6addr_any;
+ name.sin6_port = netPort; // already in network byte order
+ if (bind(S, (sockaddr*)&name, sizeof(name)) != 0) {
+ fprintf(stderr, "netliba_socket could not bind to port %d: %s (errno = %d)\n", InetToHost((ui16)netPort), LastSystemErrorText(), LastSystemError());
+ CloseImpl(); // we call this CloseImpl after Poller initialization
+ return -1;
+ }
+ }
+ //Default behavior is allowing fragmentation (according to netliba v6 behavior)
+ //If we want to sent packet with DF flag we have to use SendMsg()
+ EnableFragmentation();
+
+ {
+ socklen_t sz = sizeof(SendSysSocketSize);
+ if (GetSockOpt(SOL_SOCKET, SO_SNDBUF, &SendSysSocketSize, &sz)) { // failure is only reported, not fatal
+ fprintf(stderr, "Can`t get SO_SNDBUF");
+ }
+ }
+ return 0;
+ }
+
+ bool TAbstractSocket::IsValid() const { // true while S holds a socket handle
+ return S != INVALID_SOCKET;
+ }
+
+ int TAbstractSocket::DetectSelfAddress() { // stores the bound address in SelfAddress; 0 on success, -1 on failure
+ socklen_t nameLen = sizeof(SelfAddress);
+ if (getsockname(S, (sockaddr*)&SelfAddress, &nameLen) != 0) { // actually we use only sin6_port
+ return -1;
+ }
+ Y_ASSERT(SelfAddress.sin6_family == AF_INET6);
+ SelfAddress.sin6_addr = in6addr_loopback; // the address part is replaced with loopback; CancelWaitImpl() sends to it
+ return 0;
+ }
+
+ const sockaddr_in6& TAbstractSocket::GetSelfAddress() const { // address recorded by DetectSelfAddress()
+ return SelfAddress;
+ }
+
+ int TAbstractSocket::GetNetworkOrderPort() const { // bound port exactly as stored in SelfAddress (network byte order)
+ return SelfAddress.sin6_port;
+ }
+
+ int TAbstractSocket::GetPort() const { // bound port converted to host byte order
+ return InetToHost((ui16)SelfAddress.sin6_port);
+ }
+
+ int TAbstractSocket::SetSockOpt(int level, int option_name, const void* option_value, socklen_t option_len) { // setsockopt on S; returns its result
+ const int rv = setsockopt(S, level, option_name, (const char*)option_value, option_len);
+ Y_VERIFY_DEBUG(rv == 0, "SetSockOpt failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError()); // failure is checked by the debug-only macro; rv is returned either way
+ return rv;
+ }
+
+ int TAbstractSocket::GetSockOpt(int level, int option_name, void* option_value, socklen_t* option_len) { // getsockopt on S; returns its result
+ const int rv = getsockopt(S, level, option_name, (char*)option_value, option_len);
+ Y_VERIFY_DEBUG(rv == 0, "GetSockOpt failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError()); // failure is checked by the debug-only macro; rv is returned either way
+ return rv;
+ }
+
+ bool TAbstractSocket::IsFragmentationForbiden() { // reports whether the socket is currently in "do not fragment" mode
+#if defined(_win_)
+ DWORD flag = 0;
+ socklen_t sz = sizeof(flag);
+ Y_VERIFY(GetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (char*)&flag, &sz) == 0, "");
+ return flag;
+#elif defined(_linux_)
+ int flag = 0;
+ socklen_t sz = sizeof(flag);
+ Y_VERIFY(GetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (char*)&flag, &sz) == 0, ""); // only the IPv6 option is queried
+ return flag == IPV6_PMTUDISC_DO;
+#elif !defined(_darwin_)
+ int flag = 0;
+ socklen_t sz = sizeof(flag);
+ Y_VERIFY(GetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (char*)&flag, &sz) == 0, "");
+ return flag;
+#endif
+ return false; // the only return compiled on darwin, where no option is queried
+ }
+
+ void TAbstractSocket::ForbidFragmentation() { // switches the socket to "do not fragment" mode; no-op on darwin
+ // do not fragment ping packets
+#if defined(_win_)
+ DWORD flag = 1;
+ SetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (const char*)&flag, sizeof(flag));
+#elif defined(_linux_)
+ int flag = IP_PMTUDISC_DO; // set for the IPv4 level first, then for IPv6
+ SetSockOpt(IPPROTO_IP, IP_MTU_DISCOVER, (const char*)&flag, sizeof(flag));
+
+ flag = IPV6_PMTUDISC_DO;
+ SetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (const char*)&flag, sizeof(flag));
+#elif !defined(_darwin_)
+ int flag = 1;
+ //SetSockOpt(IPPROTO_IP, IP_DONTFRAG, (const char*)&flag, sizeof(flag));
+ SetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&flag, sizeof(flag));
+#endif
+ }
+
+ void TAbstractSocket::EnableFragmentation() { // counterpart of ForbidFragmentation(): restores the default mode; no-op on darwin
+#if defined(_win_)
+ DWORD flag = 0;
+ SetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (const char*)&flag, sizeof(flag));
+#elif defined(_linux_)
+ int flag = IP_PMTUDISC_WANT; // set for the IPv4 level first, then for IPv6
+ SetSockOpt(IPPROTO_IP, IP_MTU_DISCOVER, (const char*)&flag, sizeof(flag));
+
+ flag = IPV6_PMTUDISC_WANT;
+ SetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (const char*)&flag, sizeof(flag));
+#elif !defined(_darwin_)
+ int flag = 0;
+ //SetSockOpt(IPPROTO_IP, IP_DONTFRAG, (const char*)&flag, sizeof(flag));
+ SetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&flag, sizeof(flag));
+#endif
+ }
+
+ int TAbstractSocket::Connect(const sockaddr* address, socklen_t address_len) { // connect() on S; returns its result
+ Y_ASSERT(IsValid());
+ return connect(S, address, address_len);
+ }
+
+ void TAbstractSocket::CancelWaitHost(const sockaddr_in6 addr) { // sends the empty wake-up datagram to addr instead of to ourselves
+ CancelWaitImpl(&addr);
+ }
+
+ bool TAbstractSocket::IsSendMMsgSupported() const { // true when the sendmmsg symbol was resolved at startup
+ return SendMMsgFunc != nullptr;
+ }
+
+ // Sends vlen messages with one sendmmsg call and returns its result.
+ // Aborts when sendmmsg is unavailable (see IsSendMMsgSupported()).
+ // Takes Mutex shared, so it may run concurrently with other plain sends but not with a
+ // send that temporarily changes socket options.
+ int TAbstractSocket::SendMMsg(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) {
+     Y_ASSERT(IsValid());
+     Y_VERIFY(SendMMsgFunc, "sendmmsg is not supported!");
+     TReadGuard rg(Mutex);
+     // One-time, process-wide sanity check. Several threads can get here at once under the
+     // shared lock, so the "already checked" flag is atomic rather than a plain bool.
+     static TAtomic checked = 0;
+     if (!AtomicGet(checked)) {
+         Y_VERIFY(!IsFragmentationForbiden(), "Send methods of this class expect default EnableFragmentation behavior");
+         AtomicSet(checked, 1);
+     }
+     return SendMMsgFunc(S, msgvec, vlen, flags);
+ }
+
+ // Sends the datagram described by hdr and returns the result of the underlying send call.
+ //   frag == FF_DONT_FRAG: the socket is switched to "do not fragment" for this one send;
+ //   Mutex is held exclusively meanwhile so that no other send sees the temporary setting.
+ //   Otherwise Mutex is taken shared and the socket is expected to be in its default mode.
+ // In the _win32_ build only a single iovec is supported and a non-zero hdr->Tos is applied
+ // by temporarily changing IP_TOS on the socket.
+ ssize_t TAbstractSocket::SendMsg(const TMsgHdr* hdr, int flags, const EFragFlag frag) {
+     Y_ASSERT(IsValid());
+#ifdef _win32_
+     // one-time, process-wide sanity check flag; atomic because senders run concurrently
+     static TAtomic checked = 0;
+     // the restriction is on the NUMBER of buffers (msg_iovlen), not on the byte length of
+     // the first buffer (iov_len)
+     Y_VERIFY(hdr->msg_iovlen == 1, "Scatter/gather is currenly not supported on Windows");
+     if (hdr->Tos || frag == FF_DONT_FRAG) {
+         TWriteGuard wg(Mutex);
+         if (frag == FF_DONT_FRAG) {
+             ForbidFragmentation();
+         } else if (!AtomicGet(checked)) {
+             Y_VERIFY(!IsFragmentationForbiden(), "Send methods of this class expect default EnableFragmentation behavior");
+             AtomicSet(checked, 1);
+         }
+         int originalTos;
+         if (hdr->Tos) {
+             socklen_t sz = sizeof(originalTos);
+             Y_VERIFY(GetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, &sz) == 0, "");
+             Y_VERIFY(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&hdr->Tos, sizeof(hdr->Tos)) == 0, "");
+         }
+         const ssize_t rv = sendto(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, hdr->msg_namelen);
+         if (hdr->Tos) {
+             Y_VERIFY(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, sizeof(originalTos)) == 0, "");
+         }
+         if (frag == FF_DONT_FRAG) {
+             EnableFragmentation();
+         }
+         return rv;
+     }
+     TReadGuard rg(Mutex);
+     if (!AtomicGet(checked)) {
+         Y_VERIFY(!IsFragmentationForbiden(), "Send methods of this class expect default EnableFragmentation behavior");
+         AtomicSet(checked, 1);
+     }
+     return sendto(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, hdr->msg_namelen);
+#else
+     if (frag == FF_DONT_FRAG) {
+         TWriteGuard wg(Mutex);
+         ForbidFragmentation();
+         const ssize_t rv = sendmsg(S, hdr, flags);
+         EnableFragmentation();
+         return rv;
+     }
+
+     TReadGuard rg(Mutex);
+#ifndef _darwin_
+     // one-time, process-wide sanity check; atomic because several threads may send at once
+     // while holding the lock in shared mode
+     static TAtomic checked = 0;
+     if (!AtomicGet(checked)) {
+         Y_VERIFY(!IsFragmentationForbiden(), "Send methods of this class expect default EnableFragmentation behavior");
+         AtomicSet(checked, 1);
+     }
+#endif
+     return sendmsg(S, hdr, flags);
+#endif
+ }
+
+ // Tries to double the kernel send buffer (SO_SNDBUF) of the socket.
+ // Returns true only when the option was set and the requested size is larger than the one
+ // requested by the previous successful call; false on any failure or when no growth happened.
+ bool TAbstractSocket::IncreaseSendBuff() {
+     int wanted;
+     socklen_t optLen = sizeof(wanted);
+     if (GetSockOpt(SOL_SOCKET, SO_SNDBUF, &wanted, &optLen) != 0) {
+         return false;
+     }
+     // worst case: 200000 pps * 8k * 0.01sec = 16Mb so 32Mb hard limit is reasonable value
+     const bool sane = wanted >= 0 && wanted <= (1 << 25);
+     if (!sane) {
+         fprintf(stderr, "GetSockOpt returns wrong or too big value for SO_SNDBUF: %d\n", wanted);
+         return false;
+     }
+     // Linux already reports a doubled value. man 7 socket:
+     //
+     //   SO_SNDBUF
+     //     Sets or gets the maximum socket send buffer in bytes. The kernel doubles this
+     //     value (to allow space for bookkeeping overhead) when it is set using
+     //     setsockopt(), and this doubled value is returned by getsockopt(). The default
+     //     value is set by the wmem_default sysctl and the maximum allowed value is set by
+     //     the wmem_max sysctl. The minimum (doubled) value for this option is 2048.
+     //
+     // so the explicit doubling is needed on other systems only.
+#ifndef _linux_
+     wanted *= 2;
+#endif
+
+     // If the value is not above what we asked for last time, the previous setsockopt did
+     // not really take effect (system limits, for example). We still retry, but report false.
+     const bool grew = wanted > SendSysSocketSizePrev;
+     if (SetSockOpt(SOL_SOCKET, SO_SNDBUF, &wanted, optLen) != 0) {
+         return false;
+     }
+     SendSysSocketSizePrev = wanted;
+     SendSysSocketSize = wanted;
+     return grew;
+ }
+
+ int TAbstractSocket::GetSendSysSocketSize() { // cached SO_SNDBUF value; no system call is made
+ return SendSysSocketSize;
+ }
+
+ void TAbstractSocket::SetRecvLagTime(NHPTimer::STime time) { // atomically stores time in RecvLag
+ AtomicSet(RecvLag, time);
+ }
+
+ int TAbstractSocket::OpenImpl(int port) { // opens and binds the socket; port is in host byte order, 0 = no specific port; 0 on success, -1 on failure
+ Y_ASSERT(!IsValid());
+ const int netPort = port ? htons((u_short)port) : 0; // CreateSocket() expects network byte order
+
+#ifdef _freebsd_
+ // alternative OS
+ if (netPort == 0) {
+ static ui64 pp = GetCycleCount(); // seed for the port search, advanced on every attempt
+ for (int attempt = 0; attempt < 100; ++attempt) {
+ const int tryPort = htons((pp & 0x3fff) + 0xc000); // candidate port in 0xc000..0xffff
+ ++pp;
+ if (CreateSocket(tryPort) != 0) {
+ Y_ASSERT(!IsValid());
+ continue;
+ }
+
+ if (DetectSelfAddress() != 0 || tryPort != SelfAddress.sin6_port) { // also rejects a socket bound to a different port than requested
+ // FreeBSD suck!
+ CloseImpl();
+ Y_ASSERT(!IsValid());
+ continue;
+ }
+ break;
+ }
+ if (!IsValid()) { // all 100 attempts failed
+ return -1;
+ }
+ } else {
+ if (CreateSocket(netPort) != 0) {
+ Y_ASSERT(!IsValid());
+ return -1;
+ }
+ }
+#else
+ // regular OS
+ if (CreateSocket(netPort) != 0) {
+ Y_ASSERT(!IsValid());
+ return -1;
+ }
+#endif
+
+ if (IsValid() && DetectSelfAddress() != 0) { // learn the actually bound port
+ CloseImpl();
+ Y_ASSERT(!IsValid());
+ return -1;
+ }
+
+ Y_ASSERT(IsValid());
+ return 0;
+ }
+
+ void TAbstractSocket::CloseImpl() { // unregisters S from the poller and closes it; harmless when already closed
+ if (IsValid()) {
+ Poller.Unwait(S);
+ Y_VERIFY(closesocket(S) == 0, "closesocket failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
+ }
+ S = INVALID_SOCKET;
+ }
+
+ void TAbstractSocket::WaitImpl(float timeoutSec) const { // waits in the poller for at most timeoutSec; aborts if the socket is closed
+ Y_VERIFY(IsValid(), "something went wrong");
+ Poller.WaitT(TDuration::Seconds(timeoutSec));
+ }
+
+ void TAbstractSocket::CancelWaitImpl(const sockaddr_in6* address) { // sends an empty datagram to *address, or to SelfAddress when address is null
+ Y_ASSERT(IsValid());
+
+ // darwin ignores packets with msg_iovlen == 0, also windows implementation uses sendto of first iovec.
+ TIoVec v = CreateIoVec(nullptr, 0); // hence one iovec of zero length rather than no iovec at all
+ TMsgHdr hdr = CreateSendMsgHdr((address ? *address : SelfAddress), v, nullptr);
+
+ // send self fake packet
+ TAbstractSocket::SendMsg(&hdr, 0, FF_ALLOW_FRAG);
+ }
+
+ // Receives one datagram into hdr and returns the result of the underlying receive call.
+ // The _win32_ build uses recvfrom() on the first (and only supported) iovec.
+ ssize_t TAbstractSocket::RecvMsgImpl(TMsgHdr* hdr, int flags) {
+     Y_ASSERT(IsValid());
+
+#ifdef _win32_
+     // the restriction is on the NUMBER of buffers (msg_iovlen), not on the byte length of
+     // the first buffer (iov_len)
+     Y_VERIFY(hdr->msg_iovlen == 1, "Scatter/gather is currenly not supported on Windows");
+     return recvfrom(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, &hdr->msg_namelen);
+#else
+     return recvmsg(S, hdr, flags);
+#endif
+ }
+
+ // Receives one datagram into the buffer held by buf.
+ //   srcAddr - receives the sender address (must not be null)
+ //   dstAddr - optional; when given, receives the local address the datagram was sent to,
+ //             if the platform reported it
+ // Returns nullptr when the receive call fails (normally: nothing to read). Otherwise the
+ // packet is extracted from buf and returned with DataStart = 0 and DataSize = bytes read.
+ TUdpRecvPacket* TAbstractSocket::RecvImpl(TUdpHostRecvBufAlloc* buf, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) {
+     Y_ASSERT(IsValid());
+
+     const TIoVec iov = CreateIoVec(buf->GetDataPtr(), buf->GetBufSize());
+     // Used to get dst address from socket. The CMSG_* macros access this memory as
+     // cmsghdr structures, so it must not be an arbitrarily aligned char array.
+     alignas(size_t) char controllBuffer[CTRL_BUFFER_SIZE];
+     TMsgHdr hdr = CreateRecvMsgHdr(srcAddr, iov, controllBuffer);
+
+     const ssize_t rv = TAbstractSocket::RecvMsgImpl(&hdr, 0);
+     if (rv < 0) {
+         Y_ASSERT(LastSystemError() == EAGAIN || LastSystemError() == EWOULDBLOCK);
+         return nullptr;
+     }
+     if (dstAddr && !ExtractDestinationAddress(hdr, dstAddr)) {
+         //fprintf(stderr, "can`t get destination ip\n");
+     }
+
+     // we extract packet and allocate new buffer only if packet arrived
+     TUdpRecvPacket* result = buf->ExtractPacket();
+     result->DataStart = 0;
+     result->DataSize = (int)rv;
+     return result;
+ }
+
+ // thread-safe
+ int TAbstractSocket::RecvMMsgImpl(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags, timespec* timeout) { // forwards to the resolved recvmmsg; aborts when it is unavailable
+ Y_ASSERT(IsValid());
+ Y_VERIFY(RecvMMsgFunc, "recvmmsg is not supported!");
+ return RecvMMsgFunc(S, msgvec, vlen, flags, timeout);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ class TSocket: public TAbstractSocket { // plain socket: every operation maps directly onto the *Impl helpers of the base class
+ public:
+ int Open(int port) override;
+ void Close() override;
+
+ void Wait(float timeoutSec, int netlibaVersion) const override; // netlibaVersion is ignored
+ void CancelWait(int netlibaVersion) override; // netlibaVersion is ignored
+
+ bool IsRecvMsgSupported() const override;
+ ssize_t RecvMsg(TMsgHdr* hdr, int flags) override;
+ TUdpRecvPacket* Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) override; // netlibaVersion is ignored
+
+ private:
+ TUdpHostRecvBufAlloc RecvBuf; // receive buffer handed to RecvImpl() by Recv()
+ };
+
+ int TSocket::Open(int port) { // see TAbstractSocket::OpenImpl
+ return OpenImpl(port);
+ }
+
+ void TSocket::Close() { // see TAbstractSocket::CloseImpl
+ CloseImpl();
+ }
+
+ void TSocket::Wait(float timeoutSec, int netlibaVersion) const { // waits for readability for at most timeoutSec
+ Y_UNUSED(netlibaVersion);
+ WaitImpl(timeoutSec);
+ }
+
+ void TSocket::CancelWait(int netlibaVersion) { // sends the empty wake-up datagram to our own address
+ Y_UNUSED(netlibaVersion);
+ CancelWaitImpl();
+ }
+
+ bool TSocket::IsRecvMsgSupported() const { // RecvMsg() is usable on this socket type
+ return true;
+ }
+
+ ssize_t TSocket::RecvMsg(TMsgHdr* hdr, int flags) { // see TAbstractSocket::RecvMsgImpl
+ return RecvMsgImpl(hdr, flags);
+ }
+
+ TUdpRecvPacket* TSocket::Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) { // receives one packet through RecvBuf; nullptr when nothing was received
+ Y_UNUSED(netlibaVersion);
+ return RecvImpl(&RecvBuf, srcAddr, dstAddr);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ class TTryToRecvMMsgSocket: public TAbstractSocket { // receives in batches through recvmmsg when available, otherwise one packet at a time
+ private:
+ THolderVector<TUdpHostRecvBufAlloc> RecvPackets; // one receive buffer per batch slot (a single one without recvmmsg)
+ TVector<sockaddr_in6> RecvPacketsSrcAddresses; // sender address per slot
+ TVector<TIoVec> RecvPacketsIoVecs; // iovec per slot, pointing into RecvPackets
+ size_t RecvPacketsBegin; // first non returned to user
+ size_t RecvPacketsHeadersEnd; // next after last one with data
+ TVector<TMMsgHdr> RecvPacketsHeaders; // headers passed to recvmmsg, one per slot
+ TVector<std::array<char, CTRL_BUFFER_SIZE>> RecvPacketsCtrlBuffers; // control-message buffer per slot
+
+ int FillRecvBuffers(); // number of packets ready to be returned, or the negative recvmmsg result
+
+ public:
+ static bool IsRecvMMsgSupported();
+
+ // Tests showed best performance on queue size 128 (+7%).
+ // If memory is limited you can use 12 - it gives +4%.
+ // Do not use lower values - for example recvmmsg with 1 element is 3% slower that recvmsg!
+ // (tested with junk/f0b0s/neTBasicSocket_queue_test).
+ TTryToRecvMMsgSocket(const size_t recvQueueSize = 128);
+ ~TTryToRecvMMsgSocket() override;
+
+ int Open(int port) override;
+ void Close() override;
+
+ void Wait(float timeoutSec, int netlibaVersion) const override; // returns at once while fetched packets are still pending
+ void CancelWait(int netlibaVersion) override;
+
+ bool IsRecvMsgSupported() const override {
+ return false;
+ }
+ ssize_t RecvMsg(TMsgHdr* hdr, int flags) override { // always aborts: not supported by this class
+ Y_UNUSED(hdr);
+ Y_UNUSED(flags);
+ Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TRecvMMsgSocket implementation must use memcpy which is suboptimal and thus forbidden!");
+ }
+ TUdpRecvPacket* Recv(sockaddr_in6* addr, sockaddr_in6* dstAddr, int netlibaVersion) override;
+ };
+
+ // Allocates the receive buffers: recvQueueSize batch slots when recvmmsg is available,
+ // a single buffer otherwise.
+ TTryToRecvMMsgSocket::TTryToRecvMMsgSocket(const size_t recvQueueSize)
+     : RecvPacketsBegin(0)
+     , RecvPacketsHeadersEnd(0)
+ {
+     // recvmmsg is not supported - will act like TSocket,
+     // we can't just VERIFY - TTryToRecvMMsgSocket is used as base class for TDualStackSocket.
+     const bool batched = IsRecvMMsgSupported();
+     const size_t bufferCount = batched ? recvQueueSize : 1;
+
+     RecvPackets.reserve(bufferCount);
+     for (size_t n = 0; n < bufferCount; ++n) {
+         RecvPackets.PushBack(new TUdpHostRecvBufAlloc);
+     }
+     if (!batched) {
+         return;
+     }
+
+     RecvPacketsSrcAddresses.resize(recvQueueSize);
+     RecvPacketsIoVecs.resize(recvQueueSize);
+     RecvPacketsHeaders.resize(recvQueueSize);
+     RecvPacketsCtrlBuffers.resize(recvQueueSize);
+
+     // wire every slot: header -> (source address, iovec, control buffer)
+     for (size_t slot = 0; slot < recvQueueSize; ++slot) {
+         Zero(RecvPacketsHeaders[slot]);
+
+         RecvPacketsIoVecs[slot] = CreateIoVec(RecvPackets[slot]->GetDataPtr(), RecvPackets[slot]->GetBufSize());
+         char* const ctrl = RecvPacketsCtrlBuffers[slot].data();
+         memset(ctrl, 0, CTRL_BUFFER_SIZE);
+         RecvPacketsHeaders[slot].msg_hdr = CreateRecvMsgHdr(&RecvPacketsSrcAddresses[slot], RecvPacketsIoVecs[slot], ctrl);
+     }
+ }
+
+ TTryToRecvMMsgSocket::~TTryToRecvMMsgSocket() { // closes the socket if it is still open
+ Close();
+ }
+
+ int TTryToRecvMMsgSocket::Open(int port) { // see TAbstractSocket::OpenImpl
+ return OpenImpl(port);
+ }
+
+ void TTryToRecvMMsgSocket::Close() { // see TAbstractSocket::CloseImpl
+ CloseImpl();
+ }
+
+ void TTryToRecvMMsgSocket::Wait(float timeoutSec, int netlibaVersion) const { // blocks only when no already-fetched packets are pending
+ Y_UNUSED(netlibaVersion);
+ Y_ASSERT(RecvPacketsBegin == RecvPacketsHeadersEnd || IsRecvMMsgSupported()); // pending packets can exist only in recvmmsg mode
+ if (RecvPacketsBegin == RecvPacketsHeadersEnd) {
+ WaitImpl(timeoutSec);
+ }
+ }
+
+ void TTryToRecvMMsgSocket::CancelWait(int netlibaVersion) { // sends the empty wake-up datagram to our own address
+ Y_UNUSED(netlibaVersion);
+ CancelWaitImpl();
+ }
+
+ bool TTryToRecvMMsgSocket::IsRecvMMsgSupported() { // true when the recvmmsg symbol was resolved at startup
+ return RecvMMsgFunc != nullptr;
+ }
+
+ // Makes sure there are received packets to hand out.
+ // If packets from the previous batch are still pending, returns their count without
+ // touching the socket. Otherwise re-arms the slots used by the previous batch, issues a
+ // new recvmmsg call and returns its result (packet count, or a negative value on failure).
+ int TTryToRecvMMsgSocket::FillRecvBuffers() {
+     Y_ASSERT(IsRecvMMsgSupported());
+     Y_ASSERT(RecvPacketsBegin <= RecvPacketsHeadersEnd);
+     const bool hasPending = RecvPacketsBegin < RecvPacketsHeadersEnd;
+     if (hasPending) {
+         return RecvPacketsHeadersEnd - RecvPacketsBegin;
+     }
+
+     // Nothing left from the last recvmmsg call. Only the slots that call filled need their
+     // iovec rebuilt.
+     for (size_t slot = 0; slot < RecvPacketsHeadersEnd; ++slot) {
+         RecvPacketsIoVecs[slot] = CreateIoVec(RecvPackets[slot]->GetDataPtr(), RecvPackets[slot]->GetBufSize());
+     }
+     RecvPacketsBegin = 0;
+     RecvPacketsHeadersEnd = 0;
+
+     const int received = RecvMMsgImpl(&RecvPacketsHeaders[0], (unsigned int)RecvPacketsHeaders.size(), 0, nullptr);
+     if (received < 0) {
+         Y_ASSERT(LastSystemError() == EAGAIN || LastSystemError() == EWOULDBLOCK);
+         return received;
+     }
+     RecvPacketsHeadersEnd = received;
+     return received;
+ }
+
+ // Returns the next received packet, or nullptr when none is available.
+ // `*fromAddress` receives the source address; `dstAddr`, when non-null, is
+ // filled on a best-effort basis. Not thread-safe.
+ TUdpRecvPacket* TTryToRecvMMsgSocket::Recv(sockaddr_in6* fromAddress, sockaddr_in6* dstAddr, int) {
+     // Without batch receive, behave like TSocket.
+     if (!IsRecvMMsgSupported()) {
+         return RecvImpl(RecvPackets[0], fromAddress, dstAddr);
+     }
+
+     const int available = FillRecvBuffers();
+     if (available <= 0) {
+         return nullptr;
+     }
+
+     TMMsgHdr& hdr = RecvPacketsHeaders[RecvPacketsBegin];
+     TUdpRecvPacket* packet = RecvPackets[RecvPacketsBegin]->ExtractPacket();
+     packet->DataSize = (ssize_t)hdr.msg_len;
+
+     if (dstAddr) {
+         // A failure to obtain the destination address is deliberately ignored.
+         const bool gotDst = ExtractDestinationAddress(hdr.msg_hdr, dstAddr);
+         Y_UNUSED(gotDst);
+     }
+     *fromAddress = RecvPacketsSrcAddresses[RecvPacketsBegin];
+
+     // The control buffer must be cleared so that this header can be reused.
+#ifndef _win_
+     memset(hdr.msg_hdr.msg_control, 0, CTRL_BUFFER_SIZE);
+     hdr.msg_hdr.msg_controllen = CTRL_BUFFER_SIZE;
+#endif
+     ++RecvPacketsBegin;
+
+     return packet;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ /* TODO: too slow, needs to be optimized
+template<size_t TTNumRecvThreads>
+class TMTRecvSocket: public TAbstractSocket
+{
+private:
+ typedef TLockFreePacketQueue<TTNumRecvThreads> TPacketQueue;
+
+ static void* RecvThreadFunc(void* that)
+ {
+ static_cast<TMTRecvSocket*>(that)->RecvLoop();
+ return NULL;
+ }
+
+ void RecvLoop()
+ {
+ TBestUnixRecvSocket impl;
+ impl.Reset(*this);
+
+ while (AtomicAdd(NumThreadsToDie, 0) == -1) {
+ sockaddr_in6 addr;
+ TUdpRecvPacket* packet = impl.Recv(&addr, NETLIBA_ANY_VERSION);
+ if (!packet) {
+ impl.Wait(0.0001, NETLIBA_ANY_VERSION); // the timeout is this small because we can't guarantee that one thread won't get all the packets
+ continue;
+ }
+ Queue.Push(packet, addr);
+ }
+
+ if (AtomicDecrement(NumThreadsToDie)) {
+ impl.CancelWait(NETLIBA_ANY_VERSION);
+ } else {
+ AllThreadsAreDead.Signal();
+ }
+ }
+
+ THolderVector<TThread> RecvThreads;
+ TAtomic NumThreadsToDie;
+ TSystemEvent AllThreadsAreDead;
+
+ TPacketQueue Queue;
+
+public:
+ TMTRecvSocket()
+ : NumThreadsToDie(-1) {}
+
+ ~TMTRecvSocket()
+ {
+ Close();
+ }
+
+ int Open(int port)
+ {
+ if (OpenImpl(port) != 0) {
+ Y_ASSERT(!IsValid());
+ return -1;
+ }
+
+ NumThreadsToDie = -1;
+ RecvThreads.reserve(TTNumRecvThreads);
+ for (size_t i = 0; i != TTNumRecvThreads; ++i) {
+ RecvThreads.PushBack(new TThread(TThread::TParams(RecvThreadFunc, this).SetName("nl12_recv_skt")));
+ RecvThreads.back()->Start();
+ RecvThreads.back()->Detach();
+ }
+ return 0;
+ }
+
+ void Close()
+ {
+ if (!IsValid()) {
+ return;
+ }
+
+ AtomicSwap(&NumThreadsToDie, (int)RecvThreads.size());
+ CancelWaitImpl();
+ Y_VERIFY(AllThreadsAreDead.WaitT(TDuration::Seconds(30)), "TMTRecvSocket destruction failed");
+
+ CloseImpl();
+ }
+
+ void Wait(float timeoutSec, int netlibaVersion) const
+ {
+ Y_UNUSED(netlibaVersion);
+ Queue.GetEvent().WaitT(TDuration::Seconds(timeoutSec));
+ }
+ void CancelWait(int netlibaVersion)
+ {
+ Y_UNUSED(netlibaVersion);
+ Queue.GetEvent().Signal();
+ }
+
+ TUdpRecvPacket* Recv(sockaddr_in6 *addr, int netlibaVersion)
+ {
+ Y_UNUSED(netlibaVersion);
+ TUdpRecvPacket* result;
+ if (!Queue.Pop(&result, addr)) {
+ return NULL;
+ }
+ return result;
+ }
+
+ bool IsRecvMsgSupported() const { return false; }
+ ssize_t RecvMsg(TMsgHdr* hdr, int flags) { Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TMTRecvSocket implementation must use memcpy which is suboptimal and thus forbidden!"); }
+};
+*/
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ // Socket that receives on a dedicated thread and sorts incoming packets into
+ // two queues, one for netliba v6 and one for netliba v12.
+ // Send.*, Recv, Wait and CancelWait are thread-safe.
+ class TDualStackSocket: public TTryToRecvMMsgSocket {
+ private:
+     using TBase = TTryToRecvMMsgSocket;
+     using TPacketQueue = TLockFreePacketQueue<1>;
+
+     static void* RecvThreadFunc(void* that);
+     void RecvLoop();
+
+     // Packet queue that, once its data part is full, rejects packets whose
+     // command byte (at offset CmdPos) equals F1 or F2.
+     struct TFilteredPacketQueue {
+         enum EPushResult {
+             PR_FULL = 0,
+             PR_OK = 1,
+             PR_FILTERED = 2
+         };
+
+         const ui8 F1;
+         const ui8 F2;
+         const ui8 CmdPos;
+
+         TFilteredPacketQueue(ui8 f1, ui8 f2, ui8 cmdPos)
+             : F1(f1)
+             , F2(f2)
+             , CmdPos(cmdPos)
+         {
+         }
+
+         bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) {
+             return Queue.Pop(packet, srcAddr, dstAddr);
+         }
+
+         // Returns an EPushResult value.
+         ui8 Push(TUdpRecvPacket* packet, const TPacketMeta& meta) {
+             if (Queue.IsDataPartFull()) {
+                 const ui8 cmd = packet->Data.get()[CmdPos];
+                 const bool filtered = (cmd == F1) || (cmd == F2);
+                 if (filtered) {
+                     return PR_FILTERED;
+                 }
+             }
+             // The underlying bool maps onto the enum: false - PR_FULL, true - PR_OK.
+             return Queue.Push(packet, meta);
+         }
+
+         TPacketQueue Queue;
+     };
+
+     TFilteredPacketQueue& GetRecvQueue(int netlibaVersion) const;
+     TSystemEvent& GetQueueEvent(const TFilteredPacketQueue& queue) const;
+
+     TThread RecvThread;
+     TAtomic ShouldDie;
+     TSystemEvent DieEvent;
+
+     // Mutable so that the const Wait() can reach the queues and their events.
+     mutable TFilteredPacketQueue RecvQueue6;
+     mutable TFilteredPacketQueue RecvQueue12;
+
+ public:
+     TDualStackSocket();
+     ~TDualStackSocket() override;
+
+     int Open(int port) override;
+     void Close() override;
+
+     void Wait(float timeoutSec, int netlibaVersion) const override;
+     void CancelWait(int netlibaVersion) override;
+
+     TUdpRecvPacket* Recv(sockaddr_in6* addr, sockaddr_in6* dstAddr, int netlibaVersion) override;
+
+     // RecvMsg is intentionally unsupported on this socket type.
+     bool IsRecvMsgSupported() const override {
+         return false;
+     }
+     ssize_t RecvMsg(TMsgHdr* hdr, int flags) override {
+         Y_UNUSED(hdr);
+         Y_UNUSED(flags);
+         Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TDualStackSocket implementation must use memcpy which is suboptimal and thus forbidden!");
+     }
+ };
+
+ // Prepares (but does not start) the receive thread and configures each queue
+ // with the DATA / DATA_SMALL command codes and command position of its protocol.
+ TDualStackSocket::TDualStackSocket()
+     : RecvThread(TThread::TParams(RecvThreadFunc, this).SetName("nl12_dual_stack"))
+     , ShouldDie(0)
+     , RecvQueue6(NNetliba::DATA,
+                  NNetliba::DATA_SMALL,
+                  NNetliba::CMD_POS)
+     , RecvQueue12(NNetliba_v12::DATA,
+                   NNetliba_v12::DATA_SMALL,
+                   NNetliba_v12::CMD_POS)
+ {
+ }
+
+ // Virtual dispatch does not reach derived classes from a destructor, so the
+ // Close() call below is this class's own. After shutdown, packets still
+ // sitting in either queue are popped and freed.
+ TDualStackSocket::~TDualStackSocket() {
+     TDualStackSocket::Close();
+
+     sockaddr_in6 ignoredSrc;
+     sockaddr_in6 ignoredDst;
+     TUdpRecvPacket* leftover = nullptr;
+
+     // NETLIBA_ANY_VERSION selects the v6 queue, NETLIBA_V12_VERSION the v12 one.
+     const int versions[] = {NETLIBA_ANY_VERSION, NETLIBA_V12_VERSION};
+     for (const int version : versions) {
+         TFilteredPacketQueue& queue = GetRecvQueue(version);
+         while (queue.Pop(&leftover, &ignoredSrc, &ignoredDst)) {
+             delete leftover;
+         }
+     }
+ }
+
+ // Opens the underlying socket and, on success, starts the detached receive
+ // thread. Returns 0 on success and -1 if the base Open() failed.
+ int TDualStackSocket::Open(int port) {
+     const bool opened = (TBase::Open(port) == 0);
+     if (!opened) {
+         Y_ASSERT(!IsValid());
+         return -1;
+     }
+
+     // Re-arm the shutdown handshake before the thread starts.
+     AtomicSet(ShouldDie, 0);
+     DieEvent.Reset();
+
+     RecvThread.Start();
+     RecvThread.Detach();
+     return 0;
+ }
+
+ // Stops the receive thread and closes the socket. Does nothing if the socket
+ // is not valid. Aborts if the thread fails to acknowledge within 30 seconds.
+ void TDualStackSocket::Close() {
+     if (IsValid()) {
+         // Ask the receive loop to stop, wake it, and wait for its acknowledgement.
+         AtomicSwap(&ShouldDie, 1);
+         CancelWaitImpl();
+         Y_VERIFY(DieEvent.WaitT(TDuration::Seconds(30)), "TDualStackSocket::Close failed");
+
+         TBase::Close();
+     }
+ }
+
+ // Maps a version id to its queue: NETLIBA_V12_VERSION selects the v12 queue,
+ // every other value selects the v6 queue.
+ TDualStackSocket::TFilteredPacketQueue& TDualStackSocket::GetRecvQueue(int netlibaVersion) const {
+     if (netlibaVersion == NETLIBA_V12_VERSION) {
+         return RecvQueue12;
+     }
+     return RecvQueue6;
+ }
+
+ // Returns the event object belonging to the given queue.
+ TSystemEvent& TDualStackSocket::GetQueueEvent(const TFilteredPacketQueue& queue) const {
+     const TPacketQueue& inner = queue.Queue;
+     return inner.GetEvent();
+ }
+
+ // Thread entry point: raises the thread priority, then runs RecvLoop() on the
+ // socket passed in `that`. Always returns nullptr.
+ void* TDualStackSocket::RecvThreadFunc(void* that) {
+     SetHighestThreadPriority();
+     TDualStackSocket* const self = static_cast<TDualStackSocket*>(that);
+     self->RecvLoop();
+     return nullptr;
+ }
+
+ // Body of the background receive thread.
+ // Drains the underlying socket through TBase::Recv(), routes each packet to the
+ // v6 or v12 queue according to the byte at offset 8 of its payload, and signals
+ // that queue's event after a successful push. Between drains it waits on the
+ // socket for up to 0.1 seconds.
+ // Every packet returned by TBase::Recv() is owned by this loop until a queue
+ // accepts it, so each rejection path has to delete it.
+ // The loop exits, after signalling DieEvent, once ShouldDie becomes non-zero.
+ void TDualStackSocket::RecvLoop() {
+     for (;;) {
+         TUdpRecvPacket* p = nullptr;
+         sockaddr_in6 srcAddr;
+         sockaddr_in6 dstAddr;
+         while (AtomicAdd(ShouldDie, 0) == 0 && (p = TBase::Recv(&srcAddr, &dstAddr, NETLIBA_ANY_VERSION))) {
+             Y_ASSERT(p->DataStart == 0);
+             if (p->DataSize < 12) {
+                 // Too short to be routed: drop it. It must be freed here, otherwise
+                 // every undersized datagram leaks a packet and its buffer.
+                 delete p;
+                 continue;
+             }
+
+             TFilteredPacketQueue& q = GetRecvQueue(p->Data.get()[8]);
+             const ui8 res = q.Push(p, {srcAddr, dstAddr});
+             if (res == TFilteredPacketQueue::PR_OK) {
+                 GetQueueEvent(q).Signal();
+             } else {
+                 // simulate OS behavior on buffer overflow - drop packets.
+                 const NHPTimer::STime time = AtomicGet(RecvLag);
+                 const float sec = NHPTimer::GetSeconds(time);
+                 // res is cast so that the argument type matches the %u conversion.
+                 fprintf(stderr, "TDualStackSocket::RecvLoop netliba v%d queue overflow, recv lag: %f sec, dropping packet, res: %u\n",
+                         &q == &RecvQueue12 ? 12 : 6, sec, static_cast<unsigned>(res));
+                 delete p;
+             }
+         }
+
+         if (AtomicAdd(ShouldDie, 0)) {
+             DieEvent.Signal();
+             return;
+         }
+
+         TBase::Wait(0.1f, NETLIBA_ANY_VERSION);
+     }
+ }
+
+ // Waits up to `timeoutSec` for a packet to appear in the queue selected by
+ // `netlibaVersion`. Returns at once if that queue is not empty.
+ void TDualStackSocket::Wait(float timeoutSec, int netlibaVersion) const {
+     TFilteredPacketQueue& q = GetRecvQueue(netlibaVersion);
+     if (!q.Queue.IsEmpty()) {
+         return;
+     }
+
+     TSystemEvent& event = GetQueueEvent(q);
+     event.Reset();
+     // Check again after the reset: a packet pushed (and signalled) between the
+     // first check and the reset would otherwise be missed.
+     if (!q.Queue.IsEmpty()) {
+         return;
+     }
+     event.WaitT(TDuration::Seconds(timeoutSec));
+ }
+
+ // Wakes a Wait() on the queue selected by `netlibaVersion` by signalling its event.
+ void TDualStackSocket::CancelWait(int netlibaVersion) {
+     TFilteredPacketQueue& q = GetRecvQueue(netlibaVersion);
+     GetQueueEvent(q).Signal();
+ }
+
+ // Pops the next packet from the queue selected by `netlibaVersion`, filling
+ // `srcAddr` and `dstAddr` along with it. Returns nullptr when the queue has
+ // nothing to pop. Thread-safe.
+ TUdpRecvPacket* TDualStackSocket::Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) {
+     TUdpRecvPacket* packet = nullptr;
+     const bool popped = GetRecvQueue(netlibaVersion).Pop(&packet, srcAddr, dstAddr);
+     return popped ? packet : nullptr;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ // Factory for a plain TSocket.
+ TIntrusivePtr<ISocket> CreateSocket() {
+     TIntrusivePtr<ISocket> socket = new TSocket();
+     return socket;
+ }
+
+ // Factory for a TDualStackSocket.
+ TIntrusivePtr<ISocket> CreateDualStackSocket() {
+     TIntrusivePtr<ISocket> socket = new TDualStackSocket();
+     return socket;
+ }
+
+ // Picks the receive implementation: the recvmmsg-capable socket when recvmmsg
+ // is available, otherwise a plain TSocket (which is faster than the
+ // recvmmsg-capable wrapper when recvmmsg is unsupported).
+ TIntrusivePtr<ISocket> CreateBestRecvSocket() {
+     if (TTryToRecvMMsgSocket::IsRecvMMsgSupported()) {
+         return new TTryToRecvMMsgSocket();
+     }
+     return new TSocket();
+ }
+
+}
diff --git a/library/cpp/netliba/socket/socket.h b/library/cpp/netliba/socket/socket.h
new file mode 100644
index 0000000000..c1da3c145f
--- /dev/null
+++ b/library/cpp/netliba/socket/socket.h
@@ -0,0 +1,126 @@
+#pragma once
+
+#include <util/system/platform.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/network/init.h>
+#include <util/system/defaults.h>
+#include <util/system/hp_timer.h>
+#include "udp_recv_packet.h"
+#include "protocols.h"
+
+#include <sys/uio.h>
+
+namespace NNetlibaSocket {
+ typedef iovec TIoVec;
+
+#ifdef _win32_
+ // Windows stand-in for msghdr, with one extra field for the type of service.
+ struct TMsgHdr {
+     // optional address
+     void* msg_name;
+     // size of address
+     int msg_namelen;
+     // scatter/gather array
+     TIoVec* msg_iov;
+     // number of elements in msg_iov
+     int msg_iovlen;
+
+     // netlib_socket extension
+     int Tos;
+ };
+#else
+#include <sys/socket.h>
+ typedef msghdr TMsgHdr;
+#endif
+
+ // Mirrors the mmsghdr definition from glibc 2.14. It is declared here so that
+ // Windows and Darwin builds have the same type available.
+ struct TMMsgHdr {
+     // the message header itself
+     TMsgHdr msg_hdr;
+     // mirrors mmsghdr::msg_len
+     unsigned int msg_len;
+ };
+
+#if defined(_linux_)
+#include <linux/version.h>
+#include <features.h>
+// sendmmsg was added in glibc 2.14 and linux 3.0.
+// The glibc version is compared as a (major, minor) pair: testing
+// `__GLIBC__ >= 2 && __GLIBC_MINOR__ >= 14` would wrongly reject any later
+// major version whose minor number is below 14.
+#if (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 14)) && LINUX_VERSION_CODE >= KERNEL_VERSION(3, 0, 0)
+#include <sys/socket.h> // sendmmsg
+ // TMMsgHdr is passed where the system expects mmsghdr, so the sizes must agree.
+ static_assert(sizeof(TMMsgHdr) == sizeof(mmsghdr), "expect sizeof(TMMsgHdr) == sizeof(mmsghdr)");
+#endif
+#endif
+
+// Sizes of the ancillary-data buffers used with TMsgHdr.
+// TOS_BUFFER_SIZE covers one control message carrying an int;
+// CTRL_BUFFER_SIZE covers that plus one carrying an in6_pktinfo.
+// On Windows fixed sizes are used instead.
+#ifdef _win32_
+ const size_t TOS_BUFFER_SIZE = sizeof(int);
+ const size_t CTRL_BUFFER_SIZE = 32;
+#else
+#if defined(_darwin_)
+// Darwin gets its own CMSG_SPACE equivalent built on the 32-bit alignment macros.
+// NOTE(review): presumably because the system CMSG_SPACE is not usable in a
+// constant expression there - confirm.
+#define Y_DARWIN_ALIGN32(p) ((__darwin_size_t)((__darwin_size_t)(p) + __DARWIN_ALIGNBYTES32) & ~__DARWIN_ALIGNBYTES32)
+#define Y_CMSG_SPACE(l) (Y_DARWIN_ALIGN32(sizeof(struct cmsghdr)) + Y_DARWIN_ALIGN32(l))
+#else
+#define Y_CMSG_SPACE(l) CMSG_SPACE(l)
+#endif
+
+ constexpr size_t TOS_BUFFER_SIZE = Y_CMSG_SPACE(sizeof(int));
+ constexpr size_t CTRL_BUFFER_SIZE = Y_CMSG_SPACE(sizeof(int)) + Y_CMSG_SPACE(sizeof(struct in6_pktinfo));
+#endif
+
+ ///////////////////////////////////////////////////////////////////////////////
+ // Warning: every variable (tosBuffer, data, addr, iov) passed and returned from these functions must exist until actual send!!!
+ void* CreateTos(const ui8 tos, void* tosBuffer);
+ TIoVec CreateIoVec(char* data, const size_t dataSize);
+ TMsgHdr CreateSendMsgHdr(const sockaddr_in6& addr, const TIoVec& iov, void* tosBuffer);
+ TMsgHdr CreateRecvMsgHdr(sockaddr_in6* addrBuf, const TIoVec& iov, void* ctrlBuffer = nullptr);
+ TMsgHdr* AddSockAuxData(TMsgHdr* header, const ui8 tos, const sockaddr_in6& addr, void* buffer, size_t bufferSize);
+ ///////////////////////////////////////////////////////////////////////////////
+ // returns false if TOS wasn't read; in that case *tos is left untouched
+ bool ReadTos(const TMsgHdr& msgHdr, ui8* tos);
+ bool ExtractDestinationAddress(TMsgHdr& msgHdr, sockaddr_in6* addrBuf);
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ // Protocol version selectors. Netliba v6 currently has no id of its own: any
+ // value that is not equal to NETLIBA_V12_VERSION is treated as v6.
+ enum {
+     NETLIBA_ANY_VERSION = -1,
+     NETLIBA_V12_VERSION = 112
+ };
+
+ // Fragmentation mode passed to ISocket::SendMsg().
+ // NOTE(review): presumably allows / forbids IP fragmentation of the datagram -
+ // confirm against the SendMsg implementation.
+ enum EFragFlag {
+     FF_ALLOW_FRAG = 0,
+     FF_DONT_FRAG = 1
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ // Abstract, non-copyable, reference-counted socket interface. Instances are
+ // obtained from CreateSocket(), CreateDualStackSocket() or CreateBestRecvSocket().
+ class ISocket: public TNonCopyable, public TThrRefBase {
+ public:
+ ~ISocket() override {
+ }
+
+ // Open() returns 0 on success in the implementations visible in this library.
+ virtual int Open(int port) = 0;
+ virtual void Close() = 0;
+ virtual bool IsValid() const = 0;
+
+ virtual const sockaddr_in6& GetSelfAddress() const = 0;
+ virtual int GetNetworkOrderPort() const = 0;
+ virtual int GetPort() const = 0;
+
+ virtual int GetSockOpt(int level, int option_name, void* option_value, socklen_t* option_len) = 0;
+
+ // send all packets to this and only this address by default
+ virtual int Connect(const struct sockaddr* address, socklen_t address_len) = 0;
+
+ // Wait() blocks for up to timeoutSec until there may be something to receive;
+ // CancelWait() interrupts it. netlibaVersion selects the protocol queue on
+ // implementations that keep one per version; others ignore it.
+ virtual void Wait(float timeoutSec, int netlibaVersion = NETLIBA_ANY_VERSION) const = 0;
+ virtual void CancelWait(int netlibaVersion = NETLIBA_ANY_VERSION) = 0;
+ virtual void CancelWaitHost(const sockaddr_in6 address) = 0;
+
+ virtual bool IsSendMMsgSupported() const = 0;
+ virtual int SendMMsg(struct TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) = 0;
+ virtual ssize_t SendMsg(const TMsgHdr* hdr, int flags, const EFragFlag frag) = 0;
+
+ // RecvMsg() must only be called when IsRecvMsgSupported() returns true.
+ virtual bool IsRecvMsgSupported() const = 0;
+ virtual ssize_t RecvMsg(TMsgHdr* hdr, int flags) = 0;
+ // Returns the next packet, or nullptr when none is available.
+ // NOTE(review): the returned packet appears to be owned by the caller
+ // (TDualStackSocket deletes the packets it receives) - confirm.
+ virtual TUdpRecvPacket* Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion = NETLIBA_ANY_VERSION) = 0;
+ virtual bool IncreaseSendBuff() = 0;
+ virtual int GetSendSysSocketSize() = 0;
+ virtual void SetRecvLagTime(NHPTimer::STime time) = 0;
+ };
+
+ TIntrusivePtr<ISocket> CreateSocket(); // not thread safe!
+ TIntrusivePtr<ISocket> CreateDualStackSocket(); // has thread safe send/recv methods
+
+ // this function was added mostly for testing
+ TIntrusivePtr<ISocket> CreateBestRecvSocket();
+}
diff --git a/library/cpp/netliba/socket/stdafx.cpp b/library/cpp/netliba/socket/stdafx.cpp
new file mode 100644
index 0000000000..fd4f341c7b
--- /dev/null
+++ b/library/cpp/netliba/socket/stdafx.cpp
@@ -0,0 +1 @@
+#include "stdafx.h"
diff --git a/library/cpp/netliba/socket/stdafx.h b/library/cpp/netliba/socket/stdafx.h
new file mode 100644
index 0000000000..7d99e5dc10
--- /dev/null
+++ b/library/cpp/netliba/socket/stdafx.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include <util/system/platform.h>
+#if defined(_darwin_)
+#define __APPLE_USE_RFC_2292
+#endif
+
+#include <util/system/compat.h>
+#include <util/network/init.h>
+#if defined(_unix_)
+#include <netdb.h>
+#include <fcntl.h>
+#elif defined(_win_)
+#include <winsock2.h>
+using socklen_t = int;
+#endif
diff --git a/library/cpp/netliba/socket/udp_recv_packet.h b/library/cpp/netliba/socket/udp_recv_packet.h
new file mode 100644
index 0000000000..a2777fbcbf
--- /dev/null
+++ b/library/cpp/netliba/socket/udp_recv_packet.h
@@ -0,0 +1,79 @@
+#pragma once
+
+#include <util/generic/noncopyable.h>
+#include <util/system/defaults.h>
+
+#include <memory>
+#include "allocator.h"
+
+namespace NNetlibaSocket {
+ // Size, in bytes, of the data buffer allocated by
+ // TUdpHostRecvBufAlloc::CreateNewPacket().
+ enum {
+     UDP_MAX_PACKET_SIZE = 8900
+ };
+
+ class TUdpHostRecvBufAlloc;
+ // A received datagram: a shared data buffer plus the offset and length of the
+ // payload inside it. The buffer's allocated size is private and maintained by
+ // TUdpHostRecvBufAlloc.
+ struct TUdpRecvPacket: public TWithCustomAllocator {
+     friend class TUdpHostRecvBufAlloc;
+
+     int DataStart = 0;
+     int DataSize = 0;
+     std::shared_ptr<char> Data;
+
+ private:
+     // allocated size of Data, in bytes
+     int ArraySize_ = 0;
+ };
+
+ ///////////////////////////////////////////////////////////////////////////////
+
+ // Holds one ready-to-fill receive packet at all times. ExtractPacket() hands
+ // the current packet to the caller and immediately allocates a replacement.
+ // The static helpers create and clone packets independently of any instance.
+ class TUdpHostRecvBufAlloc: public TNonCopyable {
+ private:
+     // Mutable because SetNewPacket() is const.
+     mutable TUdpRecvPacket* RecvPktBuf;
+
+     static TUdpRecvPacket* Alloc() {
+         return new TUdpRecvPacket();
+     }
+
+ public:
+     TUdpHostRecvBufAlloc() {
+         SetNewPacket();
+     }
+     ~TUdpHostRecvBufAlloc() {
+         delete RecvPktBuf;
+     }
+
+     // Allocates a packet whose buffer holds `dataSize` bytes. The buffer comes
+     // from TCustomAllocator and is returned to it by the shared_ptr deleter.
+     static TUdpRecvPacket* Create(const int dataSize) {
+         TUdpRecvPacket* packet = Alloc();
+         char* const storage = TCustomAllocator<char>().allocate(dataSize);
+         auto release = [dataSize](char* p) { TCustomAllocator<char>().deallocate(p, dataSize); };
+         packet->Data.reset(storage, release, TCustomAllocator<char>());
+         packet->ArraySize_ = dataSize;
+         return packet;
+     }
+     static TUdpRecvPacket* CreateNewSmallPacket(int dataSize) {
+         return Create(dataSize);
+     }
+     static TUdpRecvPacket* CreateNewPacket() {
+         return Create(UDP_MAX_PACKET_SIZE);
+     }
+     // Returns a new packet that shares the data buffer of `pkt`.
+     static TUdpRecvPacket* Clone(const TUdpRecvPacket* pkt) {
+         TUdpRecvPacket* copy = Alloc();
+         copy->Data = pkt->Data;
+         copy->DataStart = pkt->DataStart;
+         copy->DataSize = pkt->DataSize;
+         copy->ArraySize_ = pkt->ArraySize_;
+         return copy;
+     }
+
+     // Replaces the held packet without freeing the previous one.
+     void SetNewPacket() const {
+         RecvPktBuf = CreateNewPacket();
+     }
+
+     // Transfers the held packet to the caller and installs a fresh one.
+     TUdpRecvPacket* ExtractPacket() {
+         TUdpRecvPacket* const handedOut = RecvPktBuf;
+         SetNewPacket();
+         return handedOut;
+     }
+
+     int GetBufSize() const {
+         return RecvPktBuf->ArraySize_;
+     }
+     char* GetDataPtr() const {
+         return RecvPktBuf->Data.get();
+     }
+ };
+}