1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#include "acceptor.h"
#include "key_value_printer.h"
#include "mb_lwtrace.h"
#include "network.h"
#include <util/network/init.h>
#include <util/system/defaults.h>
#include <util/system/error.h>
#include <util/system/yassert.h>
LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
using namespace NActor;
using namespace NBus;
using namespace NBus::NPrivate;
TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
: TActor<TAcceptor>(session->Queue->WorkQueue.Get())
, AcceptorId(acceptorId)
, Session(session)
, GranStatus(session->Config.Secret.StatusFlushPeriod)
{
SetNonBlock(socket, true);
Channel = Session->ReadEventLoop.Register(socket, this);
Channel->EnableRead();
Stats.AcceptorId = acceptorId;
Stats.Fd = socket;
Stats.ListenAddr = addr;
SendStatus(TInstant::Now());
}
void TAcceptor::Act(TDefaultTag) {
EShutdownState state = ShutdownState.State.Get();
if (state == SS_SHUTDOWN_COMPLETE) {
return;
}
TInstant now = TInstant::Now();
if (state == SS_SHUTDOWN_COMMAND) {
if (!!Channel) {
Channel->Unregister();
Channel.Drop();
Stats.Fd = INVALID_SOCKET;
}
SendStatus(now);
Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
Stats.ResetIncremental();
ShutdownState.CompleteShutdown();
return;
}
THolder<TOpaqueAddr> addr(new TOpaqueAddr());
SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());
int acceptErrno = LastSystemError();
if (acceptedSocket == INVALID_SOCKET) {
if (LastSystemError() != EWOULDBLOCK) {
Stats.LastAcceptErrorErrno = acceptErrno;
Stats.LastAcceptErrorInstant = now;
++Stats.AcceptErrorCount;
}
} else {
TSocketHolder s(acceptedSocket);
try {
SetKeepAlive(s, true);
SetNoDelay(s, Session->Config.TcpNoDelay);
SetSockOptTcpCork(s, Session->Config.TcpCork);
SetCloseOnExec(s, true);
SetNonBlock(s, true);
if (Session->Config.SocketToS >= 0) {
SetSocketToS(s, addr.Get(), Session->Config.SocketToS);
}
} catch (...) {
// It means that connection was reset just now
// TODO: do something better
goto skipAccept;
}
{
TOnAccept onAccept;
onAccept.s = s.Release();
onAccept.addr = TNetAddr(addr.Release());
onAccept.now = now;
LWPROBE(Accepted, ToString(onAccept.addr));
Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);
Stats.LastAcceptSuccessInstant = now;
++Stats.AcceptSuccessCount;
}
skipAccept:;
}
Channel->EnableRead();
SendStatus(now);
}
void TAcceptor::SendStatus(TInstant now) {
GranStatus.Listen.Update(Stats, now);
}
void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
Y_UNUSED(socket);
Y_UNUSED(cookie);
GetActor()->Schedule();
}
void TAcceptor::Shutdown() {
ShutdownState.ShutdownCommand();
GetActor()->Schedule();
ShutdownState.ShutdownComplete.WaitI();
}
|