#include "ip.h"
#include "socket.h"
#include "address.h"
#include "pollerimpl.h"
#include "iovec.h"

#include <util/system/defaults.h>
#include <util/system/byteorder.h>

#if defined(_unix_)
    #include <netdb.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/un.h>
    #include <sys/ioctl.h>
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <arpa/inet.h>
#endif

#if defined(_freebsd_)
    #include <sys/module.h>
    #define ACCEPT_FILTER_MOD
    #include <sys/socketvar.h>
#endif

#if defined(_win_)
    #include <cerrno>
    #include <winsock2.h>
    #include <ws2tcpip.h>
    #include <wspiapi.h>

    #include <util/system/compat.h>
#endif

#include <util/generic/ylimits.h>

#include <util/string/cast.h>
#include <util/stream/mem.h>
#include <util/system/datetime.h>
#include <util/system/error.h>
#include <util/memory/tempbuf.h>
#include <util/generic/singleton.h>
#include <util/generic/hash_set.h>

#include <stddef.h>
#include <sys/uio.h>

using namespace NAddr;

#if defined(_win_)

int inet_aton(const char* cp, struct in_addr* inp) {
    sockaddr_in addr;
    addr.sin_family = AF_INET;
    int psz = sizeof(addr);
    if (0 == WSAStringToAddress((char*)cp, AF_INET, nullptr, (LPSOCKADDR)&addr, &psz)) {
        memcpy(inp, &addr.sin_addr, sizeof(in_addr));
        return 1;
    }
    return 0;
}

    #if (_WIN32_WINNT < 0x0600)
const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
    if (af != AF_INET) {
        errno = EINVAL;
        return 0;
    }
    const ui8* ia = (ui8*)src;
    if (snprintf(dst, size, "%u.%u.%u.%u", ia[0], ia[1], ia[2], ia[3]) >= (int)size) {
        errno = ENOSPC;
        return 0;
    }
    return dst;
}

struct evpair {
    int event;
    int winevent;
};

static const evpair evpairs_to_win[] = {
    {POLLIN, FD_READ | FD_CLOSE | FD_ACCEPT},
    {POLLRDNORM, FD_READ | FD_CLOSE | FD_ACCEPT},
    {POLLRDBAND, -1},
    {POLLPRI, -1},
    {POLLOUT, FD_WRITE | FD_CLOSE},
    {POLLWRNORM, FD_WRITE | FD_CLOSE},
    {POLLWRBAND, -1},
    {POLLERR, 0},
    {POLLHUP, 0},
    {POLLNVAL, 0}};

static const size_t nevpairs_to_win = sizeof(evpairs_to_win) / sizeof(evpairs_to_win[0]);

static const evpair evpairs_to_unix[] = {
    {FD_ACCEPT, POLLIN | POLLRDNORM},
    {FD_READ, POLLIN | POLLRDNORM},
    {FD_WRITE, POLLOUT | POLLWRNORM},
    {FD_CLOSE, POLLHUP},
};

static const size_t nevpairs_to_unix = sizeof(evpairs_to_unix) / sizeof(evpairs_to_unix[0]);

static int convert_events(int events, const evpair* evpairs, size_t nevpairs, bool ignoreUnknown) noexcept {
    int result = 0;
    for (size_t i = 0; i < nevpairs; ++i) {
        int event = evpairs[i].event;
        if (events & event) {
            events ^= event;
            long winEvent = evpairs[i].winevent;
            if (winEvent == -1)
                return -1;
            if (winEvent == 0)
                continue;
            result |= winEvent;
        }
    }
    if (events != 0 && !ignoreUnknown)
        return -1;
    return result;
}

class TWSAEventHolder {
private:
    HANDLE Event;

public:
    inline TWSAEventHolder(HANDLE event) noexcept
        : Event(event)
    {
    }

    inline ~TWSAEventHolder() {
        WSACloseEvent(Event);
    }

    inline HANDLE Get() noexcept {
        return Event;
    }
};

int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
    HANDLE rawEvent = WSACreateEvent();
    if (rawEvent == WSA_INVALID_EVENT) {
        errno = EIO;
        return -1;
    }

    TWSAEventHolder event(rawEvent);

    int checked_sockets = 0;

    for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
        int win_events = convert_events(fd->events, evpairs_to_win, nevpairs_to_win, false);
        if (win_events == -1) {
            errno = EINVAL;
            return -1;
        }
        fd->revents = 0;
        if (WSAEventSelect(fd->fd, event.Get(), win_events)) {
            int error = WSAGetLastError();
            if (error == WSAEINVAL || error == WSAENOTSOCK) {
                fd->revents = POLLNVAL;
                ++checked_sockets;
            } else {
                errno = EIO;
                return -1;
            }
        }
        fd_set readfds;
        fd_set writefds;
        struct timeval timeout = {0, 0};
        FD_ZERO(&readfds);
        FD_ZERO(&writefds);
        if (fd->events & POLLIN) {
            FD_SET(fd->fd, &readfds);
        }
        if (fd->events & POLLOUT) {
            FD_SET(fd->fd, &writefds);
        }
        int error = select(0, &readfds, &writefds, nullptr, &timeout);
        if (error > 0) {
            if (FD_ISSET(fd->fd, &readfds)) {
                fd->revents |= POLLIN;
            }
            if (FD_ISSET(fd->fd, &writefds)) {
                fd->revents |= POLLOUT;
            }
            ++checked_sockets;
        }
    }

    if (checked_sockets > 0) {
        // returns without wait since we already have sockets in desired conditions
        return checked_sockets;
    }

    HANDLE events[] = {event.Get()};
    DWORD wait_result = WSAWaitForMultipleEvents(1, events, TRUE, timeout, FALSE);
    if (wait_result == WSA_WAIT_TIMEOUT)
        return 0;
    else if (wait_result == WSA_WAIT_EVENT_0) {
        for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
            if (fd->revents == POLLNVAL)
                continue;
            WSANETWORKEVENTS network_events;
            if (WSAEnumNetworkEvents(fd->fd, event.Get(), &network_events)) {
                errno = EIO;
                return -1;
            }
            fd->revents = 0;
            for (int i = 0; i < FD_MAX_EVENTS; ++i) {
                if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) {
                    fd->revents = POLLERR;
                    break;
                }
            }
            if (fd->revents == POLLERR)
                continue;
            if (network_events.lNetworkEvents) {
                fd->revents = static_cast<short>(convert_events(network_events.lNetworkEvents, evpairs_to_unix, nevpairs_to_unix, true));
                if (fd->revents & POLLHUP) {
                    fd->revents &= POLLHUP | POLLIN | POLLRDNORM;
                }
            }
        }
        int chanded_sockets = 0;
        for (pollfd* fd = fds; fd < fds + nfds; ++fd)
            if (fd->revents != 0)
                ++chanded_sockets;
        return chanded_sockets;
    } else {
        errno = EIO;
        return -1;
    }
}
    #endif

#endif

bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) {
    if (!size) {
        return false;
    }

    TOpaqueAddr addr;

    if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) {
        return false;
    }

    try {
        TMemoryOutput out(str, size - 1);

        PrintHost(out, addr);
        *out.Buf() = 0;

        return true;
    } catch (...) {
        // ¯\_(ツ)_/¯
    }

    return false;
}

void SetSocketTimeout(SOCKET s, long timeout) {
    SetSocketTimeout(s, timeout, 0);
}

void SetSocketTimeout(SOCKET s, long sec, long msec) {
#ifdef SO_SNDTIMEO
    #ifdef _darwin_
    const timeval timeout = {sec, (__darwin_suseconds_t)msec * 1000};
    #elif defined(_unix_)
    const timeval timeout = {sec, msec * 1000};
    #else
    const int timeout = sec * 1000 + msec;
    #endif
    CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout");
    CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout");
#endif
}

void SetLinger(SOCKET s, bool on, unsigned len) {
#ifdef SO_LINGER
    struct linger l = {on, (u_short)len};

    CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger");
#endif
}

void SetZeroLinger(SOCKET s) {
    SetLinger(s, 1, 0);
}

void SetKeepAlive(SOCKET s, bool value) {
    CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive");
}

void SetOutputBuffer(SOCKET s, unsigned value) {
    CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer");
}

void SetInputBuffer(SOCKET s, unsigned value) {
    CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer");
}

void SetReusePort(SOCKET s, bool value) {
#if defined(_unix_)
    CheckedSetSockOpt(s, SOL_SOCKET, SO_REUSEPORT, (int)value, "reuse port");
#else
    Y_UNUSED(s);
    Y_UNUSED(value);
    ythrow TSystemError(ENOSYS) << "SO_REUSEPORT is not available on Windows";
#endif
}

void SetNoDelay(SOCKET s, bool value) {
    CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay");
}

void SetCloseOnExec(SOCKET s, bool value) {
#if defined(_unix_)
    int flags = fcntl(s, F_GETFD);
    if (flags == -1) {
        ythrow TSystemError() << "fcntl() failed";
    }
    if (value) {
        flags |= FD_CLOEXEC;
    } else {
        flags &= ~FD_CLOEXEC;
    }
    if (fcntl(s, F_SETFD, flags) == -1) {
        ythrow TSystemError() << "fcntl() failed";
    }
#else
    Y_UNUSED(s);
    Y_UNUSED(value);
#endif
}

size_t GetMaximumSegmentSize(SOCKET s) {
#if defined(TCP_MAXSEG)
    int val;

    if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) {
        return (size_t)val;
    }
#endif

    /*
     * probably a good guess...
     */
    return 8192;
}

size_t GetMaximumTransferUnit(SOCKET /*s*/) {
    // for someone who'll dare to write it
    // Linux: there rummored to be IP_MTU getsockopt() request
    // FreeBSD: request to a socket of type PF_ROUTE
    //          with peer address as a destination argument
    return 8192;
}

int GetSocketToS(SOCKET s) {
    TOpaqueAddr addr;

    if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
        ythrow TSystemError() << "getsockname() failed";
    }

    return GetSocketToS(s, &addr);
}

int GetSocketToS(SOCKET s, const IRemoteAddr* addr) {
    int result = 0;

    switch (addr->Addr()->sa_family) {
        case AF_INET:
            CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos");
            break;

        case AF_INET6:
#ifdef IPV6_TCLASS
            CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos");
#endif
            break;
    }

    return result;
}

void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) {
    switch (addr->Addr()->sa_family) {
        case AF_INET:
            CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos");
            return;

        case AF_INET6:
#ifdef IPV6_TCLASS
            CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos");
            return;
#endif
            break;
    }

    ythrow yexception() << "SetSocketToS unsupported for family " << addr->Addr()->sa_family;
}

void SetSocketToS(SOCKET s, int tos) {
    TOpaqueAddr addr;

    if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
        ythrow TSystemError() << "getsockname() failed";
    }

    SetSocketToS(s, &addr, tos);
}

void SetSocketPriority(SOCKET s, int priority) {
#if defined(SO_PRIORITY)
    CheckedSetSockOpt(s, SOL_SOCKET, SO_PRIORITY, priority, "priority");
#else
    Y_UNUSED(s);
    Y_UNUSED(priority);
#endif
}

bool HasLocalAddress(SOCKET socket) {
    TOpaqueAddr localAddr;
    if (getsockname(socket, localAddr.MutableAddr(), localAddr.LenPtr()) != 0) {
        ythrow TSystemError() << "HasLocalAddress: getsockname() failed. ";
    }
    if (IsLoopback(localAddr)) {
        return true;
    }

    TOpaqueAddr remoteAddr;
    if (getpeername(socket, remoteAddr.MutableAddr(), remoteAddr.LenPtr()) != 0) {
        ythrow TSystemError() << "HasLocalAddress: getpeername() failed. ";
    }
    return IsSame(localAddr, remoteAddr);
}

namespace {
#if defined(_linux_)
    #if !defined(TCP_FASTOPEN)
        #define TCP_FASTOPEN 23
    #endif
#endif

#if defined(TCP_FASTOPEN)
    struct TTcpFastOpenFeature {
        inline TTcpFastOpenFeature()
            : HasFastOpen_(false)
        {
            TSocketHolder tmp(socket(AF_INET, SOCK_STREAM, 0));
            int val = 1;
            int ret = SetSockOpt(tmp, IPPROTO_TCP, TCP_FASTOPEN, val);
            HasFastOpen_ = (ret == 0);
        }

        inline void SetFastOpen(SOCKET s, int qlen) const {
            if (HasFastOpen_) {
                CheckedSetSockOpt(s, IPPROTO_TCP, TCP_FASTOPEN, qlen, "setting TCP_FASTOPEN");
            }
        }

        static inline const TTcpFastOpenFeature* Instance() noexcept {
            return Singleton<TTcpFastOpenFeature>();
        }

        bool HasFastOpen_;
    };
#endif
}

void SetTcpFastOpen(SOCKET s, int qlen) {
#if defined(TCP_FASTOPEN)
    TTcpFastOpenFeature::Instance()->SetFastOpen(s, qlen);
#else
    Y_UNUSED(s);
    Y_UNUSED(qlen);
#endif
}

static bool IsBlocked(int lasterr) noexcept {
    return lasterr == EAGAIN || lasterr == EWOULDBLOCK;
}

struct TUnblockingGuard {
    SOCKET S_;

    TUnblockingGuard(SOCKET s)
        : S_(s)
    {
        SetNonBlock(S_, true);
    }

    ~TUnblockingGuard() {
        SetNonBlock(S_, false);
    }
};

static int MsgPeek(SOCKET s) {
    int flags = MSG_PEEK;

#if defined(_win_)
    TUnblockingGuard unblocker(s);
    Y_UNUSED(unblocker);
#else
    flags |= MSG_DONTWAIT;
#endif

    char c;
    return recv(s, &c, 1, flags);
}

bool IsNotSocketClosedByOtherSide(SOCKET s) {
    return HasSocketDataToRead(s) != ESocketReadStatus::SocketClosed;
}

ESocketReadStatus HasSocketDataToRead(SOCKET s) {
    const int r = MsgPeek(s);
    if (r == -1 && IsBlocked(LastSystemError())) {
        return ESocketReadStatus::NoData;
    }
    if (r > 0) {
        return ESocketReadStatus::HasData;
    }
    return ESocketReadStatus::SocketClosed;
}

#if defined(_win_)
static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
    return writev(sock, iov, iovcnt);
}
#else
static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
    struct msghdr message;

    Zero(message);
    message.msg_iov = const_cast<struct iovec*>(iov);
    message.msg_iovlen = iovcnt;

    return sendmsg(sock, &message, MSG_NOSIGNAL);
}
#endif

void TSocketHolder::Close() noexcept {
    if (Fd_ != INVALID_SOCKET) {
        bool ok = (closesocket(Fd_) == 0);
        if (!ok) {
// Do not quietly close bad descriptor,
// because often it means double close
// that is disasterous
#ifdef _win_
            Y_ABORT_UNLESS(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor");
#elif defined(_unix_)
            Y_ABORT_UNLESS(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_));
#else
    #error unsupported platform
#endif
        }

        Fd_ = INVALID_SOCKET;
    }
}

class TSocket::TImpl: public TAtomicRefCount<TImpl> {
    using TOps = TSocket::TOps;

public:
    inline TImpl(SOCKET fd, TOps* ops)
        : Fd_(fd)
        , Ops_(ops)
    {
    }

    inline ~TImpl() = default;

    inline SOCKET Fd() const noexcept {
        return Fd_;
    }

    inline ssize_t Send(const void* data, size_t len) {
        return Ops_->Send(Fd_, data, len);
    }

    inline ssize_t Recv(void* buf, size_t len) {
        return Ops_->Recv(Fd_, buf, len);
    }

    inline ssize_t SendV(const TPart* parts, size_t count) {
        return Ops_->SendV(Fd_, parts, count);
    }

    inline void Close() {
        Fd_.Close();
    }

private:
    TSocketHolder Fd_;
    TOps* Ops_;
};

template <>
void Out<const struct addrinfo*>(IOutputStream& os, const struct addrinfo* ai) {
    if (ai->ai_flags & AI_CANONNAME) {
        os << "`" << ai->ai_canonname << "' ";
    }

    os << '[';
    for (int i = 0; ai; ++i, ai = ai->ai_next) {
        if (i > 0) {
            os << ", ";
        }

        os << (const IRemoteAddr&)TAddrInfo(ai);
    }
    os << ']';
}

template <>
void Out<struct addrinfo*>(IOutputStream& os, struct addrinfo* ai) {
    Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai));
}

template <>
void Out<TNetworkAddress>(IOutputStream& os, const TNetworkAddress& addr) {
    os << &*addr.Begin();
}

static inline const struct addrinfo* Iterate(const struct addrinfo* addr, const struct addrinfo* addr0, const int sockerr) {
    if (addr->ai_next) {
        return addr->ai_next;
    }

    ythrow TSystemError(sockerr) << "can not connect to " << addr0;
}

static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) {
    const struct addrinfo* addr0 = res;

    while (res) {
        TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));

        if (s.Closed()) {
            res = Iterate(res, addr0, LastSystemError());

            continue;
        }

        SetNonBlock(s, true);

        if (connect(s, res->ai_addr, (int)res->ai_addrlen)) {
            int err = LastSystemError();

            if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
                /*
                 * must wait
                 */
                struct pollfd p = {
                    (SOCKET)s,
                    POLLOUT,
                    0};

                const ssize_t n = PollD(&p, 1, deadLine);

                /*
                 * timeout occured
                 */
                if (n < 0) {
                    ythrow TSystemError(-(int)n) << "can not connect";
                }

                CheckedGetSockOpt(s, SOL_SOCKET, SO_ERROR, err, "socket error");

                if (!err) {
                    return s.Release();
                }
            }

            res = Iterate(res, addr0, err);

            continue;
        }

        return s.Release();
    }

    ythrow yexception() << "something went wrong: nullptr at addrinfo";
}

static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) {
    TSocketHolder ret(DoConnectImpl(res, deadLine));

    SetNonBlock(ret, false);

    return ret.Release();
}

static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) {
    ssize_t ret = -1;
    do {
        ret = DoSendMsg(fd, iov, (int)count);
    } while (ret == -1 && errno == EINTR);

    if (ret < 0) {
        return -LastSystemError();
    }

    return ret;
}

template <bool isCompat>
struct TSender {
    using TPart = TSocket::TPart;

    static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
        return DoSendV(fd, (const iovec*)parts, count);
    }
};

template <>
struct TSender<false> {
    using TPart = TSocket::TPart;

    static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
        TTempBuf tempbuf(sizeof(struct iovec) * count);
        struct iovec* iov = (struct iovec*)tempbuf.Data();

        for (size_t i = 0; i < count; ++i) {
            struct iovec& io = iov[i];
            const TPart& part = parts[i];

            io.iov_base = (char*)part.buf;
            io.iov_len = part.len;
        }

        return DoSendV(fd, iov, count);
    }
};

class TCommonSockOps: public TSocket::TOps {
    using TPart = TSocket::TPart;

public:
    inline TCommonSockOps() noexcept {
    }

    ~TCommonSockOps() override = default;

    ssize_t Send(SOCKET fd, const void* data, size_t len) override {
        ssize_t ret = -1;
        do {
            ret = send(fd, (const char*)data, (int)len, MSG_NOSIGNAL);
        } while (ret == -1 && errno == EINTR);

        if (ret < 0) {
            return -LastSystemError();
        }

        return ret;
    }

    ssize_t Recv(SOCKET fd, void* buf, size_t len) override {
        ssize_t ret = -1;
        do {
            ret = recv(fd, (char*)buf, (int)len, 0);
        } while (ret == -1 && errno == EINTR);

        if (ret < 0) {
            return -LastSystemError();
        }

        return ret;
    }

    ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) override {
        ssize_t ret = SendVImpl(fd, parts, count);

        if (ret < 0) {
            return ret;
        }

        size_t len = TContIOVector::Bytes(parts, count);

        if ((size_t)ret == len) {
            return ret;
        }

        return SendVPartial(fd, parts, count, ret);
    }

    inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) {
        return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count);
    }

    ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written);
};

ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written) {
    TTempBuf tempbuf(sizeof(TPart) * count);
    TPart* parts = (TPart*)tempbuf.Data();

    for (size_t i = 0; i < count; ++i) {
        parts[i] = constParts[i];
    }

    TContIOVector vec(parts, count);
    vec.Proceed(written);

    while (!vec.Complete()) {
        ssize_t ret = SendVImpl(fd, vec.Parts(), vec.Count());

        if (ret < 0) {
            return ret;
        }

        written += ret;

        vec.Proceed((size_t)ret);
    }

    return written;
}

static inline TSocket::TOps* GetCommonSockOps() noexcept {
    return Singleton<TCommonSockOps>();
}

TSocket::TSocket()
    : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps()))
{
}

TSocket::TSocket(SOCKET fd)
    : Impl_(new TImpl(fd, GetCommonSockOps()))
{
}

TSocket::TSocket(SOCKET fd, TOps* ops)
    : Impl_(new TImpl(fd, ops))
{
}

TSocket::TSocket(const TNetworkAddress& addr)
    : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps()))
{
}

TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut)
    : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps()))
{
}

TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine)
    : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps()))
{
}

TSocket::~TSocket() = default;

SOCKET TSocket::Fd() const noexcept {
    return Impl_->Fd();
}

ssize_t TSocket::Send(const void* data, size_t len) {
    return Impl_->Send(data, len);
}

ssize_t TSocket::Recv(void* buf, size_t len) {
    return Impl_->Recv(buf, len);
}

ssize_t TSocket::SendV(const TPart* parts, size_t count) {
    return Impl_->SendV(parts, count);
}

void TSocket::Close() {
    Impl_->Close();
}

TSocketInput::TSocketInput(const TSocket& s) noexcept
    : S_(s)
{
}

TSocketInput::~TSocketInput() = default;

size_t TSocketInput::DoRead(void* buf, size_t len) {
    const ssize_t ret = S_.Recv(buf, len);

    if (ret >= 0) {
        return (size_t)ret;
    }

    ythrow TSystemError(-(int)ret) << "can not read from socket input stream";
}

TSocketOutput::TSocketOutput(const TSocket& s) noexcept
    : S_(s)
{
}

TSocketOutput::~TSocketOutput() {
    try {
        Finish();
    } catch (...) {
        // ¯\_(ツ)_/¯
    }
}

void TSocketOutput::DoWrite(const void* buf, size_t len) {
    size_t send = 0;
    while (len) {
        const ssize_t ret = S_.Send(buf, len);

        if (ret < 0) {
            ythrow TSystemError(-(int)ret) << "can not write to socket output stream; " << send << " bytes already send";
        }
        buf = (const char*)buf + ret;
        len -= ret;
        send += ret;
    }
}

void TSocketOutput::DoWriteV(const TPart* parts, size_t count) {
    const ssize_t ret = S_.SendV(parts, count);

    if (ret < 0) {
        ythrow TSystemError(-(int)ret) << "can not writev to socket output stream";
    }

    /*
     * todo for nonblocking sockets?
     */
}

namespace {
    //https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff

    struct TLocalNames: public THashSet<TStringBuf> {
        inline TLocalNames() {
            insert("localhost");
            insert("localhost.localdomain");
            insert("localhost6");
            insert("localhost6.localdomain6");
            insert("::1");
        }

        inline bool IsLocalName(const char* name) const noexcept {
            struct sockaddr_in sa;
            memset(&sa, 0, sizeof(sa));

            if (inet_pton(AF_INET, name, &(sa.sin_addr)) == 1) {
                return (InetToHost(sa.sin_addr.s_addr) >> 24) == 127;
            }

            return contains(name);
        }
    };
}

class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> {
private:
    class TAddrInfoDeleter {
    public:
        TAddrInfoDeleter(bool useFreeAddrInfo = true)
            : UseFreeAddrInfo_(useFreeAddrInfo)
        {
        }

        void operator()(struct addrinfo* ai) noexcept {
            if (!UseFreeAddrInfo_ && ai != NULL) {
                if (ai->ai_addr != NULL) {
                    free(ai->ai_addr);
                }

                struct addrinfo* p;
                while (ai != NULL) {
                    p = ai;
                    ai = ai->ai_next;
                    free(p->ai_canonname);
                    free(p);
                }
            } else if (ai != NULL) {
                freeaddrinfo(ai);
            }
        }

    private:
        bool UseFreeAddrInfo_ = true;
    };

public:
    inline TImpl(const char* host, ui16 port, int flags)
        : Info_(nullptr, TAddrInfoDeleter{})
    {
        const TString port_st(ToString(port));
        struct addrinfo hints;

        memset(&hints, 0, sizeof(hints));

        hints.ai_flags = flags;
        hints.ai_family = PF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;

        if (!host) {
            hints.ai_flags |= AI_PASSIVE;
        } else {
            if (!Singleton<TLocalNames>()->IsLocalName(host)) {
                hints.ai_flags |= AI_ADDRCONFIG;
            }
        }

        struct addrinfo* pai = NULL;
        const int error = getaddrinfo(host, port_st.data(), &hints, &pai);

        if (error) {
            TAddrInfoDeleter()(pai);
            ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port;
        }

        Info_.reset(pai);
    }

    inline TImpl(const char* path, int flags)
        : Info_(nullptr, TAddrInfoDeleter{/* useFreeAddrInfo = */ false})
    {
        THolder<struct sockaddr_un, TFree> sockAddr(
            reinterpret_cast<struct sockaddr_un*>(malloc(sizeof(struct sockaddr_un))));

        Y_ENSURE(strlen(path) < sizeof(sockAddr->sun_path), "Unix socket path more than " << sizeof(sockAddr->sun_path));
        sockAddr->sun_family = AF_UNIX;
        strcpy(sockAddr->sun_path, path);

        TAddrInfoPtr hints(reinterpret_cast<struct addrinfo*>(malloc(sizeof(struct addrinfo))), TAddrInfoDeleter{/* useFreeAddrInfo = */ false});
        memset(hints.get(), 0, sizeof(*hints));

        hints->ai_flags = flags;
        hints->ai_family = AF_UNIX;
        hints->ai_socktype = SOCK_STREAM;
        hints->ai_addrlen = sizeof(*sockAddr);
        hints->ai_addr = (struct sockaddr*)sockAddr.Release();

        Info_.reset(hints.release());
    }

    inline struct addrinfo* Info() const noexcept {
        return Info_.get();
    }

private:
    using TAddrInfoPtr = std::unique_ptr<struct addrinfo, TAddrInfoDeleter>;

    TAddrInfoPtr Info_;
};

TNetworkAddress::TNetworkAddress(const TUnixSocketPath& unixSocketPath, int flags)
    : Impl_(new TImpl(unixSocketPath.Path.data(), flags))
{
}

TNetworkAddress::TNetworkAddress(const TString& host, ui16 port, int flags)
    : Impl_(new TImpl(host.data(), port, flags))
{
}

TNetworkAddress::TNetworkAddress(const TString& host, ui16 port)
    : Impl_(new TImpl(host.data(), port, 0))
{
}

TNetworkAddress::TNetworkAddress(ui16 port)
    : Impl_(new TImpl(nullptr, port, 0))
{
}

TNetworkAddress::~TNetworkAddress() = default;

struct addrinfo* TNetworkAddress::Info() const noexcept {
    return Impl_->Info();
}

TNetworkResolutionError::TNetworkResolutionError(int error) {
    const char* errMsg = nullptr;
#ifdef _win_
    errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows
#else
    errMsg = gai_strerror(error);
#endif
    (*this) << errMsg << "(" << error;

#if defined(_unix_)
    if (error == EAI_SYSTEM) {
        (*this) << "; errno=" << LastSystemError();
    }
#endif

    (*this) << "): ";
}

#if defined(_unix_)
static inline int GetFlags(int fd) {
    const int ret = fcntl(fd, F_GETFL);

    if (ret == -1) {
        ythrow TSystemError() << "can not get fd flags";
    }

    return ret;
}

static inline void SetFlags(int fd, int flags) {
    if (fcntl(fd, F_SETFL, flags) == -1) {
        ythrow TSystemError() << "can not set fd flags";
    }
}

static inline void EnableFlag(int fd, int flag) {
    const int oldf = GetFlags(fd);
    const int newf = oldf | flag;

    if (oldf != newf) {
        SetFlags(fd, newf);
    }
}

static inline void DisableFlag(int fd, int flag) {
    const int oldf = GetFlags(fd);
    const int newf = oldf & (~flag);

    if (oldf != newf) {
        SetFlags(fd, newf);
    }
}

static inline void SetFlag(int fd, int flag, bool value) {
    if (value) {
        EnableFlag(fd, flag);
    } else {
        DisableFlag(fd, flag);
    }
}

static inline bool FlagsAreEnabled(int fd, int flags) {
    return GetFlags(fd) & flags;
}
#endif

#if defined(_win_)
static inline void SetNonBlockSocket(SOCKET fd, int value) {
    unsigned long inbuf = value;
    unsigned long outbuf = 0;
    DWORD written = 0;

    if (!inbuf) {
        WSAEventSelect(fd, nullptr, 0);
    }

    if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
        ythrow TSystemError() << "can not set non block socket state";
    }
}

static inline bool IsNonBlockSocket(SOCKET fd) {
    unsigned long buf = 0;

    if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) {
        ythrow TSystemError() << "can not get non block socket state";
    }

    return buf;
}
#endif

void SetNonBlock(SOCKET fd, bool value) {
#if defined(_unix_)
    #if defined(FIONBIO)
    Y_UNUSED(SetFlag); // shut up clang about unused function
    int nb = value;

    if (ioctl(fd, FIONBIO, &nb) < 0) {
        ythrow TSystemError() << "ioctl failed";
    }
    #else
    SetFlag(fd, O_NONBLOCK, value);
    #endif
#elif defined(_win_)
    SetNonBlockSocket(fd, value);
#else
    #error todo
#endif
}

bool IsNonBlock(SOCKET fd) {
#if defined(_unix_)
    return FlagsAreEnabled(fd, O_NONBLOCK);
#elif defined(_win_)
    return IsNonBlockSocket(fd);
#else
    #error todo
#endif
}

void SetDeferAccept(SOCKET s) {
    (void)s;

#if defined(TCP_DEFER_ACCEPT)
    CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept");
#endif

#if defined(SO_ACCEPTFILTER)
    struct accept_filter_arg afa;

    Zero(afa);
    strcpy(afa.af_name, "dataready");
    SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa);
#endif
}

ssize_t PollD(struct pollfd fds[], nfds_t nfds, const TInstant& deadLine) noexcept {
    TInstant now = TInstant::Now();

    do {
        const TDuration toWait = PollStep(deadLine, now);
        const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds()));

        if (res > 0) {
            return res;
        }

        if (res < 0) {
            const int err = LastSystemError();

            if (err != ETIMEDOUT && err != EINTR) {
                return -err;
            }
        }
    } while ((now = TInstant::Now()) < deadLine);

    return -ETIMEDOUT;
}

void ShutDown(SOCKET s, int mode) {
    if (shutdown(s, mode)) {
        ythrow TSystemError() << "shutdown socket error";
    }
}