aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/http/http_proxy_acceptor.cpp
blob: f9ee1d8032ec6717443e35eb371dd81127bcc80d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#include <util/network/sock.h>
#include "http_proxy.h"
#include "http_proxy_ssl.h"

namespace NHttp {

class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig {
public:
    using TBase = NActors::TActor<TAcceptorActor>;
    const TActorId Owner;
    const TActorId Poller;
    TIntrusivePtr<TSocketDescriptor> Socket;
    NActors::TPollerToken::TPtr PollerToken;
    THashSet<TActorId> Connections;
    TDeque<THttpIncomingRequestPtr> RecycledRequests;
    std::shared_ptr<TPrivateEndpointInfo> Endpoint;

    TAcceptorActor(const TActorId& owner, const TActorId& poller)
        : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
        , Owner(owner)
        , Poller(poller)
    {
    }

    static constexpr char ActorName[] = "HTTP_ACCEPTOR_ACTOR";

protected:
    STFUNC(StateListening) {
        switch (ev->GetTypeRewrite()) {
            HFunc(NActors::TEvPollerRegisterResult, Handle);
            HFunc(NActors::TEvPollerReady, Handle);
            HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
            HFunc(TEvHttpProxy::TEvReportSensors, Handle);
        }
    }

    STFUNC(StateInit) {
        switch (ev->GetTypeRewrite()) {
            HFunc(TEvHttpProxy::TEvAddListeningPort, HandleInit);
        }
    }

    void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
        TString address = event->Get()->Address;
        ui16 port = event->Get()->Port;
        Socket = new TSocketDescriptor(SocketType::GuessAddressFamily(address));
        // for unit tests :(
        SetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEADDR, (int)true);
#ifdef SO_REUSEPORT
        SetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEPORT, (int)true);
#endif
        SocketAddressType bindAddress(Socket->Socket.MakeAddress(address, port));
        Endpoint = std::make_shared<TPrivateEndpointInfo>(event->Get()->CompressContentTypes);
        Endpoint->Owner = ctx.SelfID;
        Endpoint->Proxy = Owner;
        Endpoint->WorkerName = event->Get()->WorkerName;
        Endpoint->Secure = event->Get()->Secure;
        int err = 0;
        if (Endpoint->Secure) {
            if (!event->Get()->SslCertificatePem.empty()) {
                Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem);
            } else {
                Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile);
            }
            if (Endpoint->SecureContext == nullptr) {
                err = -1;
                LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context");
            }
        }
        if (err == 0) {
            err = Socket->Socket.Bind(bindAddress.get());
            if (err != 0) {
                LOG_WARN_S(
                    ctx,
                    HttpLog,
                    "Failed to bind " << bindAddress->ToString()
                    << ", code: " << err);
            }
        }
        TStringBuf schema = Endpoint->Secure ? "https://" : "http://";
        if (err == 0) {
            err = Socket->Socket.Listen(LISTEN_QUEUE);
            if (err == 0) {
                LOG_INFO_S(ctx, HttpLog, "Listening on " << schema << bindAddress->ToString());
                SetNonBlock(Socket->Socket);
                ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
                TBase::Become(&TAcceptorActor::StateListening);
                ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress, Endpoint), 0, event->Cookie);
                return;
            } else {
                LOG_WARN_S(
                    ctx,
                    HttpLog,
                    "Failed to listen on " << schema << bindAddress->ToString()
                    << ", code: " << err);
            }
        }
        LOG_WARN_S(ctx, HttpLog, "Failed to init - retrying...");
        ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release());
    }

    void Die(const NActors::TActorContext& ctx) override {
        ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID));
        for (const NActors::TActorId& connection : Connections) {
            ctx.Send(connection, new NActors::TEvents::TEvPoisonPill());
        }
    }

    void Handle(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& /*ctx*/) {
        PollerToken = std::move(ev->Get()->PollerToken);
        PollerToken->Request(true, false); // request read polling
    }

    void Handle(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
        for (;;) {
            SocketAddressType addr;
            std::optional<SocketType> s = Socket->Socket.Accept(addr);
            if (!s) {
                break;
            }
            TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor(std::move(s).value());
            NActors::IActor* connectionSocket = nullptr;
            if (RecycledRequests.empty()) {
                connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr);
            } else {
                connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front()));
                RecycledRequests.pop_front();
            }
            NActors::TActorId connectionId = ctx.Register(connectionSocket);
            ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
            Connections.emplace(connectionId);
        }
        int err = errno;
        if (err == EAGAIN || err == EWOULDBLOCK) { // request poller for further connection polling
            Y_ABORT_UNLESS(PollerToken);
            PollerToken->Request(true, false);
        }
    }

    void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
        Connections.erase(event->Get()->ConnectionID);
        for (auto& req : event->Get()->RecycledRequests) {
            req->Clear();
            RecycledRequests.push_back(std::move(req));
        }
    }

    void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext& ctx) {
        ctx.Send(event->Forward(Owner));
    }
};

NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
    return new TAcceptorActor(owner, poller);
}

}