1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
#include <util/network/sock.h>
#include "http_proxy.h"
#include "http_proxy_ssl.h"
namespace NHttp {
/// Actor that owns one listening socket and spawns an incoming-connection actor
/// for every client it accepts.
///
/// Life cycle, as implemented below:
///   - starts in StateInit and waits for TEvAddListeningPort;
///   - HandleInit binds and listens, registers the socket with the poller and
///     switches to StateListening; on any failure it re-schedules the very same
///     event one second later;
///   - in StateListening each TEvPollerReady accepts connections until Accept
///     returns nothing, then re-arms read polling if the socket merely ran dry.
class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig {
public:
    using TBase = NActors::TActor<TAcceptorActor>;

    const TActorId Owner;   // receives TEvHttpAcceptorClosed and forwarded TEvReportSensors
    const TActorId Poller;  // receives TEvPollerRegister for the listening and accepted sockets
    TIntrusivePtr<TSocketDescriptor> Socket;           // the listening socket
    NActors::TPollerToken::TPtr PollerToken;           // set once the poller answers with TEvPollerRegisterResult
    THashSet<TActorId> Connections;                    // connection actors registered by this acceptor and not yet reported closed
    TDeque<THttpIncomingRequestPtr> RecycledRequests;  // cleared request objects handed to newly created connections
    std::shared_ptr<TPrivateEndpointInfo> Endpoint;    // created in HandleInit, shared with every connection actor

    /// @param owner  actor notified from Die() and used as forwarding target for sensor reports
    /// @param poller actor the sockets are registered with
    TAcceptorActor(const TActorId& owner, const TActorId& poller)
        : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
        , Owner(owner)
        , Poller(poller)
        , Socket(new TSocketDescriptor())
    {
        // for unit tests :(
        SetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEADDR, static_cast<int>(true));
#ifdef SO_REUSEPORT
        SetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEPORT, static_cast<int>(true));
#endif
    }

    static constexpr char ActorName[] = "HTTP_ACCEPTOR_ACTOR";

protected:
    /// Event dispatch once the socket is listening.
    STFUNC(StateListening) {
        switch (ev->GetTypeRewrite()) {
            HFunc(NActors::TEvPollerRegisterResult, Handle);
            HFunc(NActors::TEvPollerReady, Handle);
            HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
            HFunc(TEvHttpProxy::TEvReportSensors, Handle);
        }
    }

    /// Initial event dispatch: only the request to start listening is handled.
    STFUNC(StateInit) {
        switch (ev->GetTypeRewrite()) {
            HFunc(TEvHttpProxy::TEvAddListeningPort, HandleInit);
        }
    }

    /// Builds the endpoint description, optionally creates the TLS server context,
    /// then binds and listens on the requested address/port.
    ///
    /// On success the sender gets TEvConfirmListen (with the original cookie) and the
    /// actor moves to StateListening. On any failure a warning is logged and the same
    /// event is scheduled again after one second, so this handler runs again from scratch.
    void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
        SocketAddressType bindAddress(Socket->Socket.MakeAddress(event->Get()->Address,event->Get()->Port));
        Endpoint = std::make_shared<TPrivateEndpointInfo>(event->Get()->CompressContentTypes);
        Endpoint->Owner = ctx.SelfID;
        Endpoint->Proxy = Owner;
        Endpoint->WorkerName = event->Get()->WorkerName;
        Endpoint->Secure = event->Get()->Secure;
        int err = 0;
        if (Endpoint->Secure) {
            // An in-memory PEM takes precedence over the certificate/key file pair.
            if (!event->Get()->SslCertificatePem.empty()) {
                Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem);
            } else {
                Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile);
            }
            if (Endpoint->SecureContext == nullptr) {
                err = -1;
                LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context");
            }
        }
        if (err == 0) {
            err = Socket->Socket.Bind(bindAddress.get());
        }
        TStringBuf schema = Endpoint->Secure ? "https://" : "http://";
        if (err == 0) {
            err = Socket->Socket.Listen(LISTEN_QUEUE);
            if (err == 0) {
                LOG_INFO_S(ctx, HttpLog, "Listening on " << schema << bindAddress->ToString());
                SetNonBlock(Socket->Socket);
                ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
                TBase::Become(&TAcceptorActor::StateListening);
                ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress, Endpoint), 0, event->Cookie);
                return;
            }
        }
        LOG_WARN_S(ctx, HttpLog, "Failed to listen on " << schema << bindAddress->ToString() << " - retrying...");
        ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release());
    }

    /// Tells the owner that this acceptor is closed and sends a poison pill to every
    /// connection actor still tracked in Connections.
    ///
    /// NOTE(review): this override does not call the base-class Die and no handler in
    /// this class invokes Die — confirm that actor teardown is triggered elsewhere.
    void Die(const NActors::TActorContext& ctx) override {
        ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID));
        for (const NActors::TActorId& connection : Connections) {
            ctx.Send(connection, new NActors::TEvents::TEvPoisonPill());
        }
    }

    /// Stores the token returned by the poller and asks for the first read notification.
    void Handle(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& /*ctx*/) {
        PollerToken = std::move(ev->Get()->PollerToken);
        PollerToken->Request(true, false); // request read polling
    }

    /// Accepts connections until Accept yields nothing. Each accepted socket gets its
    /// own connection actor (reusing a recycled request object when one is available)
    /// and is registered with the poller on behalf of that actor.
    void Handle(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
        int acceptError = 0;
        for (;;) {
            SocketAddressType addr;
            std::optional<SocketType> s = Socket->Socket.Accept(addr);
            if (!s) {
                // Capture errno right here: leaving the loop body destroys `addr` and `s`,
                // and those destructors run before any code placed after the loop, so they
                // could overwrite errno before it is inspected.
                acceptError = errno;
                break;
            }
            TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor(std::move(s).value());
            NActors::IActor* connectionSocket = nullptr;
            if (RecycledRequests.empty()) {
                connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr);
            } else {
                connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front()));
                RecycledRequests.pop_front();
            }
            NActors::TActorId connectionId = ctx.Register(connectionSocket);
            ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
            Connections.emplace(connectionId);
        }
        // Only the "nothing left to accept" outcome re-arms polling.
        // NOTE(review): any other accept error leaves read polling un-requested — confirm this is intended.
        if (acceptError == EAGAIN || acceptError == EWOULDBLOCK) { // request poller for further connection polling
            Y_VERIFY(PollerToken);
            PollerToken->Request(true, false);
        }
    }

    /// Forgets a closed connection and takes back its request objects, clearing each
    /// one before it is queued for reuse.
    void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
        Connections.erase(event->Get()->ConnectionID);
        for (auto& req : event->Get()->RecycledRequests) {
            req->Clear();
            RecycledRequests.push_back(std::move(req));
        }
    }

    /// Passes sensor reports on to the owner unchanged.
    void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext& ctx) {
        ctx.Send(event->Forward(Owner));
    }
};
/// Factory for the acceptor actor: allocates a TAcceptorActor bound to the given
/// owner and poller and returns it as a generic actor pointer.
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
    NActors::IActor* acceptor = new TAcceptorActor(owner, poller);
    return acceptor;
}
}
|