diff options
author | Alexey Efimov <xeno@prnwatch.com> | 2022-02-10 16:49:41 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:41 +0300 |
commit | 26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946 (patch) | |
tree | d34555f21d4d9f94f84d460e55b77d7eb41a953c /library/cpp/actors/http/http_proxy.cpp | |
parent | ca3252a147a429eac4ba8221857493c58dcd09b5 (diff) | |
download | ydb-26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946.tar.gz |
Restoring authorship annotation for Alexey Efimov <xeno@prnwatch.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/http/http_proxy.cpp')
-rw-r--r-- | library/cpp/actors/http/http_proxy.cpp | 564 |
1 files changed, 282 insertions, 282 deletions
diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp index 36c6855d93..2217838624 100644 --- a/library/cpp/actors/http/http_proxy.cpp +++ b/library/cpp/actors/http/http_proxy.cpp @@ -1,314 +1,314 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/monlib/metrics/metric_registry.h> -#include "http_proxy.h" - -namespace NHttp { - -class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig { -public: - IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { - IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller); +#include "http_proxy.h" + +namespace NHttp { + +class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig { +public: + IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller); TActorId acceptorId = ctx.Register(listeningSocket); - ctx.Send(event->Forward(acceptorId)); - Acceptors.emplace_back(acceptorId); - return listeningSocket; - } - - IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) { - IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller); + ctx.Send(event->Forward(acceptorId)); + Acceptors.emplace_back(acceptorId); + return listeningSocket; + } + + IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) { + IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller); TActorId connectionId = ctx.Register(connectionSocket); - Connections.emplace(connectionId); - return connectionSocket; - } - - void Bootstrap(const NActors::TActorContext& ctx) { - Poller = ctx.Register(NActors::CreatePollerActor()); - Become(&THttpProxy::StateWork); - } - + Connections.emplace(connectionId); + return connectionSocket; + } + + void Bootstrap(const NActors::TActorContext& ctx) { + Poller = ctx.Register(NActors::CreatePollerActor()); + Become(&THttpProxy::StateWork); + } + THttpProxy(NMonitoring::TMetricRegistry& sensors) - : Sensors(sensors) - {} - -protected: - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvAddListeningPort, Handle); - HFunc(TEvHttpProxy::TEvRegisterHandler, Handle); - HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle); - HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle); - HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle); - HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle); - HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle); - HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); - HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle); - HFunc(TEvHttpProxy::TEvReportSensors, Handle); + : Sensors(sensors) + {} + +protected: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvAddListeningPort, Handle); + HFunc(TEvHttpProxy::TEvRegisterHandler, Handle); + HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle); + HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle); + HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle); + HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle); + HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle); + HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); + HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle); + HFunc(TEvHttpProxy::TEvReportSensors, Handle); HFunc(NActors::TEvents::TEvPoison, Handle); - } - } - + } + } + void PassAway() override { Send(Poller, new NActors::TEvents::TEvPoisonPill()); for (const NActors::TActorId& connection : Connections) { Send(connection, new NActors::TEvents::TEvPoisonPill()); - } + } for (const NActors::TActorId& acceptor : Acceptors) { Send(acceptor, new NActors::TEvents::TEvPoisonPill()); - } + } NActors::TActorBootstrapped<THttpProxy>::PassAway(); - } - - void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { - TStringBuf url = event->Get()->Request->URL.Before('?'); + } + + void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { + TStringBuf url = event->Get()->Request->URL.Before('?'); THashMap<TString, TActorId>::iterator it; - while (!url.empty()) { - it = Handlers.find(url); - if (it != Handlers.end()) { - ctx.Send(event->Forward(it->second)); - return; - } else { - if (url.EndsWith('/')) { - url.Trunc(url.size() - 1); - } - size_t pos = url.rfind('/'); - if (pos == TStringBuf::npos) { - break; - } else { - url = url.substr(0, pos + 1); - } - } - } - ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound())); - } - - void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { - Y_UNUSED(event); - Y_UNUSED(ctx); - Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly"); - } - - void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { - Y_UNUSED(event); - Y_UNUSED(ctx); - Y_FAIL("This event shouldn't be there, it should go to the http connection directly"); - } - - void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { - TStringBuf host(event->Get()->Request->Host); - bool secure(event->Get()->Request->Secure); - NActors::IActor* actor = AddOutgoingConnection(TString(host), secure, ctx); - ctx.Send(event->Forward(actor->SelfId())); - } - - void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { - AddListeningPort(event, ctx); - } - - void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) { - for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) { - if (*it == event->Get()->ConnectionID) { - Acceptors.erase(it); - break; - } - } - } - - void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { - Connections.erase(event->Get()->ConnectionID); - } - - void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) { - Handlers[event->Get()->Path] = event->Get()->Handler; - } - - void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) { - const TString& host(event->Get()->Host); - auto it = Hosts.find(host); - if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) { - TString addressPart; - TIpPort portPart = 0; - CrackAddress(host, addressPart, portPart); - if (IsIPv6(addressPart)) { - TSockAddrInet6 address(addressPart.c_str(), portPart); - if (it == Hosts.end()) { - it = Hosts.emplace(host, THostEntry()).first; - } - it->second.Address = address; - it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; - } else { - // TODO(xenoxeno): move to another, possible blocking actor - try { - const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart)); - if (result != nullptr) { - auto pAddr = result->Addr.Begin(); - while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET6) { - ++pAddr; - } - if (pAddr == result->Addr.End()) { - ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved")); - return; - } - TSockAddrInet6 address = {}; - static_cast<sockaddr_in6&>(address) = *reinterpret_cast<sockaddr_in6*>(pAddr->ai_addr); - LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address.ToString()); - if (it == Hosts.end()) { - it = Hosts.emplace(host, THostEntry()).first; - } - it->second.Address = address; - it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; - } else { - ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host")); - return; - } - } - catch (const yexception& e) { - ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what())); - return; - } - } - } - ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address)); - } - - void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) { - const TEvHttpProxy::TEvReportSensors& sensors(*event->Get()); + while (!url.empty()) { + it = Handlers.find(url); + if (it != Handlers.end()) { + ctx.Send(event->Forward(it->second)); + return; + } else { + if (url.EndsWith('/')) { + url.Trunc(url.size() - 1); + } + size_t pos = url.rfind('/'); + if (pos == TStringBuf::npos) { + break; + } else { + url = url.substr(0, pos + 1); + } + } + } + ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound())); + } + + void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { + Y_UNUSED(event); + Y_UNUSED(ctx); + Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly"); + } + + void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { + Y_UNUSED(event); + Y_UNUSED(ctx); + Y_FAIL("This event shouldn't be there, it should go to the http connection directly"); + } + + void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + TStringBuf host(event->Get()->Request->Host); + bool secure(event->Get()->Request->Secure); + NActors::IActor* actor = AddOutgoingConnection(TString(host), secure, ctx); + ctx.Send(event->Forward(actor->SelfId())); + } + + void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + AddListeningPort(event, ctx); + } + + void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) { + for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) { + if (*it == event->Get()->ConnectionID) { + Acceptors.erase(it); + break; + } + } + } + + void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { + Connections.erase(event->Get()->ConnectionID); + } + + void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) { + Handlers[event->Get()->Path] = event->Get()->Handler; + } + + void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) { + const TString& host(event->Get()->Host); + auto it = Hosts.find(host); + if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) { + TString addressPart; + TIpPort portPart = 0; + CrackAddress(host, addressPart, portPart); + if (IsIPv6(addressPart)) { + TSockAddrInet6 address(addressPart.c_str(), portPart); + if (it == Hosts.end()) { + it = Hosts.emplace(host, THostEntry()).first; + } + it->second.Address = address; + it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; + } else { + // TODO(xenoxeno): move to another, possible blocking actor + try { + const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart)); + if (result != nullptr) { + auto pAddr = result->Addr.Begin(); + while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET6) { + ++pAddr; + } + if (pAddr == result->Addr.End()) { + ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved")); + return; + } + TSockAddrInet6 address = {}; + static_cast<sockaddr_in6&>(address) = *reinterpret_cast<sockaddr_in6*>(pAddr->ai_addr); + LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address.ToString()); + if (it == Hosts.end()) { + it = Hosts.emplace(host, THostEntry()).first; + } + it->second.Address = address; + it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; + } else { + ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host")); + return; + } + } + catch (const yexception& e) { + ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what())); + return; + } + } + } + ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address)); + } + + void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) { + const TEvHttpProxy::TEvReportSensors& sensors(*event->Get()); const static TString urlNotFound = "not-found"; const TString& url = (sensors.Status == "404" ? urlNotFound : sensors.Url); - Sensors.Rate({ - {"sensor", "count"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, + Sensors.Rate({ + {"sensor", "count"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, {"url", url}, - {"status", sensors.Status} - })->Inc(); - Sensors.HistogramRate({ - {"sensor", "time_us"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, + {"status", sensors.Status} + })->Inc(); + Sensors.HistogramRate({ + {"sensor", "time_us"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, {"url", url}, - {"status", sensors.Status} - }, - NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds()); - Sensors.HistogramRate({ - {"sensor", "time_ms"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, + {"status", sensors.Status} + }, + NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds()); + Sensors.HistogramRate({ + {"sensor", "time_ms"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, {"url", url}, - {"status", sensors.Status} - }, - NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds()); - } - + {"status", sensors.Status} + }, + NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds()); + } + void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) { PassAway(); } NActors::TActorId Poller; TVector<TActorId> Acceptors; - - struct THostEntry { - TSockAddrInet6 Address; - TInstant DeadlineTime; - }; - - static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60); - - THashMap<TString, THostEntry> Hosts; + + struct THostEntry { + TSockAddrInet6 Address; + TInstant DeadlineTime; + }; + + static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60); + + THashMap<TString, THostEntry> Hosts; THashMap<TString, TActorId> Handlers; THashSet<TActorId> Connections; // outgoing NMonitoring::TMetricRegistry& Sensors; -}; - -TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) { - return new TEvHttpProxy::TEvReportSensors( - "out", - request->Host, - request->URL.Before('?'), - response ? response->Status : "504", +}; + +TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) { + return new TEvHttpProxy::TEvReportSensors( + "out", + request->Host, + request->URL.Before('?'), + response ? response->Status : "504", TDuration::Seconds(std::abs(request->Timer.Passed())) - ); -} - -TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) { - return new TEvHttpProxy::TEvReportSensors( - "in", - request->Host, - request->URL.Before('?'), - response->Status, + ); +} + +TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) { + return new TEvHttpProxy::TEvReportSensors( + "in", + request->Host, + request->URL.Before('?'), + response->Status, TDuration::Seconds(std::abs(request->Timer.Passed())) - ); -} - + ); +} + NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) { - return new THttpProxy(sensors); -} - -bool IsIPv6(const TString& host) { - return host.find_first_not_of(":0123456789abcdef") == TString::npos; -} - -bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) { - url.TrySplit("://", scheme, url); - auto pos = url.find('/'); - if (pos == TStringBuf::npos) { - host = url; - } else { - host = url.substr(0, pos); - uri = url.substr(pos); - } - return true; -} - -void CrackAddress(const TString& address, TString& hostname, TIpPort& port) { - size_t first_colon_pos = address.find(':'); - if (first_colon_pos != TString::npos) { - size_t last_colon_pos = address.rfind(':'); - if (last_colon_pos == first_colon_pos) { - // only one colon, simple case - port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0); - hostname = address.substr(0, first_colon_pos); - } else { - // ipv6? - size_t closing_bracket_pos = address.rfind(']'); - if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) { - // whole address is ipv6 host - hostname = address; - } else { - port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0); - hostname = address.substr(0, last_colon_pos); - } - if (hostname.StartsWith('[') && hostname.EndsWith(']')) { - hostname = hostname.substr(1, hostname.size() - 2); - } - } - } else { - hostname = address; - } -} - - -void TrimBegin(TStringBuf& target, char delim) { - while (!target.empty() && *target.begin() == delim) { - target.Skip(1); - } -} - -void TrimEnd(TStringBuf& target, char delim) { - while (!target.empty() && target.back() == delim) { - target.Trunc(target.size() - 1); - } -} - -void Trim(TStringBuf& target, char delim) { - TrimBegin(target, delim); - TrimEnd(target, delim); -} - -void TrimEnd(TString& target, char delim) { - while (!target.empty() && target.back() == delim) { - target.resize(target.size() - 1); - } -} - -} + return new THttpProxy(sensors); +} + +bool IsIPv6(const TString& host) { + return host.find_first_not_of(":0123456789abcdef") == TString::npos; +} + +bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) { + url.TrySplit("://", scheme, url); + auto pos = url.find('/'); + if (pos == TStringBuf::npos) { + host = url; + } else { + host = url.substr(0, pos); + uri = url.substr(pos); + } + return true; +} + +void CrackAddress(const TString& address, TString& hostname, TIpPort& port) { + size_t first_colon_pos = address.find(':'); + if (first_colon_pos != TString::npos) { + size_t last_colon_pos = address.rfind(':'); + if (last_colon_pos == first_colon_pos) { + // only one colon, simple case + port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0); + hostname = address.substr(0, first_colon_pos); + } else { + // ipv6? + size_t closing_bracket_pos = address.rfind(']'); + if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) { + // whole address is ipv6 host + hostname = address; + } else { + port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0); + hostname = address.substr(0, last_colon_pos); + } + if (hostname.StartsWith('[') && hostname.EndsWith(']')) { + hostname = hostname.substr(1, hostname.size() - 2); + } + } + } else { + hostname = address; + } +} + + +void TrimBegin(TStringBuf& target, char delim) { + while (!target.empty() && *target.begin() == delim) { + target.Skip(1); + } +} + +void TrimEnd(TStringBuf& target, char delim) { + while (!target.empty() && target.back() == delim) { + target.Trunc(target.size() - 1); + } +} + +void Trim(TStringBuf& target, char delim) { + TrimBegin(target, delim); + TrimEnd(target, delim); +} + +void TrimEnd(TString& target, char delim) { + while (!target.empty() && target.back() == delim) { + target.resize(target.size() - 1); + } +} + +} |