aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/acceptor.cpp
blob: d54be57fd96e552ddbefef33e96b6bf1f40928cc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include "acceptor.h"
 
#include "key_value_printer.h" 
#include "mb_lwtrace.h" 
#include "network.h"
 
#include <util/network/init.h>
#include <util/system/defaults.h>
#include <util/system/error.h>
#include <util/system/yassert.h>
 
LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) 
 
using namespace NActor; 
using namespace NBus; 
using namespace NBus::NPrivate; 
 
// Builds an acceptor actor around `socket`.
//
// session    - owning session; supplies the work queue the actor runs on, the
//              read event loop the socket is registered with, and the status
//              flush period handed to GranStatus.
// acceptorId - identifier stored in the actor and reported in Stats.
// socket     - socket that Act() later calls accept() on; it is switched to
//              non-blocking mode here.
// addr       - address recorded in Stats.ListenAddr.
TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
    : TActor<TAcceptor>(session->Queue->WorkQueue.Get())
    , AcceptorId(acceptorId)
    , Session(session)
    , GranStatus(session->Config.Secret.StatusFlushPeriod)
{
    // Act() calls accept() directly and treats EWOULDBLOCK as "nothing to do",
    // which only works when the socket is non-blocking.
    SetNonBlock(socket, true);

    // Register this object as the handler for the socket and ask for read
    // notifications (delivered through HandleEvent()).
    Channel = session->ReadEventLoop.Register(socket, this);
    Channel->EnableRead();

    // Static part of the status record.
    Stats.ListenAddr = addr;
    Stats.Fd = socket;
    Stats.AcceptorId = acceptorId;

    // Publish the initial status.
    SendStatus(TInstant::Now());
}
 
// Actor body: performs one step of the acceptor's work.
//
// Depending on the shutdown state it either
//   * does nothing (shutdown already completed),
//   * tears the acceptor down (shutdown requested): unregisters the channel,
//     publishes the final status, hands Stats to the session's dead-acceptor
//     queue and marks the shutdown as complete, or
//   * tries to accept a single connection, configures the new socket and
//     enqueues it to the session's on-accept queue, then re-enables read
//     notifications and publishes the status.
//
// At most one accept() call is made per invocation.
void TAcceptor::Act(TDefaultTag) {
    const EShutdownState state = ShutdownState.State.Get();

    if (state == SS_SHUTDOWN_COMPLETE) {
        return;
    }

    const TInstant now = TInstant::Now();

    if (state == SS_SHUTDOWN_COMMAND) {
        if (!!Channel) {
            Channel->Unregister();
            Channel.Drop();
            Stats.Fd = INVALID_SOCKET;
        }

        SendStatus(now);

        // Hand the final statistics over to the session, then clear the
        // incremental part so it is not reported twice.
        Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
        Stats.ResetIncremental();

        ShutdownState.CompleteShutdown();
        return;
    }

    THolder<TOpaqueAddr> addr(new TOpaqueAddr());
    const SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());

    if (acceptedSocket == INVALID_SOCKET) {
        // Read the error code once, immediately after the failed call, and use
        // that same value both for the check and for the statistics.
        const int acceptErrno = LastSystemError();

        // EWOULDBLOCK just means there is no pending connection on the
        // non-blocking socket; anything else is recorded as an accept error.
        if (acceptErrno != EWOULDBLOCK) {
            Stats.LastAcceptErrorErrno = acceptErrno;
            Stats.LastAcceptErrorInstant = now;
            ++Stats.AcceptErrorCount;
        }
    } else {
        TSocketHolder s(acceptedSocket);

        bool configured = false;
        try {
            SetKeepAlive(s, true);
            SetNoDelay(s, Session->Config.TcpNoDelay);
            SetSockOptTcpCork(s, Session->Config.TcpCork);
            SetCloseOnExec(s, true);
            SetNonBlock(s, true);
            if (Session->Config.SocketToS >= 0) {
                SetSocketToS(s, addr.Get(), Session->Config.SocketToS);
            }
            configured = true;
        } catch (...) {
            // It means that connection was reset just now
            // TODO: do something better
            // The connection is skipped: `s` was not released, so the holder
            // still owns the socket (presumably closing it on scope exit), and
            // no statistics are updated.
        }

        if (configured) {
            TOnAccept onAccept;
            onAccept.s = s.Release();
            onAccept.addr = TNetAddr(addr.Release());
            onAccept.now = now;

            LWPROBE(Accepted, ToString(onAccept.addr));

            Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);

            Stats.LastAcceptSuccessInstant = now;
            ++Stats.AcceptSuccessCount;
        }
    }

    // Ask the event loop for the next read notification on the listening socket.
    Channel->EnableRead();

    SendStatus(now);
}
 
// Passes the current Stats together with the timestamp `now` to
// GranStatus.Listen.Update().
// NOTE(review): GranStatus is constructed from Config.Secret.StatusFlushPeriod,
// so Update() presumably rate-limits publication to that period — confirm.
void TAcceptor::SendStatus(TInstant now) {
    GranStatus.Listen.Update(Stats, now);
} 
 
// Event handler for the registered socket (the constructor registers `this`
// with the session's read event loop). It does no work itself: it only
// schedules the actor, so the actual accept() happens in Act().
void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
    // Neither argument is used; Act() takes the socket from Channel.
    Y_UNUSED(cookie);
    Y_UNUSED(socket);

    GetActor()->Schedule();
}
 
// Requests shutdown of the acceptor and waits for it to finish.
// The order of the three steps matters: the command must be set before the
// actor is scheduled, and the wait must come last.
void TAcceptor::Shutdown() { 
    // Move the shutdown state to "command" so the next Act() run takes the
    // teardown branch (state == SS_SHUTDOWN_COMMAND).
    ShutdownState.ShutdownCommand(); 
    // Make sure Act() runs even if no socket event arrives.
    GetActor()->Schedule(); 
 
    // Wait on the completion event; presumably signalled by the
    // ShutdownState.CompleteShutdown() call in Act() — confirm.
    ShutdownState.ShutdownComplete.WaitI(); 
}