aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/http/http_proxy_acceptor.cpp
blob: 9780541b71f0b5b9b24bc42a0da14edd8285abe6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include <util/network/sock.h>
#include "http_proxy.h"
#include "http_proxy_ssl.h"

namespace NHttp {

// Actor that owns a listening socket and creates one incoming-connection actor
// per accepted client.
//
// States, as implemented below:
//   * StateInit      - waits for TEvAddListeningPort, then binds and listens;
//                      on any failure the same event is re-scheduled after
//                      one second and the whole setup is attempted again.
//   * StateListening - accepts connections when the poller reports readiness,
//                      tracks live connection actors and keeps a pool of
//                      request objects handed back by closed connections.
class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig {
public:
    using TBase = NActors::TActor<TAcceptorActor>;
    // Receives TEvHttpAcceptorClosed and forwarded TEvReportSensors; also
    // published to connections as Endpoint.Proxy.
    const TActorId Owner;
    // Actor that sockets are registered with via TEvPollerRegister.
    const TActorId Poller;
    // The listening socket.
    TIntrusivePtr<TSocketDescriptor> Socket;
    // Token delivered in TEvPollerRegisterResult; used to request read polling
    // on the listening socket.
    NActors::TPollerToken::TPtr PollerToken;
    // Ids of connection actors registered here and not yet reported closed.
    THashSet<TActorId> Connections;
    // Request objects returned by closed connections, reused for new ones.
    TDeque<THttpIncomingRequestPtr> RecycledRequests;
    // Endpoint description handed to every connection actor.
    TEndpointInfo Endpoint;

    // Creates the listening socket descriptor and starts in StateInit.
    // `owner` and `poller` are stored as Owner and Poller respectively.
    TAcceptorActor(const TActorId& owner, const TActorId& poller)
        : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
        , Owner(owner)
        , Poller(poller)
        , Socket(new TSocketDescriptor())
    {
        // for unit tests :(
        CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEADDR, static_cast<int>(true), "reuse address");
#ifdef SO_REUSEPORT
        CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEPORT, static_cast<int>(true), "reuse port");
#endif
    }

protected:
    // Event dispatch once the socket is listening.
    STFUNC(StateListening) {
        switch (ev->GetTypeRewrite()) {
            HFunc(NActors::TEvPollerRegisterResult, Handle);
            HFunc(NActors::TEvPollerReady, Handle);
            HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
            HFunc(TEvHttpProxy::TEvReportSensors, Handle);
        }
    }

    // Event dispatch before the socket is bound: only TEvAddListeningPort is
    // handled.
    STFUNC(StateInit) {
        switch (ev->GetTypeRewrite()) {
            HFunc(TEvHttpProxy::TEvAddListeningPort, HandleInit);
        }
    }

    // Fills in Endpoint from the event, optionally builds the server security
    // context, then binds to "::" on the requested port and starts listening.
    //
    // On success: switches the socket to non-blocking mode, registers it with
    // the poller, moves to StateListening and replies to the sender with
    // TEvConfirmListen (same cookie).
    // On failure (security context, bind or listen): logs a warning and
    // re-schedules the very same event in one second, so this handler runs
    // again from the start.
    void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
        SocketAddressType bindAddress("::", event->Get()->Port);
        Endpoint.Owner = ctx.SelfID;
        Endpoint.Proxy = Owner;
        Endpoint.WorkerName = event->Get()->WorkerName;
        Endpoint.Secure = event->Get()->Secure;
        int err = 0;
        if (Endpoint.Secure) {
            // An in-memory PEM certificate takes precedence over the
            // certificate/private key file pair.
            if (!event->Get()->SslCertificatePem.empty()) {
                Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem);
            } else {
                Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile);
            }
            if (Endpoint.SecureContext == nullptr) {
                // Mark as failed so bind/listen are skipped and the retry path is taken.
                err = -1;
                LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context");
            }
        }
        if (err == 0) {
            err = Socket->Socket.Bind(&bindAddress);
        }
        if (err == 0) {
            err = Socket->Socket.Listen(LISTEN_QUEUE);
            if (err == 0) {
                LOG_INFO_S(ctx, HttpLog, "Listening on " << bindAddress.ToString());
                SetNonBlock(Socket->Socket);
                ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
                TBase::Become(&TAcceptorActor::StateListening);
                ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress), 0, event->Cookie);
                return;
            }
        }
        LOG_WARN_S(ctx, HttpLog, "Failed to listen on " << bindAddress.ToString() << " - retrying...");
        ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release());
    }

    // Shutdown path: tells the owner this acceptor is closed, sends a poison
    // pill to every tracked connection actor, and then delegates to the
    // base-class Die().
    void Die(const NActors::TActorContext& ctx) override {
        ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID));
        for (const NActors::TActorId& connection : Connections) {
            ctx.Send(connection, new NActors::TEvents::TEvPoisonPill());
        }
        // Chain to the base implementation; without it this override only
        // sends notifications. Presumably the base Die() is what actually
        // retires the actor - confirm against NActors::IActor.
        TBase::Die(ctx);
    }

    // Stores the poller token for the listening socket and asks for the first
    // read-readiness notification.
    void Handle(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& /*ctx*/) {
        PollerToken = std::move(ev->Get()->PollerToken);
        PollerToken->Request(true, false); // request read polling
    }

    // Drains the accept queue: for every accepted socket a connection actor is
    // created (reusing a recycled request object when one is available),
    // registered, attached to the poller and added to Connections.
    // The loop ends on the first non-zero result from Accept().
    void Handle(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
        TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor();
        SocketAddressType addr;
        int err;
        while ((err = Socket->Socket.Accept(&socket->Socket, &addr)) == 0) {
            NActors::IActor* connectionSocket = nullptr;
            if (RecycledRequests.empty()) {
                connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr);
            } else {
                connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front()));
                RecycledRequests.pop_front();
            }
            NActors::TActorId connectionId = ctx.Register(connectionSocket);
            ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
            Connections.emplace(connectionId);
            // The accepted descriptor now belongs to the connection; prepare a
            // fresh one for the next Accept() call.
            socket = new TSocketDescriptor();
        }
        if (err == -EAGAIN || err == -EWOULDBLOCK) { // request poller for further connection polling
            Y_VERIFY(PollerToken);
            PollerToken->Request(true, false);
        } else {
            // Unexpected accept failure: report it instead of dropping it silently.
            // NOTE(review): read polling is not re-requested on this path, so
            // presumably no further TEvPollerReady will arrive for the listening
            // socket - confirm whether the acceptor should re-arm or shut down here.
            LOG_WARN_S(ctx, HttpLog, "Failed to accept incoming connection, error " << err);
        }
    }

    // Forgets a closed connection and takes over its recycled request objects,
    // clearing each one before it is put into the pool.
    void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
        Connections.erase(event->Get()->ConnectionID);
        for (auto& req : event->Get()->RecycledRequests) {
            req->Clear();
            RecycledRequests.push_back(std::move(req));
        }
    }

    // Sensor reports are not processed here; they are forwarded to the owner.
    void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext& ctx) {
        ctx.Send(event->Forward(Owner));
    }
};

// Factory for the acceptor actor: allocates a TAcceptorActor wired to the
// given owner and poller actor ids and returns it through the generic actor
// interface.
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
    NActors::IActor* const acceptor = new TAcceptorActor(owner, poller);
    return acceptor;
}

}