aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine/engine/sockpool.cpp
blob: 8a61b19bcd0324adf63ab84f7ec64dff977f755c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include "sockpool.h"

void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa) {
    SetSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1);

    if (!sa || sa->sa_family == AF_INET) {
        sockaddr_in s_in;
        s_in.sin_family = AF_INET;
        s_in.sin_addr.s_addr = INADDR_ANY;
        s_in.sin_port = 0;

        if (bind(sock, (struct sockaddr*)&s_in, sizeof(s_in)) == -1) {
            warn("bind");
        }
    } else if (sa->sa_family == AF_INET6) {
        sockaddr_in6 s_in6(*(const sockaddr_in6*)sa);
        Zero(s_in6.sin6_addr);
        s_in6.sin6_port = 0;

        if (bind(sock, (const struct sockaddr*)&s_in6, sizeof s_in6) == -1) {
            warn("bind6");
        }
    } else {
        Y_ASSERT(0); 
    }

    SetNoDelay(sock, true);
}

TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) {
    TCont* cont = conn->Cont;

    while (true) {
        TSocketHolder s(NCoro::Socket(Addr_->Addr()->sa_family, SOCK_STREAM, 0));

        if (s == INVALID_SOCKET) {
            ythrow TSystemError(errno) << TStringBuf("can not create socket");
        }

        SetCommonSockOpts(s, Addr_->Addr());
        SetZeroLinger(s);

        const int ret = NCoro::ConnectD(cont, s, Addr_->Addr(), Addr_->Len(), conn->DeadLine);

        if (ret == EINTR) {
            continue;
        } else if (ret) {
            ythrow TSystemError(ret) << TStringBuf("can not connect(") << cont->Name() << ')';
        }

        THolder<TPooledSocket::TImpl> res(new TPooledSocket::TImpl(s, this));
        s.Release();

        if (res->IsOpen()) {
            return res.Release();
        }
    }
}