aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/acceptor.cpp
blob: 65db22c2172f081f88bdc4870b2458a226d82027 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include "acceptor.h"

#include "key_value_printer.h"
#include "mb_lwtrace.h"
#include "network.h"

#include <util/network/init.h>
#include <util/system/defaults.h>
#include <util/system/error.h>
#include <util/system/yassert.h>

// Makes the message bus lwtrace provider's probes usable in this file;
// Act() emits the `Accepted` probe through LWPROBE.
// NOTE(review): exact expansion lives in mb_lwtrace.h / lwtrace — confirm there.
LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)

// Namespaces whose names are used unqualified below.
using namespace NActor;
using namespace NBus;
using namespace NBus::NPrivate;

// Creates an acceptor for an already-created listening socket.
//
// session    - owning session; supplies the work queue the actor runs on, the
//              read event loop the socket is registered with, and the config.
// acceptorId - identifier stored in this object and in its stats.
// socket     - listening socket; it is switched to non-blocking mode here.
// addr       - address recorded in the stats as the listen address.
TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
    : TActor<TAcceptor>(session->Queue->WorkQueue.Get())
    , AcceptorId(acceptorId)
    , Session(session)
    , GranStatus(session->Config.Secret.StatusFlushPeriod)
{
    // Act() treats EWOULDBLOCK from accept() as "nothing to accept", which
    // relies on the socket being non-blocking.
    SetNonBlock(socket, true);

    // Register this object as the handler for the socket and ask for read
    // notifications (HandleEvent() is the callback that reacts to them).
    Channel = Session->ReadEventLoop.Register(socket, this);
    Channel->EnableRead();

    // Static part of the stats: identity, descriptor and listen address.
    Stats.AcceptorId = acceptorId;
    Stats.Fd = socket;
    Stats.ListenAddr = addr;

    // Publish the initial status.
    SendStatus(TInstant::Now());
}

// Actor body: performs one step of the acceptor each time the actor runs.
//
//  * Shutdown already complete: does nothing.
//  * Shutdown command pending: unregisters and drops the channel (if any),
//    publishes the status, hands the final stats to the session's dead
//    acceptor status queue and completes the shutdown.
//  * Otherwise: makes a single accept() call on the listening socket.
//      - On failure other than EWOULDBLOCK the error code, time and counter
//        are recorded in Stats.
//      - On success the new socket is configured from the session config and
//        passed to the session's on-accept queue; if configuring throws, the
//        connection is silently skipped.
//    Read notifications are then re-enabled and the status is published.
void TAcceptor::Act(TDefaultTag) {
    EShutdownState state = ShutdownState.State.Get();

    if (state == SS_SHUTDOWN_COMPLETE) {
        return;
    }

    TInstant now = TInstant::Now();

    if (state == SS_SHUTDOWN_COMMAND) {
        if (!!Channel) {
            Channel->Unregister();
            Channel.Drop();
            Stats.Fd = INVALID_SOCKET;
        }

        SendStatus(now);

        Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
        Stats.ResetIncremental();

        ShutdownState.CompleteShutdown();
        return;
    }

    THolder<TOpaqueAddr> addr(new TOpaqueAddr());
    SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());

    // Capture the error code once, immediately after accept(), so that the
    // value which is tested below is exactly the value which gets recorded.
    const int acceptErrno = LastSystemError();

    if (acceptedSocket == INVALID_SOCKET) {
        // EWOULDBLOCK only means there is nothing to accept right now; it is
        // not counted as an error.
        if (acceptErrno != EWOULDBLOCK) {
            Stats.LastAcceptErrorErrno = acceptErrno;
            Stats.LastAcceptErrorInstant = now;
            ++Stats.AcceptErrorCount;
        }
    } else {
        // The holder owns the accepted socket until it is released to the
        // on-accept queue below.
        TSocketHolder s(acceptedSocket);

        bool configured = false;
        try {
            SetKeepAlive(s, true);
            SetNoDelay(s, Session->Config.TcpNoDelay);
            SetSockOptTcpCork(s, Session->Config.TcpCork);
            SetCloseOnExec(s, true);
            SetNonBlock(s, true);
            if (Session->Config.SocketToS >= 0) {
                SetSocketToS(s, addr.Get(), Session->Config.SocketToS);
            }
            configured = true;
        } catch (...) {
            // It means that connection was reset just now
            // TODO: do something better
            // The connection is skipped; `s` still owns the socket and goes
            // out of scope at the end of this branch.
        }

        if (configured) {
            TOnAccept onAccept;
            onAccept.s = s.Release();
            onAccept.addr = TNetAddr(addr.Release());
            onAccept.now = now;

            LWPROBE(Accepted, ToString(onAccept.addr));

            Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);

            Stats.LastAcceptSuccessInstant = now;
            ++Stats.AcceptSuccessCount;
        }
    }

    // Ask for the next read notification on the listening socket.
    Channel->EnableRead();

    SendStatus(now);
}

// Feeds the current Stats snapshot, stamped with `now`, into the listen
// status held by GranStatus.
void TAcceptor::SendStatus(TInstant now) {
    auto& listenStatus = GranStatus.Listen;
    listenStatus.Update(Stats, now);
}

// Event handler for the listening socket (this object is passed as the
// handler to ReadEventLoop.Register() in the constructor). Both arguments are
// ignored: the only reaction is to schedule the actor so that Act() runs.
void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
    Y_UNUSED(socket);
    Y_UNUSED(cookie);

    auto actor = GetActor();
    actor->Schedule();
}

// Requests shutdown and blocks until it has completed.
void TAcceptor::Shutdown() {
    // Step 1: flag the shutdown command so the next Act() takes the
    // shutdown branch.
    ShutdownState.ShutdownCommand();

    // Step 2: make sure Act() actually runs to process the command.
    auto actor = GetActor();
    actor->Schedule();

    // Step 3: wait for Act() to call CompleteShutdown().
    auto& shutdownDone = ShutdownState.ShutdownComplete;
    shutdownDone.WaitI();
}