diff options
| author | monster <[email protected]> | 2022-07-07 14:41:37 +0300 | 
|---|---|---|
| committer | monster <[email protected]> | 2022-07-07 14:41:37 +0300 | 
| commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
| tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/socket | |
| parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
fix ya.make
Diffstat (limited to 'library/cpp/netliba/socket')
| -rw-r--r-- | library/cpp/netliba/socket/allocator.h | 14 | ||||
| -rw-r--r-- | library/cpp/netliba/socket/creators.cpp | 141 | ||||
| -rw-r--r-- | library/cpp/netliba/socket/packet_queue.h | 97 | ||||
| -rw-r--r-- | library/cpp/netliba/socket/protocols.h | 48 | ||||
| -rw-r--r-- | library/cpp/netliba/socket/socket.cpp | 1086 | ||||
| -rw-r--r-- | library/cpp/netliba/socket/socket.h | 126 | ||||
| -rw-r--r-- | library/cpp/netliba/socket/stdafx.cpp | 1 | ||||
| -rw-r--r-- | library/cpp/netliba/socket/stdafx.h | 16 | ||||
| -rw-r--r-- | library/cpp/netliba/socket/udp_recv_packet.h | 79 | 
9 files changed, 1608 insertions, 0 deletions
diff --git a/library/cpp/netliba/socket/allocator.h b/library/cpp/netliba/socket/allocator.h new file mode 100644 index 00000000000..f09b0dabcfa --- /dev/null +++ b/library/cpp/netliba/socket/allocator.h @@ -0,0 +1,14 @@ +#pragma once + +#ifdef NETLIBA_WITH_NALF +#include <library/cpp/malloc/nalf/alloc_helpers.h> +using TWithCustomAllocator = TWithNalfIncrementalAlloc; +template <typename T> +using TCustomAllocator = TNalfIncrementalAllocator<T>; +#else +#include <memory> +typedef struct { +} TWithCustomAllocator; +template <typename T> +using TCustomAllocator = std::allocator<T>; +#endif diff --git a/library/cpp/netliba/socket/creators.cpp b/library/cpp/netliba/socket/creators.cpp new file mode 100644 index 00000000000..3821bf55b96 --- /dev/null +++ b/library/cpp/netliba/socket/creators.cpp @@ -0,0 +1,141 @@ +#include "stdafx.h" +#include <string.h> +#include <util/generic/utility.h> +#include <util/network/init.h> +#include <util/system/defaults.h> +#include <util/system/yassert.h> +#include "socket.h" + +namespace NNetlibaSocket { +    void* CreateTos(const ui8 tos, void* buffer) { +#ifdef _win_ +        *(int*)buffer = (int)tos; +#else +        // glibc bug: http://sourceware.org/bugzilla/show_bug.cgi?id=13500 +        memset(buffer, 0, TOS_BUFFER_SIZE); + +        msghdr dummy; +        Zero(dummy); +        dummy.msg_control = buffer; +        dummy.msg_controllen = TOS_BUFFER_SIZE; + +        // TODO: in FreeBSD setting TOS for dual stack sockets does not affect ipv4 frames +        cmsghdr* cmsg = CMSG_FIRSTHDR(&dummy); +        cmsg->cmsg_level = IPPROTO_IPV6; +        cmsg->cmsg_type = IPV6_TCLASS; +        cmsg->cmsg_len = CMSG_LEN(sizeof(int)); +        memcpy(CMSG_DATA(cmsg), &tos, sizeof(tos)); // memcpy shut ups alias restrict warning + +        Y_ASSERT(CMSG_NXTHDR(&dummy, cmsg) == nullptr); +#endif +        return buffer; +    } + +    TMsgHdr* AddSockAuxData(TMsgHdr* header, const ui8 tos, const sockaddr_in6& myAddr, void* buffer, size_t bufferSize) { +#ifdef _win_ +        Y_UNUSED(header); +        Y_UNUSED(tos); +        Y_UNUSED(myAddr); +        Y_UNUSED(buffer); +        Y_UNUSED(bufferSize); +        return nullptr; +#else +        header->msg_control = buffer; +        header->msg_controllen = bufferSize; + +        size_t totalLen = 0; +#ifdef _cygwin_ +        Y_UNUSED(tos); +#else +        // Cygwin does not support IPV6_TCLASS, so we ignore it +        cmsghdr* cmsgTos = CMSG_FIRSTHDR(header); +        if (cmsgTos == nullptr) { +            header->msg_control = nullptr; +            header->msg_controllen = 0; +            return nullptr; +        } +        cmsgTos->cmsg_level = IPPROTO_IPV6; +        cmsgTos->cmsg_type = IPV6_TCLASS; +        cmsgTos->cmsg_len = CMSG_LEN(sizeof(int)); +        totalLen = CMSG_SPACE(sizeof(int)); +        *(ui8*)CMSG_DATA(cmsgTos) = tos; +#endif + +        if (*(ui64*)myAddr.sin6_addr.s6_addr != 0u) { +            in6_pktinfo* pktInfo; +#ifdef _cygwin_ +            cmsghdr* cmsgAddr = CMSG_FIRSTHDR(header); +#else +            cmsghdr* cmsgAddr = CMSG_NXTHDR(header, cmsgTos); +#endif +            if (cmsgAddr == nullptr) { +                // leave only previous record +                header->msg_controllen = totalLen; +                return nullptr; +            } +            cmsgAddr->cmsg_level = IPPROTO_IPV6; +            cmsgAddr->cmsg_type = IPV6_PKTINFO; +            cmsgAddr->cmsg_len = CMSG_LEN(sizeof(*pktInfo)); +            totalLen += CMSG_SPACE(sizeof(*pktInfo)); +            pktInfo = (in6_pktinfo*)CMSG_DATA(cmsgAddr); + +            pktInfo->ipi6_addr = myAddr.sin6_addr; +            pktInfo->ipi6_ifindex = 0; /* 0 = use interface specified in routing table */ +        } +        header->msg_controllen = totalLen; //write right len + +        return header; +#endif +    } + +    TIoVec CreateIoVec(char* data, const size_t dataSize) { +        TIoVec result; +        Zero(result); + +        result.iov_base = data; +        result.iov_len = dataSize; + +        return result; +    } + +    TMsgHdr CreateSendMsgHdr(const sockaddr_in6& addr, const TIoVec& iov, void* tosBuffer) { +        TMsgHdr result; +        Zero(result); + +        result.msg_name = (void*)&addr; +        result.msg_namelen = sizeof(addr); +        result.msg_iov = (TIoVec*)&iov; +        result.msg_iovlen = 1; + +        if (tosBuffer) { +#ifdef _win_ +            result.Tos = *(int*)tosBuffer; +#else +            result.msg_control = tosBuffer; +            result.msg_controllen = TOS_BUFFER_SIZE; +#endif +        } + +        return result; +    } + +    TMsgHdr CreateRecvMsgHdr(sockaddr_in6* addrBuf, const TIoVec& iov, void* controllBuffer) { +        TMsgHdr result; +        Zero(result); + +        Zero(*addrBuf); +        result.msg_name = addrBuf; +        result.msg_namelen = sizeof(*addrBuf); + +        result.msg_iov = (TIoVec*)&iov; +        result.msg_iovlen = 1; +#ifndef _win_ +        if (controllBuffer) { +            memset(controllBuffer, 0, CTRL_BUFFER_SIZE); +            result.msg_control = controllBuffer; +            result.msg_controllen = CTRL_BUFFER_SIZE; +        } +#endif +        return result; +    } +} diff --git a/library/cpp/netliba/socket/packet_queue.h b/library/cpp/netliba/socket/packet_queue.h new file mode 100644 index 00000000000..58a84709c24 --- /dev/null +++ b/library/cpp/netliba/socket/packet_queue.h @@ -0,0 +1,97 @@ +#pragma once + +#include "udp_recv_packet.h" + +#include <library/cpp/threading/chunk_queue/queue.h> + +#include <util/network/init.h> +#include <library/cpp/deprecated/atomic/atomic.h> +#include <util/system/event.h> +#include <util/system/yassert.h> +#include <library/cpp/deprecated/atomic/atomic_ops.h> +#include <utility> + +namespace NNetlibaSocket { +    struct TPacketMeta { +        sockaddr_in6 RemoteAddr; +        sockaddr_in6 MyAddr; +    }; + +    template <size_t TTNumWriterThreads> +    class TLockFreePacketQueue { +    private: +        enum { MAX_PACKETS_IN_QUEUE = INT_MAX, +               CMD_QUEUE_RESERVE = 1 << 20, +               MAX_DATA_IN_QUEUE = 32 << 20 }; + +        typedef std::pair<TUdpRecvPacket*, TPacketMeta> TPacket; +        typedef std::conditional_t<TTNumWriterThreads == 1, NThreading::TOneOneQueue<TPacket>, NThreading::TManyOneQueue<TPacket, TTNumWriterThreads>> TImpl; + +        mutable TImpl Queue; +        mutable TSystemEvent QueueEvent; + +        mutable TAtomic NumPackets; +        TAtomic DataSize; + +    public: +        TLockFreePacketQueue() +            : NumPackets(0) +            , DataSize(0) +        { +        } + +        ~TLockFreePacketQueue() { +            TPacket packet; +            while (Queue.Dequeue(packet)) { +                delete packet.first; +            } +        } + +        bool IsDataPartFull() const { +            return (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE - CMD_QUEUE_RESERVE); +        } + +        bool Push(TUdpRecvPacket* packet, const TPacketMeta& meta) { +            // simulate OS behavior on buffer overflow - drop packets. +            // yeah it contains small data race (we can add little bit more packets, but nobody cares) +            if (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE) { +                return false; +            } +            AtomicAdd(NumPackets, 1); +            AtomicAdd(DataSize, packet->DataSize); +            Y_ASSERT(packet->DataStart == 0); + +            Queue.Enqueue(TPacket(std::make_pair(packet, meta))); +            QueueEvent.Signal(); +            return true; +        } + +        bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) { +            TPacket p; +            if (!Queue.Dequeue(p)) { +                QueueEvent.Reset(); +                if (!Queue.Dequeue(p)) { +                    return false; +                } +                QueueEvent.Signal(); +            } +            *packet = p.first; +            *srcAddr = p.second.RemoteAddr; +            *dstAddr = p.second.MyAddr; + +            AtomicSub(NumPackets, 1); +            AtomicSub(DataSize, (*packet)->DataSize); +            Y_ASSERT(AtomicGet(NumPackets) >= 0 && AtomicGet(DataSize) >= 0); + +            return true; +        } + +        bool IsEmpty() const { +            return AtomicAdd(NumPackets, 0) == 0; +        } + +        TSystemEvent& GetEvent() const { +            return QueueEvent; +        } +    }; +} diff --git a/library/cpp/netliba/socket/protocols.h b/library/cpp/netliba/socket/protocols.h new file mode 100644 index 00000000000..ec6896ab9b8 --- /dev/null +++ b/library/cpp/netliba/socket/protocols.h @@ -0,0 +1,48 @@ +#pragma once + +namespace NNetlibaSocket { +    namespace NNetliba_v12 { +        const ui8 CMD_POS = 11; +        enum EUdpCmd { +            CMD_BEGIN = 1, + +            DATA = CMD_BEGIN, +            DATA_SMALL,   // no jumbo-packets +            DO_NOT_USE_1, //just reserved +            DO_NOT_USE_2, //just reserved + +            CANCEL_TRANSFER, + +            ACK, +            ACK_COMPLETE, +            ACK_CANCELED, +            ACK_RESEND_NOSHMEM, + +            PING, +            PONG, +            PONG_IB, + +            KILL, + +            CMD_END, +        }; +    } + +    namespace NNetliba { +        const ui8 CMD_POS = 8; +        enum EUdpCmd { +            DATA, +            ACK, +            ACK_COMPLETE, +            ACK_RESEND, +            DATA_SMALL, // no jumbo-packets +            PING, +            PONG, +            DATA_SHMEM, +            DATA_SMALL_SHMEM, +            KILL, +            ACK_RESEND_NOSHMEM, +        }; +    } + +} diff --git a/library/cpp/netliba/socket/socket.cpp b/library/cpp/netliba/socket/socket.cpp new file mode 100644 index 00000000000..c10236229b2 --- /dev/null +++ b/library/cpp/netliba/socket/socket.cpp @@ -0,0 +1,1086 @@ +#include "stdafx.h" +#include <util/datetime/cputimer.h> +#include <util/draft/holder_vector.h> +#include <util/generic/utility.h> +#include <util/generic/vector.h> +#include <util/network/init.h> +#include <util/network/poller.h> +#include <library/cpp/deprecated/atomic/atomic.h> +#include <util/system/byteorder.h> +#include <util/system/defaults.h> +#include <util/system/error.h> +#include <util/system/event.h> +#include <util/system/thread.h> +#include <util/system/yassert.h> +#include <util/system/rwlock.h> +#include <util/system/env.h> + +#include "socket.h" +#include "packet_queue.h" +#include "udp_recv_packet.h" + +#include <array> +#include <stdlib.h> + +/////////////////////////////////////////////////////////////////////////////// + +#ifndef _win_ +#include <netinet/in.h> +#endif + +#ifdef _linux_ +#include <dlfcn.h> // dlsym +#endif + +template <class T> +static T GetAddressOf(const char* name) { +#ifdef _linux_ +    if (!GetEnv("DISABLE_MMSG")) { +        return (T)dlsym(RTLD_DEFAULT, name); +    } +#endif +    Y_UNUSED(name); +    return nullptr; +} + +/////////////////////////////////////////////////////////////////////////////// + +namespace NNetlibaSocket { +    /////////////////////////////////////////////////////////////////////////////// + +    struct timespec; // we use it only as NULL pointer +    typedef int (*TSendMMsgFunc)(SOCKET, TMMsgHdr*, unsigned int, unsigned int); +    typedef int (*TRecvMMsgFunc)(SOCKET, TMMsgHdr*, unsigned int, unsigned int, timespec*); + +    static const TSendMMsgFunc SendMMsgFunc = GetAddressOf<TSendMMsgFunc>("sendmmsg"); +    static const TRecvMMsgFunc RecvMMsgFunc = GetAddressOf<TRecvMMsgFunc>("recvmmsg"); + +    /////////////////////////////////////////////////////////////////////////////// + +    bool ReadTos(const TMsgHdr& msgHdr, ui8* tos) { +#ifdef _win_ +        Y_UNUSED(msgHdr); +        Y_UNUSED(tos); +        return false; +#else +        cmsghdr* cmsg = CMSG_FIRSTHDR(&msgHdr); +        if (!cmsg) +            return false; +        //Y_ASSERT(cmsg->cmsg_level == IPPROTO_IPV6); +        //Y_ASSERT(cmsg->cmsg_type == IPV6_TCLASS); +        if (cmsg->cmsg_len != CMSG_LEN(sizeof(int))) +            return false; +        *tos = *(ui8*)CMSG_DATA(cmsg); +        return true; +#endif +    } + +    bool ExtractDestinationAddress(TMsgHdr& msgHdr, sockaddr_in6* addrBuf) { +        Zero(*addrBuf); +#ifdef _win_ +        Y_UNUSED(msgHdr); +        Y_UNUSED(addrBuf); +        return false; +#else +        cmsghdr* cmsg; +        for (cmsg = CMSG_FIRSTHDR(&msgHdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msgHdr, cmsg)) { +            if ((cmsg->cmsg_level == IPPROTO_IPV6) && (cmsg->cmsg_type == IPV6_PKTINFO)) { +                in6_pktinfo* i = (in6_pktinfo*)CMSG_DATA(cmsg); +                addrBuf->sin6_addr = i->ipi6_addr; +                addrBuf->sin6_family = AF_INET6; +                return true; +            } +        } +        return false; +#endif +    } + +    // all send and recv methods are thread safe! +    class TAbstractSocket: public ISocket { +    private: +        SOCKET S; +        mutable TSocketPoller Poller; +        sockaddr_in6 SelfAddress; + +        int SendSysSocketSize; +        int SendSysSocketSizePrev; + +        int CreateSocket(int netPort); +        int DetectSelfAddress(); + +    protected: +        int SetSockOpt(int level, int option_name, const void* option_value, socklen_t option_len); + +        int OpenImpl(int port); +        void CloseImpl(); + +        void WaitImpl(float timeoutSec) const; +        void CancelWaitImpl(const sockaddr_in6* address = nullptr); // NULL means "self" + +        ssize_t RecvMsgImpl(TMsgHdr* hdr, int flags); +        TUdpRecvPacket* RecvImpl(TUdpHostRecvBufAlloc* buf, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr); +        int RecvMMsgImpl(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags, timespec* timeout); + +        bool IsFragmentationForbiden(); +        void ForbidFragmentation(); +        void EnableFragmentation(); + +        //Shared state for setsockopt. Forbid simultaneous transfer while sender asking for specific options (i.e. DONOT_FRAG) +        TRWMutex Mutex; +        TAtomic RecvLag = 0; + +    public: +        TAbstractSocket(); +        ~TAbstractSocket() override; +#ifdef _unix_ +        void Reset(const TAbstractSocket& rhv); +#endif + +        bool IsValid() const override; + +        const sockaddr_in6& GetSelfAddress() const override; +        int GetNetworkOrderPort() const override; +        int GetPort() const override; + +        int GetSockOpt(int level, int option_name, void* option_value, socklen_t* option_len) override; + +        // send all packets to this and only this address by default +        int Connect(const struct sockaddr* address, socklen_t address_len) override; + +        void CancelWaitHost(const sockaddr_in6 addr) override; + +        bool IsSendMMsgSupported() const override; +        int SendMMsg(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) override; +        ssize_t SendMsg(const TMsgHdr* hdr, int flags, const EFragFlag frag) override; +        bool IncreaseSendBuff() override; +        int GetSendSysSocketSize() override; +        void SetRecvLagTime(NHPTimer::STime time) override; +    }; + +    TAbstractSocket::TAbstractSocket() +        : S(INVALID_SOCKET) +        , SendSysSocketSize(0) +        , SendSysSocketSizePrev(0) +    { +        Zero(SelfAddress); +    } + +    TAbstractSocket::~TAbstractSocket() { +        CloseImpl(); +    } + +#ifdef _unix_ +    void TAbstractSocket::Reset(const TAbstractSocket& rhv) { +        Close(); +        S = dup(rhv.S); +        SelfAddress = rhv.SelfAddress; +    } +#endif + +    int TAbstractSocket::CreateSocket(int netPort) { +        if (IsValid()) { +            Y_ASSERT(0); +            return 0; +        } +        S = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); +        if (S == INVALID_SOCKET) { +            return -1; +        } +        { +            int flag = 0; +            Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&flag, sizeof(flag)) == 0, "IPV6_V6ONLY failed"); +        } +        { +            int flag = 1; +            Y_VERIFY(SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (const char*)&flag, sizeof(flag)) == 0, "SO_REUSEADDR failed"); +        } +#if defined(_win_) +        unsigned long dummy = 1; +        ioctlsocket(S, FIONBIO, &dummy); +#else +        Y_VERIFY(fcntl(S, F_SETFL, O_NONBLOCK) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError()); +        Y_VERIFY(fcntl(S, F_SETFD, FD_CLOEXEC) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError()); +        { +            int flag = 1; +#ifndef IPV6_RECVPKTINFO /* Darwin platforms require this */ +            Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_PKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_PKTINFO failed"); +#else +            Y_VERIFY(SetSockOpt(IPPROTO_IPV6, IPV6_RECVPKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_RECVPKTINFO failed"); +#endif +        } +#endif + +        Poller.WaitRead(S, nullptr); + +        { +            // bind socket +            sockaddr_in6 name; +            Zero(name); +            name.sin6_family = AF_INET6; +            name.sin6_addr = in6addr_any; +            name.sin6_port = netPort; +            if (bind(S, (sockaddr*)&name, sizeof(name)) != 0) { +                fprintf(stderr, "netliba_socket could not bind to port %d: %s (errno = %d)\n", InetToHost((ui16)netPort), LastSystemErrorText(), LastSystemError()); +                CloseImpl(); // we call this CloseImpl after Poller initialization +                return -1; +            } +        } +        //Default behavior is allowing fragmentation (according to netliba v6 behavior) +        //If we want to sent packet with DF flag we have to use SendMsg() +        EnableFragmentation(); + +        { +            socklen_t sz = sizeof(SendSysSocketSize); +            if (GetSockOpt(SOL_SOCKET, SO_SNDBUF, &SendSysSocketSize, &sz)) { +                fprintf(stderr, "Can`t get SO_SNDBUF"); +            } +        } +        return 0; +    } + +    bool TAbstractSocket::IsValid() const { +        return S != INVALID_SOCKET; +    } + +    int TAbstractSocket::DetectSelfAddress() { +        socklen_t nameLen = sizeof(SelfAddress); +        if (getsockname(S, (sockaddr*)&SelfAddress, &nameLen) != 0) { // actually we use only sin6_port +            return -1; +        } +        Y_ASSERT(SelfAddress.sin6_family == AF_INET6); +        SelfAddress.sin6_addr = in6addr_loopback; +        return 0; +    } + +    const sockaddr_in6& TAbstractSocket::GetSelfAddress() const { +        return SelfAddress; +    } + +    int TAbstractSocket::GetNetworkOrderPort() const { +        return SelfAddress.sin6_port; +    } + +    int TAbstractSocket::GetPort() const { +        return InetToHost((ui16)SelfAddress.sin6_port); +    } + +    int TAbstractSocket::SetSockOpt(int level, int option_name, const void* option_value, socklen_t option_len) { +        const int rv = setsockopt(S, level, option_name, (const char*)option_value, option_len); +        Y_VERIFY_DEBUG(rv == 0, "SetSockOpt failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError()); +        return rv; +    } + +    int TAbstractSocket::GetSockOpt(int level, int option_name, void* option_value, socklen_t* option_len) { +        const int rv = getsockopt(S, level, option_name, (char*)option_value, option_len); +        Y_VERIFY_DEBUG(rv == 0, "GetSockOpt failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError()); +        return rv; +    } + +    bool TAbstractSocket::IsFragmentationForbiden() { +#if defined(_win_) +        DWORD flag = 0; +        socklen_t sz = sizeof(flag); +        Y_VERIFY(GetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (char*)&flag, &sz) == 0, ""); +        return flag; +#elif defined(_linux_) +        int flag = 0; +        socklen_t sz = sizeof(flag); +        Y_VERIFY(GetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (char*)&flag, &sz) == 0, ""); +        return flag == IPV6_PMTUDISC_DO; +#elif !defined(_darwin_) +        int flag = 0; +        socklen_t sz = sizeof(flag); +        Y_VERIFY(GetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (char*)&flag, &sz) == 0, ""); +        return flag; +#endif +        return false; +    } + +    void TAbstractSocket::ForbidFragmentation() { +    // do not fragment ping packets +#if defined(_win_) +        DWORD flag = 1; +        SetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (const char*)&flag, sizeof(flag)); +#elif defined(_linux_) +        int flag = IP_PMTUDISC_DO; +        SetSockOpt(IPPROTO_IP, IP_MTU_DISCOVER, (const char*)&flag, sizeof(flag)); + +        flag = IPV6_PMTUDISC_DO; +        SetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (const char*)&flag, sizeof(flag)); +#elif !defined(_darwin_) +        int flag = 1; +        //SetSockOpt(IPPROTO_IP, IP_DONTFRAG, (const char*)&flag, sizeof(flag)); +        SetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&flag, sizeof(flag)); +#endif +    } + +    void TAbstractSocket::EnableFragmentation() { +#if defined(_win_) +        DWORD flag = 0; +        SetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (const char*)&flag, sizeof(flag)); +#elif defined(_linux_) +        int flag = IP_PMTUDISC_WANT; +        SetSockOpt(IPPROTO_IP, IP_MTU_DISCOVER, (const char*)&flag, sizeof(flag)); + +        flag = IPV6_PMTUDISC_WANT; +        SetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (const char*)&flag, sizeof(flag)); +#elif !defined(_darwin_) +        int flag = 0; +        //SetSockOpt(IPPROTO_IP, IP_DONTFRAG, (const char*)&flag, sizeof(flag)); +        SetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&flag, sizeof(flag)); +#endif +    } + +    int TAbstractSocket::Connect(const sockaddr* address, socklen_t address_len) { +        Y_ASSERT(IsValid()); +        return connect(S, address, address_len); +    } + +    void TAbstractSocket::CancelWaitHost(const sockaddr_in6 addr) { +        CancelWaitImpl(&addr); +    } + +    bool TAbstractSocket::IsSendMMsgSupported() const { +        return SendMMsgFunc != nullptr; +    } + +    int TAbstractSocket::SendMMsg(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) { +        Y_ASSERT(IsValid()); +        Y_VERIFY(SendMMsgFunc, "sendmmsg is not supported!"); +        TReadGuard rg(Mutex); +        static bool checked = 0; +        Y_VERIFY(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior"); +        return SendMMsgFunc(S, msgvec, vlen, flags); +    } + +    ssize_t TAbstractSocket::SendMsg(const TMsgHdr* hdr, int flags, const EFragFlag frag) { +        Y_ASSERT(IsValid()); +#ifdef _win32_ +        static bool checked = 0; +        Y_VERIFY(hdr->msg_iov->iov_len == 1, "Scatter/gather is currenly not supported on Windows"); +        if (hdr->Tos || frag == FF_DONT_FRAG) { +            TWriteGuard wg(Mutex); +            if (frag == FF_DONT_FRAG) { +                ForbidFragmentation(); +            } else { +                Y_VERIFY(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior"); +            } +            int originalTos; +            if (hdr->Tos) { +                socklen_t sz = sizeof(originalTos); +                Y_VERIFY(GetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, &sz) == 0, ""); +                Y_VERIFY(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&hdr->Tos, sizeof(hdr->Tos)) == 0, ""); +            } +            const ssize_t rv = sendto(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, hdr->msg_namelen); +            if (hdr->Tos) { +                Y_VERIFY(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, sizeof(originalTos)) == 0, ""); +            } +            if (frag == FF_DONT_FRAG) { +                EnableFragmentation(); +            } +            return rv; +        } +        TReadGuard rg(Mutex); +        Y_VERIFY(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior"); +        return sendto(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, hdr->msg_namelen); +#else +        if (frag == FF_DONT_FRAG) { +            TWriteGuard wg(Mutex); +            ForbidFragmentation(); +            const ssize_t rv = sendmsg(S, hdr, flags); +            EnableFragmentation(); +            return rv; +        } + +        TReadGuard rg(Mutex); +#ifndef _darwin_ +        static bool checked = 0; +        Y_VERIFY(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior"); +#endif +        return sendmsg(S, hdr, flags); +#endif +    } + +    bool TAbstractSocket::IncreaseSendBuff() { +        int buffSize; +        socklen_t sz = sizeof(buffSize); +        if (GetSockOpt(SOL_SOCKET, SO_SNDBUF, &buffSize, &sz)) { +            return false; +        } +        // worst case: 200000 pps * 8k * 0.01sec = 16Mb so 32Mb hard limit is reasonable value +        if (buffSize < 0 || buffSize > (1 << 25)) { +            fprintf(stderr, "GetSockOpt returns wrong or too big value for SO_SNDBUF: %d\n", buffSize); +            return false; +        } +            //linux returns the doubled value. man 7 socket: +            // +            // SO_SNDBUF +            //         Sets or gets the maximum socket send buffer in bytes.  The  ker- +            //         nel doubles this value (to allow space for bookkeeping overhead) +            //         when it is set using setsockopt(), and  this  doubled  value  is +            //         returned  by  getsockopt().   The  default  value  is set by the +            //         wmem_default sysctl and the maximum allowed value is set by  the +            //         wmem_max sysctl.  The minimum (doubled) value for this option is +            //         2048. +            // +#ifndef _linux_ +        buffSize += buffSize; +#endif + +        // false if previous value was less than current value. +        // It means setsockopt was not successful. (for example: system limits) +        // we will try to set it again but return false +        const bool rv = !(buffSize <= SendSysSocketSizePrev); +        if (SetSockOpt(SOL_SOCKET, SO_SNDBUF, &buffSize, sz) == 0) { +            SendSysSocketSize = buffSize; +            SendSysSocketSizePrev = buffSize; +            return rv; +        } +        return false; +    } + +    int TAbstractSocket::GetSendSysSocketSize() { +        return SendSysSocketSize; +    } + +    void TAbstractSocket::SetRecvLagTime(NHPTimer::STime time) { +        AtomicSet(RecvLag, time); +    } + +    int TAbstractSocket::OpenImpl(int port) { +        Y_ASSERT(!IsValid()); +        const int netPort = port ? htons((u_short)port) : 0; + +#ifdef _freebsd_ +        // alternative OS +        if (netPort == 0) { +            static ui64 pp = GetCycleCount(); +            for (int attempt = 0; attempt < 100; ++attempt) { +                const int tryPort = htons((pp & 0x3fff) + 0xc000); +                ++pp; +                if (CreateSocket(tryPort) != 0) { +                    Y_ASSERT(!IsValid()); +                    continue; +                } + +                if (DetectSelfAddress() != 0 || tryPort != SelfAddress.sin6_port) { +                    // FreeBSD suck! +                    CloseImpl(); +                    Y_ASSERT(!IsValid()); +                    continue; +                } +                break; +            } +            if (!IsValid()) { +                return -1; +            } +        } else { +            if (CreateSocket(netPort) != 0) { +                Y_ASSERT(!IsValid()); +                return -1; +            } +        } +#else +        // regular OS +        if (CreateSocket(netPort) != 0) { +            Y_ASSERT(!IsValid()); +            return -1; +        } +#endif + +        if (IsValid() && DetectSelfAddress() != 0) { +            CloseImpl(); +            Y_ASSERT(!IsValid()); +            return -1; +        } + +        Y_ASSERT(IsValid()); +        return 0; +    } + +    void TAbstractSocket::CloseImpl() { +        if (IsValid()) { +            Poller.Unwait(S); +            Y_VERIFY(closesocket(S) == 0, "closesocket failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError()); +        } +        S = INVALID_SOCKET; +    } + +    void TAbstractSocket::WaitImpl(float timeoutSec) const { +        Y_VERIFY(IsValid(), "something went wrong"); +        Poller.WaitT(TDuration::Seconds(timeoutSec)); +    } + +    void TAbstractSocket::CancelWaitImpl(const sockaddr_in6* address) { +        Y_ASSERT(IsValid()); + +        // darwin ignores packets with msg_iovlen == 0, also windows implementation uses sendto of first iovec. +        TIoVec v = CreateIoVec(nullptr, 0); +        TMsgHdr hdr = CreateSendMsgHdr((address ? *address : SelfAddress), v, nullptr); + +        // send self fake packet +        TAbstractSocket::SendMsg(&hdr, 0, FF_ALLOW_FRAG); +    } + +    ssize_t TAbstractSocket::RecvMsgImpl(TMsgHdr* hdr, int flags) { +        Y_ASSERT(IsValid()); + +#ifdef _win32_ +        Y_VERIFY(hdr->msg_iov->iov_len == 1, "Scatter/gather is currenly not supported on Windows"); +        return recvfrom(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, &hdr->msg_namelen); +#else +        return recvmsg(S, hdr, flags); +#endif +    } + +    TUdpRecvPacket* TAbstractSocket::RecvImpl(TUdpHostRecvBufAlloc* buf, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) { +        Y_ASSERT(IsValid()); + +        const TIoVec iov = CreateIoVec(buf->GetDataPtr(), buf->GetBufSize()); +        char controllBuffer[CTRL_BUFFER_SIZE]; //used to get dst address from socket +        TMsgHdr hdr = CreateRecvMsgHdr(srcAddr, iov, controllBuffer); + +        const ssize_t rv = TAbstractSocket::RecvMsgImpl(&hdr, 0); +        if (rv < 0) { +            Y_ASSERT(LastSystemError() == EAGAIN || LastSystemError() == EWOULDBLOCK); +            return nullptr; +        } +        if (dstAddr && !ExtractDestinationAddress(hdr, dstAddr)) { +            //fprintf(stderr, "can`t get destination ip\n"); +        } + +        // we extract packet and allocate new buffer only if packet arrived +        TUdpRecvPacket* result = buf->ExtractPacket(); +        result->DataStart = 0; +        result->DataSize = (int)rv; +        return result; +    } + +    // thread-safe +    int TAbstractSocket::RecvMMsgImpl(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags, timespec* timeout) { +        Y_ASSERT(IsValid()); +        Y_VERIFY(RecvMMsgFunc, "recvmmsg is not supported!"); +        return RecvMMsgFunc(S, msgvec, vlen, flags, timeout); +    } + +    /////////////////////////////////////////////////////////////////////////////// + +    class TSocket: public TAbstractSocket { +    public: +        int Open(int port) override; +        void Close() override; + +        void Wait(float timeoutSec, int netlibaVersion) const override; +        void CancelWait(int netlibaVersion) override; + +        bool IsRecvMsgSupported() const override; +        ssize_t RecvMsg(TMsgHdr* hdr, int flags) override; +        TUdpRecvPacket* Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) override; + +    private: +        TUdpHostRecvBufAlloc RecvBuf; +    }; + +    int TSocket::Open(int port) { +        return OpenImpl(port); +    } + +    void TSocket::Close() { +        CloseImpl(); +    } + +    void TSocket::Wait(float timeoutSec, int netlibaVersion) const { +        Y_UNUSED(netlibaVersion); +        WaitImpl(timeoutSec); +    } + +    void TSocket::CancelWait(int netlibaVersion) { +        Y_UNUSED(netlibaVersion); +        CancelWaitImpl(); +    } + +    bool TSocket::IsRecvMsgSupported() const { +        return true; +    } + +    ssize_t TSocket::RecvMsg(TMsgHdr* hdr, int flags) { +        return RecvMsgImpl(hdr, flags); +    } + +    TUdpRecvPacket* TSocket::Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) { +        Y_UNUSED(netlibaVersion); +        return RecvImpl(&RecvBuf, srcAddr, dstAddr); +    } + +    /////////////////////////////////////////////////////////////////////////////// + +    class TTryToRecvMMsgSocket: public TAbstractSocket { +    private: +        THolderVector<TUdpHostRecvBufAlloc> RecvPackets; +        TVector<sockaddr_in6> RecvPacketsSrcAddresses; +        TVector<TIoVec> RecvPacketsIoVecs; +        size_t RecvPacketsBegin;      // first non returned to user +        size_t RecvPacketsHeadersEnd; // next after last one with data +        TVector<TMMsgHdr> RecvPacketsHeaders; +        TVector<std::array<char, CTRL_BUFFER_SIZE>> RecvPacketsCtrlBuffers; + +        int FillRecvBuffers(); + +    public: +        static bool IsRecvMMsgSupported(); + +        // Tests showed best performance on queue size 128 (+7%). +        // If memory is limited you can use 12 - it gives +4%. +        // Do not use lower values - for example recvmmsg with 1 element is 3% slower that recvmsg! +        // (tested with junk/f0b0s/neTBasicSocket_queue_test). +        TTryToRecvMMsgSocket(const size_t recvQueueSize = 128); +        ~TTryToRecvMMsgSocket() override; + +        int Open(int port) override; +        void Close() override; + +        void Wait(float timeoutSec, int netlibaVersion) const override; +        void CancelWait(int netlibaVersion) override; + +        bool IsRecvMsgSupported() const override { +            return false; +        } +        ssize_t RecvMsg(TMsgHdr* hdr, int flags) override { +            Y_UNUSED(hdr); +            Y_UNUSED(flags); +            Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TRecvMMsgSocket implementation must use memcpy which is suboptimal and thus forbidden!"); +        } +        TUdpRecvPacket* Recv(sockaddr_in6* addr, sockaddr_in6* dstAddr, int netlibaVersion) override; +    }; + +    TTryToRecvMMsgSocket::TTryToRecvMMsgSocket(const size_t recvQueueSize) +        : RecvPacketsBegin(0) +        , RecvPacketsHeadersEnd(0) +    { +        // recvmmsg is not supported - will act like TSocket, +        // we can't just VERIFY - TTryToRecvMMsgSocket is used as base class for TDualStackSocket. +        if (!IsRecvMMsgSupported()) { +            RecvPackets.reserve(1); +            RecvPackets.PushBack(new TUdpHostRecvBufAlloc); +            return; +        } + +        RecvPackets.reserve(recvQueueSize); +        for (size_t i = 0; i != recvQueueSize; ++i) { +            RecvPackets.PushBack(new TUdpHostRecvBufAlloc); +        } + +        RecvPacketsSrcAddresses.resize(recvQueueSize); +        RecvPacketsIoVecs.resize(recvQueueSize); +        RecvPacketsHeaders.resize(recvQueueSize); +        RecvPacketsCtrlBuffers.resize(recvQueueSize); + +        for (size_t i = 0; i != recvQueueSize; ++i) { +            TMMsgHdr& mhdr = RecvPacketsHeaders[i]; +            Zero(mhdr); + +            RecvPacketsIoVecs[i] = CreateIoVec(RecvPackets[i]->GetDataPtr(), RecvPackets[i]->GetBufSize()); +            char* buf = RecvPacketsCtrlBuffers[i].data(); +            memset(buf, 0, CTRL_BUFFER_SIZE); +            mhdr.msg_hdr = CreateRecvMsgHdr(&RecvPacketsSrcAddresses[i], RecvPacketsIoVecs[i], buf); +        } +    } + +    TTryToRecvMMsgSocket::~TTryToRecvMMsgSocket() { +        Close(); +    } + +    int TTryToRecvMMsgSocket::Open(int port) { +        return OpenImpl(port); +    } + +    void TTryToRecvMMsgSocket::Close() { +        CloseImpl(); +    } + +    void TTryToRecvMMsgSocket::Wait(float timeoutSec, int netlibaVersion) const { +        Y_UNUSED(netlibaVersion); +        Y_ASSERT(RecvPacketsBegin == RecvPacketsHeadersEnd || IsRecvMMsgSupported()); +        if (RecvPacketsBegin == RecvPacketsHeadersEnd) { +            WaitImpl(timeoutSec); +        } +    } + +    void TTryToRecvMMsgSocket::CancelWait(int netlibaVersion) { +        Y_UNUSED(netlibaVersion); +        CancelWaitImpl(); +    } + +    bool TTryToRecvMMsgSocket::IsRecvMMsgSupported() { +        return RecvMMsgFunc != nullptr; +    } + +    int TTryToRecvMMsgSocket::FillRecvBuffers() { +        Y_ASSERT(IsRecvMMsgSupported()); +        Y_ASSERT(RecvPacketsBegin <= RecvPacketsHeadersEnd); +        if (RecvPacketsBegin < RecvPacketsHeadersEnd) { +            return RecvPacketsHeadersEnd - RecvPacketsBegin; +        } + +        // no packets left from last recvmmsg call +        for (size_t i = 0; i != RecvPacketsHeadersEnd; ++i) { // reinit only used by last recvmmsg call headers +            RecvPacketsIoVecs[i] = CreateIoVec(RecvPackets[i]->GetDataPtr(), RecvPackets[i]->GetBufSize()); +        } +        RecvPacketsBegin = RecvPacketsHeadersEnd = 0; + +        const int r = RecvMMsgImpl(&RecvPacketsHeaders[0], (unsigned int)RecvPacketsHeaders.size(), 0, nullptr); +        if (r >= 0) { +            RecvPacketsHeadersEnd = r; +        } else { +            Y_ASSERT(LastSystemError() == EAGAIN || LastSystemError() == EWOULDBLOCK); +        } +        return r; +    } + +    // not thread-safe +    TUdpRecvPacket* TTryToRecvMMsgSocket::Recv(sockaddr_in6* fromAddress, sockaddr_in6* dstAddr, int) { +        // act like TSocket +        if (!IsRecvMMsgSupported()) { +            return RecvImpl(RecvPackets[0], fromAddress, dstAddr); +        } + +        if (FillRecvBuffers() <= 0) { +            return nullptr; +        } + +        TUdpRecvPacket* result = RecvPackets[RecvPacketsBegin]->ExtractPacket(); +        TMMsgHdr& mmsgHdr = RecvPacketsHeaders[RecvPacketsBegin]; +        result->DataSize = (ssize_t)mmsgHdr.msg_len; +        if (dstAddr && !ExtractDestinationAddress(mmsgHdr.msg_hdr, dstAddr)) { +            //    fprintf(stderr, "can`t get destination ip\n"); +        } +        *fromAddress = RecvPacketsSrcAddresses[RecvPacketsBegin]; +        //we must clean ctrlbuffer to be able to use it later +#ifndef _win_ +        memset(mmsgHdr.msg_hdr.msg_control, 0, CTRL_BUFFER_SIZE); +        mmsgHdr.msg_hdr.msg_controllen = CTRL_BUFFER_SIZE; +#endif +        RecvPacketsBegin++; + +        return result; +    } + +    /////////////////////////////////////////////////////////////////////////////// + +    /*  TODO: too slow, needs to be optimized +template<size_t TTNumRecvThreads> +class TMTRecvSocket: public TAbstractSocket +{ +private: +    typedef TLockFreePacketQueue<TTNumRecvThreads> TPacketQueue; + +    static void* RecvThreadFunc(void* that) +    { +        static_cast<TMTRecvSocket*>(that)->RecvLoop(); +        return NULL; +    } + +    void RecvLoop() +    { +        TBestUnixRecvSocket impl; +        impl.Reset(*this); + +        while (AtomicAdd(NumThreadsToDie, 0) == -1) { +            sockaddr_in6 addr; +            TUdpRecvPacket* packet = impl.Recv(&addr, NETLIBA_ANY_VERSION); +            if (!packet) { +                impl.Wait(0.0001, NETLIBA_ANY_VERSION);  // so small tiomeout because we can't guarantee that 1 thread won't get all packets +                continue; +            } +            Queue.Push(packet, addr); +        } + +        if (AtomicDecrement(NumThreadsToDie)) { +            impl.CancelWait(NETLIBA_ANY_VERSION); +        } else { +            AllThreadsAreDead.Signal(); +        } +    } + +    THolderVector<TThread> RecvThreads; +    TAtomic NumThreadsToDie; +    TSystemEvent AllThreadsAreDead; + +    TPacketQueue Queue; + +public: +    TMTRecvSocket() +        : NumThreadsToDie(-1) {} + +    ~TMTRecvSocket() +    { +        Close(); +    } + +    int Open(int port) +    { +        if (OpenImpl(port) != 0) { +            Y_ASSERT(!IsValid()); +            return -1; +        } + +        NumThreadsToDie = -1; +        RecvThreads.reserve(TTNumRecvThreads); +        for (size_t i = 0; i != TTNumRecvThreads; ++i) { +            RecvThreads.PushBack(new TThread(TThread::TParams(RecvThreadFunc, this).SetName("nl12_recv_skt"))); +            RecvThreads.back()->Start(); +            RecvThreads.back()->Detach(); +        } +        return 0; +    } + +    void Close() +    { +        if (!IsValid()) { +            return; +        } + +        AtomicSwap(&NumThreadsToDie, (int)RecvThreads.size()); +        CancelWaitImpl(); +        Y_VERIFY(AllThreadsAreDead.WaitT(TDuration::Seconds(30)), "TMTRecvSocket destruction failed"); + +        CloseImpl(); +    } + +    void Wait(float timeoutSec, int netlibaVersion) const +    { +        Y_UNUSED(netlibaVersion); +        Queue.GetEvent().WaitT(TDuration::Seconds(timeoutSec)); +    } +    void CancelWait(int netlibaVersion) +    { +        Y_UNUSED(netlibaVersion); +        Queue.GetEvent().Signal(); +    } + +    TUdpRecvPacket* Recv(sockaddr_in6 *addr, int netlibaVersion) +    { +        Y_UNUSED(netlibaVersion); +        TUdpRecvPacket* result; +        if (!Queue.Pop(&result, addr)) { +            return NULL; +        } +        return result; +    } + +    bool IsRecvMsgSupported() const { return false; } +    ssize_t RecvMsg(TMsgHdr* hdr, int flags) { Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TMTRecvSocket implementation must use memcpy which is suboptimal and thus forbidden!"); } +}; +*/ + +    /////////////////////////////////////////////////////////////////////////////// + +    // Send.*, Recv, Wait and CancelWait are thread-safe. +    class TDualStackSocket: public TTryToRecvMMsgSocket { +    private: +        typedef TTryToRecvMMsgSocket TBase; +        typedef TLockFreePacketQueue<1> TPacketQueue; + +        static void* RecvThreadFunc(void* that); +        void RecvLoop(); + +        struct TFilteredPacketQueue { +            enum EPushResult { +                PR_FULL = 0, +                PR_OK = 1, +                PR_FILTERED = 2 +            }; +            const ui8 F1; +            const ui8 F2; +            const ui8 CmdPos; +            TFilteredPacketQueue(ui8 f1, ui8 f2, ui8 cmdPos) +                : F1(f1) +                , F2(f2) +                , CmdPos(cmdPos) +            { +            } +            bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) { +                return Queue.Pop(packet, srcAddr, dstAddr); +            } +            ui8 Push(TUdpRecvPacket* packet, const TPacketMeta& meta) { +                if (Queue.IsDataPartFull()) { +                    const ui8 cmd = packet->Data.get()[CmdPos]; +                    if (cmd == F1 || cmd == F2) +                        return PR_FILTERED; +                } +                return Queue.Push(packet, meta); //false - PR_FULL, true - PR_OK +            } +            TPacketQueue Queue; +        }; + +        TFilteredPacketQueue& GetRecvQueue(int netlibaVersion) const; +        TSystemEvent& GetQueueEvent(const TFilteredPacketQueue& queue) const; + +        TThread RecvThread; +        TAtomic ShouldDie; +        TSystemEvent DieEvent; + +        mutable TFilteredPacketQueue RecvQueue6; +        mutable TFilteredPacketQueue RecvQueue12; + +    public: +        TDualStackSocket(); +        ~TDualStackSocket() override; + +        int Open(int port) override; +        void Close() override; + +        void Wait(float timeoutSec, int netlibaVersion) const override; +        void CancelWait(int netlibaVersion) override; + +        bool IsRecvMsgSupported() const override { +            return false; +        } +        ssize_t RecvMsg(TMsgHdr* hdr, int flags) override { +            Y_UNUSED(hdr); +            Y_UNUSED(flags); +            Y_VERIFY(false, "Use TBasicSocket for RecvMsg call! TDualStackSocket implementation must use memcpy which is suboptimal and thus forbidden!"); +        } + +        TUdpRecvPacket* Recv(sockaddr_in6* addr, sockaddr_in6* dstAddr, int netlibaVersion) override; +    }; + +    TDualStackSocket::TDualStackSocket() +        : RecvThread(TThread::TParams(RecvThreadFunc, this).SetName("nl12_dual_stack")) +        , ShouldDie(0) +        , RecvQueue6(NNetliba::DATA, NNetliba::DATA_SMALL, NNetliba::CMD_POS) +        , RecvQueue12(NNetliba_v12::DATA, NNetliba_v12::DATA_SMALL, NNetliba_v12::CMD_POS) +    { +    } + +    // virtual functions don't work in dtors! +    TDualStackSocket::~TDualStackSocket() { +        Close(); + +        sockaddr_in6 srcAdd; +        sockaddr_in6 dstAddr; +        TUdpRecvPacket* ptr = nullptr; + +        while (GetRecvQueue(NETLIBA_ANY_VERSION).Pop(&ptr, &srcAdd, &dstAddr)) { +            delete ptr; +        } +        while (GetRecvQueue(NETLIBA_V12_VERSION).Pop(&ptr, &srcAdd, &dstAddr)) { +            delete ptr; +        } +    } + +    int TDualStackSocket::Open(int port) { +        if (TBase::Open(port) != 0) { +            Y_ASSERT(!IsValid()); +            return -1; +        } + +        AtomicSet(ShouldDie, 0); +        DieEvent.Reset(); +        RecvThread.Start(); +        RecvThread.Detach(); +        return 0; +    } + +    void TDualStackSocket::Close() { +        if (!IsValid()) { +            return; +        } + +        AtomicSwap(&ShouldDie, 1); +        CancelWaitImpl(); +        Y_VERIFY(DieEvent.WaitT(TDuration::Seconds(30)), "TDualStackSocket::Close failed"); + +        TBase::Close(); +    } + +    TDualStackSocket::TFilteredPacketQueue& TDualStackSocket::GetRecvQueue(int netlibaVersion) const { +        return netlibaVersion == NETLIBA_V12_VERSION ? RecvQueue12 : RecvQueue6; +    } + +    TSystemEvent& TDualStackSocket::GetQueueEvent(const TFilteredPacketQueue& queue) const { +        return queue.Queue.GetEvent(); +    } + +    void* TDualStackSocket::RecvThreadFunc(void* that) { +        SetHighestThreadPriority(); +        static_cast<TDualStackSocket*>(that)->RecvLoop(); +        return nullptr; +    } + +    void TDualStackSocket::RecvLoop() { +        for (;;) { +            TUdpRecvPacket* p = nullptr; +            sockaddr_in6 srcAddr; +            sockaddr_in6 dstAddr; +            while (AtomicAdd(ShouldDie, 0) == 0 && (p = TBase::Recv(&srcAddr, &dstAddr, NETLIBA_ANY_VERSION))) { +                Y_ASSERT(p->DataStart == 0); +                if (p->DataSize < 12) { +                    continue; +                } + +                TFilteredPacketQueue& q = GetRecvQueue(p->Data.get()[8]); +                const ui8 res = q.Push(p, {srcAddr, dstAddr}); +                if (res == TFilteredPacketQueue::PR_OK) { +                    GetQueueEvent(q).Signal(); +                } else { +                    // simulate OS behavior on buffer overflow - drop packets. +                    const NHPTimer::STime time = AtomicGet(RecvLag); +                    const float sec = NHPTimer::GetSeconds(time); +                    fprintf(stderr, "TDualStackSocket::RecvLoop netliba v%d queue overflow, recv lag: %f sec, dropping packet, res: %u\n", +                            &q == &RecvQueue12 ? 12 : 6, sec, res); +                    delete p; +                } +            } + +            if (AtomicAdd(ShouldDie, 0)) { +                DieEvent.Signal(); +                return; +            } + +            TBase::Wait(0.1f, NETLIBA_ANY_VERSION); +        } +    } + +    void TDualStackSocket::Wait(float timeoutSec, int netlibaVersion) const { +        TFilteredPacketQueue& q = GetRecvQueue(netlibaVersion); +        if (q.Queue.IsEmpty()) { +            GetQueueEvent(q).Reset(); +            if (q.Queue.IsEmpty()) { +                GetQueueEvent(q).WaitT(TDuration::Seconds(timeoutSec)); +            } +        } +    } + +    void TDualStackSocket::CancelWait(int netlibaVersion) { +        GetQueueEvent(GetRecvQueue(netlibaVersion)).Signal(); +    } + +    // thread-safe +    TUdpRecvPacket* TDualStackSocket::Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) { +        TUdpRecvPacket* result = nullptr; +        if (!GetRecvQueue(netlibaVersion).Pop(&result, srcAddr, dstAddr)) { +            return nullptr; +        } +        return result; +    } + +    /////////////////////////////////////////////////////////////////////////////// + +    TIntrusivePtr<ISocket> CreateSocket() { +        return new TSocket(); +    } + +    TIntrusivePtr<ISocket> CreateDualStackSocket() { +        return new TDualStackSocket(); +    } + +    TIntrusivePtr<ISocket> CreateBestRecvSocket() { +        // TSocket is faster than TRecvMMsgFunc in case of unsupported recvmmsg +        if (!TTryToRecvMMsgSocket::IsRecvMMsgSupported()) { +            return new TSocket(); +        } +        return new TTryToRecvMMsgSocket(); +    } + +} diff --git a/library/cpp/netliba/socket/socket.h b/library/cpp/netliba/socket/socket.h new file mode 100644 index 00000000000..c1da3c145fc --- /dev/null +++ b/library/cpp/netliba/socket/socket.h @@ -0,0 +1,126 @@ +#pragma once + +#include <util/system/platform.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/network/init.h> +#include <util/system/defaults.h> +#include <util/system/hp_timer.h> +#include "udp_recv_packet.h" +#include "protocols.h" + +#include <sys/uio.h> + +namespace NNetlibaSocket { +    typedef iovec TIoVec; + +#ifdef _win32_ +   struct TMsgHdr { +        void* msg_name;  /* optional address */ +        int msg_namelen; /* size of address */ +        TIoVec* msg_iov; /* scatter/gather array */ +        int msg_iovlen;  /* # elements in msg_iov */ + +        int Tos; // netlib_socket extension +    }; +#else +#include <sys/socket.h> +    typedef msghdr TMsgHdr; +#endif + +    // equal to glibc 2.14 mmsghdr definition, defined for windows and darwin compatibility +    struct TMMsgHdr { +        TMsgHdr msg_hdr; +        unsigned int msg_len; +    }; + +#if defined(_linux_) +#include <linux/version.h> +#include <features.h> +// sendmmsg was added in glibc 2.14 and linux 3.0 +#if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 14 && LINUX_VERSION_CODE >= KERNEL_VERSION(3, 0, 0) +#include <sys/socket.h> // sendmmsg +    static_assert(sizeof(TMMsgHdr) == sizeof(mmsghdr), "expect sizeof(TMMsgHdr) == sizeof(mmsghdr)"); +#endif +#endif + +#ifdef _win32_ +    const size_t TOS_BUFFER_SIZE = sizeof(int); +    const size_t CTRL_BUFFER_SIZE = 32; +#else +#if defined(_darwin_) +#define Y_DARWIN_ALIGN32(p) ((__darwin_size_t)((__darwin_size_t)(p) + __DARWIN_ALIGNBYTES32) & ~__DARWIN_ALIGNBYTES32) +#define Y_CMSG_SPACE(l) (Y_DARWIN_ALIGN32(sizeof(struct cmsghdr)) + Y_DARWIN_ALIGN32(l)) +#else +#define Y_CMSG_SPACE(l) CMSG_SPACE(l) +#endif + +    constexpr size_t TOS_BUFFER_SIZE = Y_CMSG_SPACE(sizeof(int)); +    constexpr size_t CTRL_BUFFER_SIZE = Y_CMSG_SPACE(sizeof(int)) + Y_CMSG_SPACE(sizeof(struct in6_pktinfo)); +#endif + +    /////////////////////////////////////////////////////////////////////////////// +    // Warning: every variable (tosBuffer, data, addr, iov) passed and returned from these functions must exist until actual send!!! +    void* CreateTos(const ui8 tos, void* tosBuffer); +    TIoVec CreateIoVec(char* data, const size_t dataSize); +    TMsgHdr CreateSendMsgHdr(const sockaddr_in6& addr, const TIoVec& iov, void* tosBuffer); +    TMsgHdr CreateRecvMsgHdr(sockaddr_in6* addrBuf, const TIoVec& iov, void* ctrlBuffer = nullptr); +    TMsgHdr* AddSockAuxData(TMsgHdr* header, const ui8 tos, const sockaddr_in6& addr, void* buffer, size_t bufferSize); +    /////////////////////////////////////////////////////////////////////////////// +    //returns false if TOS wasn't readed and do not touch *tos +    bool ReadTos(const TMsgHdr& msgHdr, ui8* tos); +    bool ExtractDestinationAddress(TMsgHdr& msgHdr, sockaddr_in6* addrBuf); + +    /////////////////////////////////////////////////////////////////////////////// + +    // currently netliba v6 version id is any number which's not equal to NETLIBA_V12_VERSION +    enum { NETLIBA_ANY_VERSION = -1, +           NETLIBA_V12_VERSION = 112 }; + +    enum EFragFlag { +        FF_ALLOW_FRAG, +        FF_DONT_FRAG +    }; + +    /////////////////////////////////////////////////////////////////////////////// + +    class ISocket: public TNonCopyable, public TThrRefBase { +    public: +        ~ISocket() override { +        } + +        virtual int Open(int port) = 0; +        virtual void Close() = 0; +        virtual bool IsValid() const = 0; + +        virtual const sockaddr_in6& GetSelfAddress() const = 0; +        virtual int GetNetworkOrderPort() const = 0; +        virtual int GetPort() const = 0; + +        virtual int GetSockOpt(int level, int option_name, void* option_value, socklen_t* option_len) = 0; + +        // send all packets to this and only this address by default +        virtual int Connect(const struct sockaddr* address, socklen_t address_len) = 0; + +        virtual void Wait(float timeoutSec, int netlibaVersion = NETLIBA_ANY_VERSION) const = 0; +        virtual void CancelWait(int netlibaVersion = NETLIBA_ANY_VERSION) = 0; +        virtual void CancelWaitHost(const sockaddr_in6 address) = 0; + +        virtual bool IsSendMMsgSupported() const = 0; +        virtual int SendMMsg(struct TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) = 0; +        virtual ssize_t SendMsg(const TMsgHdr* hdr, int flags, const EFragFlag frag) = 0; + +        virtual bool IsRecvMsgSupported() const = 0; +        virtual ssize_t RecvMsg(TMsgHdr* hdr, int flags) = 0; +        virtual TUdpRecvPacket* Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion = NETLIBA_ANY_VERSION) = 0; +        virtual bool IncreaseSendBuff() = 0; +        virtual int GetSendSysSocketSize() = 0; +        virtual void SetRecvLagTime(NHPTimer::STime time) = 0; +    }; + +    TIntrusivePtr<ISocket> CreateSocket();          // not thread safe! +    TIntrusivePtr<ISocket> CreateDualStackSocket(); // has thread safe send/recv methods + +    // this function was added mostly for testing +    TIntrusivePtr<ISocket> CreateBestRecvSocket(); +} diff --git a/library/cpp/netliba/socket/stdafx.cpp b/library/cpp/netliba/socket/stdafx.cpp new file mode 100644 index 00000000000..fd4f341c7b2 --- /dev/null +++ b/library/cpp/netliba/socket/stdafx.cpp @@ -0,0 +1 @@ +#include "stdafx.h" diff --git a/library/cpp/netliba/socket/stdafx.h b/library/cpp/netliba/socket/stdafx.h new file mode 100644 index 00000000000..7d99e5dc106 --- /dev/null +++ b/library/cpp/netliba/socket/stdafx.h @@ -0,0 +1,16 @@ +#pragma once + +#include <util/system/platform.h> +#if defined(_darwin_) +#define __APPLE_USE_RFC_2292 +#endif + +#include <util/system/compat.h> +#include <util/network/init.h> +#if defined(_unix_) +#include <netdb.h> +#include <fcntl.h> +#elif defined(_win_) +#include <winsock2.h> +using socklen_t = int; +#endif diff --git a/library/cpp/netliba/socket/udp_recv_packet.h b/library/cpp/netliba/socket/udp_recv_packet.h new file mode 100644 index 00000000000..a2777fbcbfd --- /dev/null +++ b/library/cpp/netliba/socket/udp_recv_packet.h @@ -0,0 +1,79 @@ +#pragma once + +#include <util/generic/noncopyable.h> +#include <util/system/defaults.h> + +#include <memory> +#include "allocator.h" + +namespace NNetlibaSocket { +    enum { UDP_MAX_PACKET_SIZE = 8900 }; + +    class TUdpHostRecvBufAlloc; +    struct TUdpRecvPacket: public TWithCustomAllocator { +        friend class TUdpHostRecvBufAlloc; +        int DataStart = 0, DataSize = 0; +        std::shared_ptr<char> Data; + +    private: +        int ArraySize_ = 0; +    }; + +    /////////////////////////////////////////////////////////////////////////////// + +    class TUdpHostRecvBufAlloc: public TNonCopyable { +    private: +        mutable TUdpRecvPacket* RecvPktBuf; + +        static TUdpRecvPacket* Alloc() { +            return new TUdpRecvPacket(); +        } + +    public: +        static TUdpRecvPacket* Create(const int dataSize) { +            TUdpRecvPacket* result = Alloc(); +            result->Data.reset(TCustomAllocator<char>().allocate(dataSize), [=](char* p) { TCustomAllocator<char>().deallocate(p, dataSize); }, TCustomAllocator<char>()); +            result->ArraySize_ = dataSize; +            return result; +        } +        void SetNewPacket() const { +            RecvPktBuf = CreateNewPacket(); +        } + +    public: +        static TUdpRecvPacket* CreateNewSmallPacket(int dataSize) { +            return Create(dataSize); +        } +        static TUdpRecvPacket* CreateNewPacket() { +            return Create(UDP_MAX_PACKET_SIZE); +        } +        static TUdpRecvPacket* Clone(const TUdpRecvPacket* pkt) { +            TUdpRecvPacket* result = Alloc(); +            result->DataStart = pkt->DataStart; +            result->DataSize = pkt->DataSize; +            result->Data = pkt->Data; +            result->ArraySize_ = pkt->ArraySize_; +            return result; +        } + +        TUdpHostRecvBufAlloc() { +            SetNewPacket(); +        } +        ~TUdpHostRecvBufAlloc() { +            delete RecvPktBuf; +        } + +        TUdpRecvPacket* ExtractPacket() { +            TUdpRecvPacket* res = RecvPktBuf; +            SetNewPacket(); +            return res; +        } + +        int GetBufSize() const { +            return RecvPktBuf->ArraySize_; +        } +        char* GetDataPtr() const { +            return RecvPktBuf->Data.get(); +        } +    }; +}  | 
