#include <library/cpp/actors/core/events.h> #include <library/cpp/monlib/metrics/metric_registry.h> #include "http_proxy.h" namespace NHttp { class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig { public: IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller); TActorId acceptorId = ctx.Register(listeningSocket); ctx.Send(event->Forward(acceptorId)); Acceptors.emplace_back(acceptorId); return listeningSocket; } IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) { IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller); TActorId connectionId = ctx.Register(connectionSocket); Connections.emplace(connectionId); return connectionSocket; } void Bootstrap(const NActors::TActorContext& ctx) { Poller = ctx.Register(NActors::CreatePollerActor()); Become(&THttpProxy::StateWork); } THttpProxy(NMonitoring::TMetricRegistry& sensors) : Sensors(sensors) {} protected: STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { HFunc(TEvHttpProxy::TEvAddListeningPort, Handle); HFunc(TEvHttpProxy::TEvRegisterHandler, Handle); HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle); HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle); HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle); HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle); HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle); HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle); HFunc(TEvHttpProxy::TEvReportSensors, Handle); HFunc(NActors::TEvents::TEvPoison, Handle); } } void PassAway() override { Send(Poller, new NActors::TEvents::TEvPoisonPill()); for (const NActors::TActorId& connection : Connections) { Send(connection, new NActors::TEvents::TEvPoisonPill()); } for (const NActors::TActorId& acceptor : Acceptors) { Send(acceptor, new NActors::TEvents::TEvPoisonPill()); } NActors::TActorBootstrapped<THttpProxy>::PassAway(); } void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { TStringBuf url = event->Get()->Request->URL.Before('?'); THashMap<TString, TActorId>::iterator it; while (!url.empty()) { it = Handlers.find(url); if (it != Handlers.end()) { ctx.Send(event->Forward(it->second)); return; } else { if (url.EndsWith('/')) { url.Trunc(url.size() - 1); } size_t pos = url.rfind('/'); if (pos == TStringBuf::npos) { break; } else { url = url.substr(0, pos + 1); } } } ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound())); } void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { Y_UNUSED(event); Y_UNUSED(ctx); Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly"); } void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { Y_UNUSED(event); Y_UNUSED(ctx); Y_FAIL("This event shouldn't be there, it should go to the http connection directly"); } void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { TStringBuf host(event->Get()->Request->Host); bool secure(event->Get()->Request->Secure); NActors::IActor* actor = AddOutgoingConnection(TString(host), secure, ctx); ctx.Send(event->Forward(actor->SelfId())); } void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { AddListeningPort(event, ctx); } void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) { for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) { if (*it == event->Get()->ConnectionID) { Acceptors.erase(it); break; } } } void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { Connections.erase(event->Get()->ConnectionID); } void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) { Handlers[event->Get()->Path] = event->Get()->Handler; } void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) { const TString& host(event->Get()->Host); auto it = Hosts.find(host); if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) { TString addressPart; TIpPort portPart = 0; CrackAddress(host, addressPart, portPart); if (IsIPv6(addressPart)) { TSockAddrInet6 address(addressPart.c_str(), portPart); if (it == Hosts.end()) { it = Hosts.emplace(host, THostEntry()).first; } it->second.Address = address; it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; } else { // TODO(xenoxeno): move to another, possible blocking actor try { const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart)); if (result != nullptr) { auto pAddr = result->Addr.Begin(); while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET6) { ++pAddr; } if (pAddr == result->Addr.End()) { ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved")); return; } TSockAddrInet6 address = {}; static_cast<sockaddr_in6&>(address) = *reinterpret_cast<sockaddr_in6*>(pAddr->ai_addr); LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address.ToString()); if (it == Hosts.end()) { it = Hosts.emplace(host, THostEntry()).first; } it->second.Address = address; it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; } else { ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host")); return; } } catch (const yexception& e) { ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what())); return; } } } ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address)); } void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) { const TEvHttpProxy::TEvReportSensors& sensors(*event->Get()); const static TString urlNotFound = "not-found"; const TString& url = (sensors.Status == "404" ? urlNotFound : sensors.Url); Sensors.Rate({ {"sensor", "count"}, {"direction", sensors.Direction}, {"peer", sensors.Host}, {"url", url}, {"status", sensors.Status} })->Inc(); Sensors.HistogramRate({ {"sensor", "time_us"}, {"direction", sensors.Direction}, {"peer", sensors.Host}, {"url", url}, {"status", sensors.Status} }, NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds()); Sensors.HistogramRate({ {"sensor", "time_ms"}, {"direction", sensors.Direction}, {"peer", sensors.Host}, {"url", url}, {"status", sensors.Status} }, NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds()); } void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) { PassAway(); } NActors::TActorId Poller; TVector<TActorId> Acceptors; struct THostEntry { TSockAddrInet6 Address; TInstant DeadlineTime; }; static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60); THashMap<TString, THostEntry> Hosts; THashMap<TString, TActorId> Handlers; THashSet<TActorId> Connections; // outgoing NMonitoring::TMetricRegistry& Sensors; }; TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) { return new TEvHttpProxy::TEvReportSensors( "out", request->Host, request->URL.Before('?'), response ? response->Status : "504", TDuration::Seconds(std::abs(request->Timer.Passed())) ); } TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) { return new TEvHttpProxy::TEvReportSensors( "in", request->Host, request->URL.Before('?'), response->Status, TDuration::Seconds(std::abs(request->Timer.Passed())) ); } NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) { return new THttpProxy(sensors); } bool IsIPv6(const TString& host) { return host.find_first_not_of(":0123456789abcdef") == TString::npos; } bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) { url.TrySplit("://", scheme, url); auto pos = url.find('/'); if (pos == TStringBuf::npos) { host = url; } else { host = url.substr(0, pos); uri = url.substr(pos); } return true; } void CrackAddress(const TString& address, TString& hostname, TIpPort& port) { size_t first_colon_pos = address.find(':'); if (first_colon_pos != TString::npos) { size_t last_colon_pos = address.rfind(':'); if (last_colon_pos == first_colon_pos) { // only one colon, simple case port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0); hostname = address.substr(0, first_colon_pos); } else { // ipv6? size_t closing_bracket_pos = address.rfind(']'); if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) { // whole address is ipv6 host hostname = address; } else { port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0); hostname = address.substr(0, last_colon_pos); } if (hostname.StartsWith('[') && hostname.EndsWith(']')) { hostname = hostname.substr(1, hostname.size() - 2); } } } else { hostname = address; } } void TrimBegin(TStringBuf& target, char delim) { while (!target.empty() && *target.begin() == delim) { target.Skip(1); } } void TrimEnd(TStringBuf& target, char delim) { while (!target.empty() && target.back() == delim) { target.Trunc(target.size() - 1); } } void Trim(TStringBuf& target, char delim) { TrimBegin(target, delim); TrimEnd(target, delim); } void TrimEnd(TString& target, char delim) { while (!target.empty() && target.back() == delim) { target.resize(target.size() - 1); } } }