diff options
author | Alexey Efimov <xeno@prnwatch.com> | 2022-02-10 16:49:41 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:41 +0300 |
commit | 26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946 (patch) | |
tree | d34555f21d4d9f94f84d460e55b77d7eb41a953c /library/cpp/actors/http | |
parent | ca3252a147a429eac4ba8221857493c58dcd09b5 (diff) | |
download | ydb-26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946.tar.gz |
Restoring authorship annotation for Alexey Efimov <xeno@prnwatch.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/http')
-rw-r--r-- | library/cpp/actors/http/http.cpp | 1188 | ||||
-rw-r--r-- | library/cpp/actors/http/http.h | 1394 | ||||
-rw-r--r-- | library/cpp/actors/http/http_cache.cpp | 1178 | ||||
-rw-r--r-- | library/cpp/actors/http/http_cache.h | 50 | ||||
-rw-r--r-- | library/cpp/actors/http/http_config.h | 34 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy.cpp | 564 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy.h | 424 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_acceptor.cpp | 210 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_incoming.cpp | 376 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_outgoing.cpp | 450 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_sock_impl.h | 472 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_ssl.h | 148 | ||||
-rw-r--r-- | library/cpp/actors/http/http_static.cpp | 176 | ||||
-rw-r--r-- | library/cpp/actors/http/http_static.h | 16 | ||||
-rw-r--r-- | library/cpp/actors/http/http_ut.cpp | 632 | ||||
-rw-r--r-- | library/cpp/actors/http/ut/ya.make | 26 | ||||
-rw-r--r-- | library/cpp/actors/http/ya.make | 56 |
17 files changed, 3697 insertions, 3697 deletions
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp index 7125f9d8b0..90fdd161ed 100644 --- a/library/cpp/actors/http/http.cpp +++ b/library/cpp/actors/http/http.cpp @@ -1,116 +1,116 @@ -#include "http.h" +#include "http.h" #include <library/cpp/string_utils/quote/quote.h> - -inline TStringBuf operator +(TStringBuf l, TStringBuf r) { - if (l.empty()) { - return r; - } - if (r.empty()) { - return l; - } - if (l.end() == r.begin()) { - return TStringBuf(l.data(), l.size() + r.size()); - } - if (r.end() == l.begin()) { - return TStringBuf(r.data(), l.size() + r.size()); - } - Y_FAIL("oops"); - return TStringBuf(); -} - -inline TStringBuf operator +=(TStringBuf& l, TStringBuf r) { - return l = l + r; -} - -namespace NHttp { - -template <> TStringBuf THttpRequest::GetName<&THttpRequest::Host>() { return "Host"; } -template <> TStringBuf THttpRequest::GetName<&THttpRequest::Accept>() { return "Accept"; } -template <> TStringBuf THttpRequest::GetName<&THttpRequest::Connection>() { return "Connection"; } -template <> TStringBuf THttpRequest::GetName<&THttpRequest::ContentType>() { return "Content-Type"; } -template <> TStringBuf THttpRequest::GetName<&THttpRequest::ContentLength>() { return "Content-Length"; } -template <> TStringBuf THttpRequest::GetName<&THttpRequest::TransferEncoding>() { return "Transfer-Encoding"; } - -const TMap<TStringBuf, TStringBuf THttpRequest::*, TLessNoCase> THttpRequest::HeadersLocation = { - { THttpRequest::GetName<&THttpRequest::Host>(), &THttpRequest::Host }, - { THttpRequest::GetName<&THttpRequest::Accept>(), &THttpRequest::Accept }, - { THttpRequest::GetName<&THttpRequest::Connection>(), &THttpRequest::Connection }, - { THttpRequest::GetName<&THttpRequest::ContentType>(), &THttpRequest::ContentType }, - { THttpRequest::GetName<&THttpRequest::ContentLength>(), &THttpRequest::ContentLength }, - { THttpRequest::GetName<&THttpRequest::TransferEncoding>(), &THttpRequest::TransferEncoding }, -}; - -template <> TStringBuf THttpResponse::GetName<&THttpResponse::Connection>() { return "Connection"; } -template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentType>() { return "Content-Type"; } -template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentLength>() { return "Content-Length"; } -template <> TStringBuf THttpResponse::GetName<&THttpResponse::TransferEncoding>() { return "Transfer-Encoding"; } -template <> TStringBuf THttpResponse::GetName<&THttpResponse::LastModified>() { return "Last-Modified"; } + +inline TStringBuf operator +(TStringBuf l, TStringBuf r) { + if (l.empty()) { + return r; + } + if (r.empty()) { + return l; + } + if (l.end() == r.begin()) { + return TStringBuf(l.data(), l.size() + r.size()); + } + if (r.end() == l.begin()) { + return TStringBuf(r.data(), l.size() + r.size()); + } + Y_FAIL("oops"); + return TStringBuf(); +} + +inline TStringBuf operator +=(TStringBuf& l, TStringBuf r) { + return l = l + r; +} + +namespace NHttp { + +template <> TStringBuf THttpRequest::GetName<&THttpRequest::Host>() { return "Host"; } +template <> TStringBuf THttpRequest::GetName<&THttpRequest::Accept>() { return "Accept"; } +template <> TStringBuf THttpRequest::GetName<&THttpRequest::Connection>() { return "Connection"; } +template <> TStringBuf THttpRequest::GetName<&THttpRequest::ContentType>() { return "Content-Type"; } +template <> TStringBuf THttpRequest::GetName<&THttpRequest::ContentLength>() { return "Content-Length"; } +template <> TStringBuf THttpRequest::GetName<&THttpRequest::TransferEncoding>() { return "Transfer-Encoding"; } + +const TMap<TStringBuf, TStringBuf THttpRequest::*, TLessNoCase> THttpRequest::HeadersLocation = { + { THttpRequest::GetName<&THttpRequest::Host>(), &THttpRequest::Host }, + { THttpRequest::GetName<&THttpRequest::Accept>(), &THttpRequest::Accept }, + { THttpRequest::GetName<&THttpRequest::Connection>(), &THttpRequest::Connection }, + { THttpRequest::GetName<&THttpRequest::ContentType>(), &THttpRequest::ContentType }, + { THttpRequest::GetName<&THttpRequest::ContentLength>(), &THttpRequest::ContentLength }, + { THttpRequest::GetName<&THttpRequest::TransferEncoding>(), &THttpRequest::TransferEncoding }, +}; + +template <> TStringBuf THttpResponse::GetName<&THttpResponse::Connection>() { return "Connection"; } +template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentType>() { return "Content-Type"; } +template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentLength>() { return "Content-Length"; } +template <> TStringBuf THttpResponse::GetName<&THttpResponse::TransferEncoding>() { return "Transfer-Encoding"; } +template <> TStringBuf THttpResponse::GetName<&THttpResponse::LastModified>() { return "Last-Modified"; } template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentEncoding>() { return "Content-Encoding"; } - -const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> THttpResponse::HeadersLocation = { - { THttpResponse::GetName<&THttpResponse::Connection>(), &THttpResponse::Connection }, - { THttpResponse::GetName<&THttpResponse::ContentType>(), &THttpResponse::ContentType }, - { THttpResponse::GetName<&THttpResponse::ContentLength>(), &THttpResponse::ContentLength }, - { THttpResponse::GetName<&THttpResponse::TransferEncoding>(), &THttpResponse::TransferEncoding }, - { THttpResponse::GetName<&THttpResponse::LastModified>(), &THttpResponse::LastModified }, + +const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> THttpResponse::HeadersLocation = { + { THttpResponse::GetName<&THttpResponse::Connection>(), &THttpResponse::Connection }, + { THttpResponse::GetName<&THttpResponse::ContentType>(), &THttpResponse::ContentType }, + { THttpResponse::GetName<&THttpResponse::ContentLength>(), &THttpResponse::ContentLength }, + { THttpResponse::GetName<&THttpResponse::TransferEncoding>(), &THttpResponse::TransferEncoding }, + { THttpResponse::GetName<&THttpResponse::LastModified>(), &THttpResponse::LastModified }, { THttpResponse::GetName<&THttpResponse::ContentEncoding>(), &THttpResponse::ContentEncoding } -}; - -void THttpRequest::Clear() { - // a dirty little trick - this->~THttpRequest(); // basically, do nothing - new (this) THttpRequest(); // reset all fields -} - -template <> -void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) { - TStringBuf data(Pos(), len); - while (!data.empty()) { - if (Stage != EParseStage::Error) { - LastSuccessStage = Stage; - } - switch (Stage) { - case EParseStage::Method: { - if (ProcessData(Method, data, ' ', MaxMethodSize)) { - Stage = EParseStage::URL; - } - break; - } - case EParseStage::URL: { - if (ProcessData(URL, data, ' ', MaxURLSize)) { - Stage = EParseStage::Protocol; - } - break; - } - case EParseStage::Protocol: { - if (ProcessData(Protocol, data, '/', MaxProtocolSize)) { - Stage = EParseStage::Version; - } - break; - } - case EParseStage::Version: { - if (ProcessData(Version, data, "\r\n", MaxVersionSize)) { - Stage = EParseStage::Header; - Headers = data; - } - break; - } - case EParseStage::Header: { - if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) { - if (Header.empty()) { - Headers = TStringBuf(Headers.data(), data.begin() - Headers.begin()); - if (HaveBody()) { - Stage = EParseStage::Body; - } else { - Stage = EParseStage::Done; - } - } else { - ProcessHeader(Header); - } - } - break; - } - case EParseStage::Body: { +}; + +void THttpRequest::Clear() { + // a dirty little trick + this->~THttpRequest(); // basically, do nothing + new (this) THttpRequest(); // reset all fields +} + +template <> +void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) { + TStringBuf data(Pos(), len); + while (!data.empty()) { + if (Stage != EParseStage::Error) { + LastSuccessStage = Stage; + } + switch (Stage) { + case EParseStage::Method: { + if (ProcessData(Method, data, ' ', MaxMethodSize)) { + Stage = EParseStage::URL; + } + break; + } + case EParseStage::URL: { + if (ProcessData(URL, data, ' ', MaxURLSize)) { + Stage = EParseStage::Protocol; + } + break; + } + case EParseStage::Protocol: { + if (ProcessData(Protocol, data, '/', MaxProtocolSize)) { + Stage = EParseStage::Version; + } + break; + } + case EParseStage::Version: { + if (ProcessData(Version, data, "\r\n", MaxVersionSize)) { + Stage = EParseStage::Header; + Headers = data; + } + break; + } + case EParseStage::Header: { + if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) { + if (Header.empty()) { + Headers = TStringBuf(Headers.data(), data.begin() - Headers.begin()); + if (HaveBody()) { + Stage = EParseStage::Body; + } else { + Stage = EParseStage::Done; + } + } else { + ProcessHeader(Header); + } + } + break; + } + case EParseStage::Body: { if (!ContentLength.empty()) { if (ProcessData(Content, data, FromString(ContentLength))) { Body = Content; @@ -121,9 +121,9 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) { } else { // Invalid body encoding Stage = EParseStage::Error; - } - break; - } + } + break; + } case EParseStage::ChunkLength: { if (ProcessData(Line, data, "\r\n", MaxChunkLengthSize)) { if (!Line.empty()) { @@ -170,484 +170,484 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) { break; } - case EParseStage::Done: - case EParseStage::Error: { - data.Clear(); - break; - } - default: - Y_FAIL("Invalid processing sequence"); - break; - } - } - TSocketBuffer::Advance(len); -} - -template <> -THttpParser<THttpRequest, TSocketBuffer>::EParseStage THttpParser<THttpRequest, TSocketBuffer>::GetInitialStage() { - return EParseStage::Method; -} - -template <> -THttpParser<THttpResponse, TSocketBuffer>::EParseStage THttpParser<THttpResponse, TSocketBuffer>::GetInitialStage() { - return EParseStage::Protocol; -} - -void THttpResponse::Clear() { - // a dirty little trick - this->~THttpResponse(); // basically, do nothing - new (this) THttpResponse(); // reset all fields -} - -template <> -void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) { - TStringBuf data(Pos(), len); - while (!data.empty()) { - if (Stage != EParseStage::Error) { - LastSuccessStage = Stage; - } - switch (Stage) { - case EParseStage::Protocol: { - if (ProcessData(Protocol, data, '/', MaxProtocolSize)) { - Stage = EParseStage::Version; - } - break; - } - case EParseStage::Version: { - if (ProcessData(Version, data, ' ', MaxVersionSize)) { - Stage = EParseStage::Status; - } - break; - } - case EParseStage::Status: { - if (ProcessData(Status, data, ' ', MaxStatusSize)) { - Stage = EParseStage::Message; - } - break; - } - case EParseStage::Message: { - if (ProcessData(Message, data, "\r\n", MaxMessageSize)) { - Stage = EParseStage::Header; - Headers = TStringBuf(data.data(), size_t(0)); - } - break; - } - case EParseStage::Header: { - if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) { - if (Header.empty()) { - if (HaveBody() && (ContentLength.empty() || ContentLength != "0")) { - Stage = EParseStage::Body; - } else { - Stage = EParseStage::Done; - } - } else { - ProcessHeader(Header); - } - Headers = TStringBuf(Headers.data(), data.data() - Headers.data()); - } - break; - } - case EParseStage::Body: { - if (!ContentLength.empty()) { - if (ProcessData(Body, data, FromString(ContentLength))) { - Stage = EParseStage::Done; - } - } else if (TransferEncoding == "chunked") { - Stage = EParseStage::ChunkLength; - } else { - // Invalid body encoding - Stage = EParseStage::Error; - } - break; - } - case EParseStage::ChunkLength: { - if (ProcessData(Line, data, "\r\n", MaxChunkLengthSize)) { - if (!Line.empty()) { - ChunkLength = ParseHex(Line); - if (ChunkLength <= MaxChunkSize) { - ContentSize = Content.size() + ChunkLength; - if (ContentSize <= MaxChunkContentSize) { - Stage = EParseStage::ChunkData; - Line.Clear(); - } else { - // Invalid chunk content length - Stage = EParseStage::Error; - } - } else { - // Invalid chunk length - Stage = EParseStage::Error; - } - } else { - // Invalid body encoding - Stage = EParseStage::Error; - } - } - break; - } - case EParseStage::ChunkData: { - if (!IsError()) { - if (ProcessData(Content, data, ContentSize)) { - if (ProcessData(Line, data, 2)) { - if (Line == "\r\n") { - if (ChunkLength == 0) { - Body = Content; - Stage = EParseStage::Done; - } else { - Stage = EParseStage::ChunkLength; - } - Line.Clear(); - } else { - // Invalid body encoding - Stage = EParseStage::Error; - } - } - } - } - break; - } - case EParseStage::Done: - case EParseStage::Error: - data.Clear(); - break; - default: - // Invalid processing sequence - Stage = EParseStage::Error; - break; - } - } - TSocketBuffer::Advance(len); -} - -template <> -void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() { - if (Stage == EParseStage::Done) { - return; - } - if (Stage == EParseStage::Body) { - // ? - Stage = EParseStage::Done; - } else { - LastSuccessStage = Stage; - Stage = EParseStage::Error; - } -} - -THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) { - THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this); - response->Append(data); - response->Reparse(); - return response; -} - -THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseOK(TStringBuf body, TStringBuf contentType, TInstant lastModified) { - return CreateResponse("200", "OK", contentType, body, lastModified); -} - -THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseBadRequest(TStringBuf html, TStringBuf contentType) { - if (html.empty() && IsError()) { - contentType = "text/plain"; - html = GetErrorText(); - } - return CreateResponse("400", "Bad Request", contentType, html); -} - -THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseNotFound(TStringBuf html, TStringBuf contentType) { - return CreateResponse("404", "Not Found", contentType, html); -} - -THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseServiceUnavailable(TStringBuf html, TStringBuf contentType) { - return CreateResponse("503", "Service Unavailable", contentType, html); -} - -THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseGatewayTimeout(TStringBuf html, TStringBuf contentType) { - return CreateResponse("504", "Gateway Timeout", contentType, html); -} - -THttpIncomingResponse::THttpIncomingResponse(THttpOutgoingRequestPtr request) - : Request(request) -{} - -THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponse(TStringBuf status, TStringBuf message, TStringBuf contentType, TStringBuf body, TInstant lastModified) { - TStringBuf version = Version; - if (version != "1.0" && version != "1.1") { - version = "1.1"; - } - THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this, "HTTP", version, status, message); - response->Set<&THttpResponse::Connection>(GetConnection()); - if (!WorkerName.empty()) { - response->Set("X-Worker-Name", WorkerName); - } - if (!contentType.empty() && !body.empty()) { - response->Set<&THttpResponse::ContentType>(contentType); - } - if (lastModified) { - response->Set<&THttpResponse::LastModified>(lastModified.FormatGmTime("%a, %d %b %Y %H:%M:%S GMT")); - } - if (response->IsNeedBody() || !body.empty()) { - if (Method == "HEAD") { - response->Set<&THttpResponse::ContentLength>(ToString(body.size())); - } else { - response->Set<&THttpResponse::Body>(body); - } - } - return response; -} - -THttpIncomingRequestPtr THttpIncomingRequest::Duplicate() { - THttpIncomingRequestPtr request = new THttpIncomingRequest(*this); - request->Reparse(); - request->Timer.Reset(); - return request; -} - -THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPtr request) { - THttpIncomingResponsePtr response = new THttpIncomingResponse(*this); - response->Reparse(); - response->Request = request; - return response; -} - -THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) { - THttpOutgoingResponsePtr response = new THttpOutgoingResponse(*this); - response->Reparse(); - response->Request = request; - return response; -} - - -THttpOutgoingResponsePtr THttpIncomingResponse::Reverse(THttpIncomingRequestPtr request) { - THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request); - response->Assign(Data(), Size()); - response->Reparse(); - return response; -} - -THttpOutgoingRequest::THttpOutgoingRequest(TStringBuf method, TStringBuf scheme, TStringBuf host, TStringBuf uri, TStringBuf protocol, TStringBuf version) { - Secure = (scheme == "https"); - TString urie = UrlEscapeRet(uri); - InitRequest(method, urie, protocol, version); - if (host) { - Set<&THttpRequest::Host>(host); - } -} - -THttpOutgoingRequest::THttpOutgoingRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) { - TStringBuf scheme, host, uri; - if (!CrackURL(url, scheme, host, uri)) { - Y_FAIL("Invalid URL specified"); - } - if (!scheme.empty() && scheme != "http" && scheme != "https") { - Y_FAIL("Invalid URL specified"); - } - Secure = (scheme == "https"); - TString urie = UrlEscapeRet(uri); - InitRequest(method, urie, protocol, version); - if (host) { - Set<&THttpRequest::Host>(host); - } -} - -THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestString(const TString& data) { - THttpOutgoingRequestPtr request = new THttpOutgoingRequest(); - request->Assign(data.data(), data.size()); - request->Reparse(); - return request; -} - -THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestGet(TStringBuf url) { - return CreateRequest("GET", url); -} - -THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestGet(TStringBuf host, TStringBuf uri) { - return CreateHttpRequest("GET", host, uri); -} - -THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestPost(TStringBuf url, TStringBuf contentType, TStringBuf body) { - return CreateRequest("POST", url, contentType, body); -} - -THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestPost(TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body) { - return CreateHttpRequest("POST", host, uri, contentType, body); -} - -THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType, TStringBuf body) { - THttpOutgoingRequestPtr request = new THttpOutgoingRequest(method, url, "HTTP", "1.1"); - request->Set<&THttpRequest::Accept>("*/*"); - if (!contentType.empty()) { - request->Set<&THttpRequest::ContentType>(contentType); - request->Set<&THttpRequest::Body>(body); - } - return request; -} - -THttpOutgoingRequestPtr THttpOutgoingRequest::CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body) { - THttpOutgoingRequestPtr request = new THttpOutgoingRequest(method, "http", host, uri, "HTTP", "1.1"); - request->Set<&THttpRequest::Accept>("*/*"); - if (!contentType.empty()) { - request->Set<&THttpRequest::ContentType>(contentType); - request->Set<&THttpRequest::Body>(body); - } - return request; -} - -THttpOutgoingRequestPtr THttpOutgoingRequest::Duplicate() { - THttpOutgoingRequestPtr request = new THttpOutgoingRequest(*this); - request->Reparse(); - return request; -} - -THttpOutgoingResponse::THttpOutgoingResponse(THttpIncomingRequestPtr request) - : Request(request) -{} - -THttpOutgoingResponse::THttpOutgoingResponse(THttpIncomingRequestPtr request, TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message) - : Request(request) -{ - InitResponse(protocol, version, status, message); -} - -const size_t THttpConfig::BUFFER_MIN_STEP; -const TDuration THttpConfig::CONNECTION_TIMEOUT; - -TUrlParameters::TUrlParameters(TStringBuf url) { - TStringBuf base; - TStringBuf params; - if (url.TrySplit('?', base, params)) { - for (TStringBuf param = params.NextTok('&'); !param.empty(); param = params.NextTok('&')) { - TStringBuf name = param.NextTok('='); - Parameters[name] = param; - } - } -} - -TString TUrlParameters::operator [](TStringBuf name) const { - TString value(Get(name)); - CGIUnescape(value); - return value; -} - -bool TUrlParameters::Has(TStringBuf name) const { - return Parameters.count(name) != 0; -} - -TStringBuf TUrlParameters::Get(TStringBuf name) const { - auto it = Parameters.find(name); - if (it != Parameters.end()) { - return it->second; - } - return TStringBuf(); -} - -TString TUrlParameters::Render() const { - TStringBuilder parameters; - for (const std::pair<TStringBuf, TStringBuf> parameter : Parameters) { - if (parameters.empty()) { - parameters << '?'; - } else { - parameters << '&'; - } - parameters << parameter.first; - parameters << '='; - parameters << parameter.second; - } - return parameters; -} - -TCookies::TCookies(TStringBuf cookie) { - for (TStringBuf param = cookie.NextTok(';'); !param.empty(); param = cookie.NextTok(';')) { - param.SkipPrefix(" "); - TStringBuf name = param.NextTok('='); - Cookies[name] = param; - } -} - -TStringBuf TCookies::operator [](TStringBuf name) const { - return Get(name); -} - -bool TCookies::Has(TStringBuf name) const { - return Cookies.count(name) != 0; -} - -TStringBuf TCookies::Get(TStringBuf name) const { - auto it = Cookies.find(name); - if (it != Cookies.end()) { - return it->second; - } - return TStringBuf(); -} - -TString TCookies::Render() const { - TStringBuilder cookies; - for (const std::pair<TStringBuf, TStringBuf> cookie : Cookies) { - if (!cookies.empty()) { - cookies << ' '; - } - cookies << cookie.first; - cookies << '='; - cookies << cookie.second; - cookies << ';'; - } - return cookies; -} - -TCookiesBuilder::TCookiesBuilder() - :TCookies(TStringBuf()) -{} - -void TCookiesBuilder::Set(TStringBuf name, TStringBuf data) { - Data.emplace_back(name, data); - Cookies[Data.back().first] = Data.back().second; -} - -THeaders::THeaders(TStringBuf headers) { - for (TStringBuf param = headers.NextTok("\r\n"); !param.empty(); param = headers.NextTok("\r\n")) { - TStringBuf name = param.NextTok(":"); - param.SkipPrefix(" "); - Headers[name] = param; - } -} - -TStringBuf THeaders::operator [](TStringBuf name) const { - return Get(name); -} - -bool THeaders::Has(TStringBuf name) const { - return Headers.count(name) != 0; -} - -TStringBuf THeaders::Get(TStringBuf name) const { - auto it = Headers.find(name); - if (it != Headers.end()) { - return it->second; - } - return TStringBuf(); -} - -TString THeaders::Render() const { - TStringBuilder headers; - for (const std::pair<TStringBuf, TStringBuf> header : Headers) { - headers << header.first; - headers << ": "; - headers << header.second; - headers << "\r\n"; - } - return headers; -} - -THeadersBuilder::THeadersBuilder() - :THeaders(TStringBuf()) -{} - -THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) { - for (const auto& pr : builder.Headers) { - Set(pr.first, pr.second); - } -} - -void THeadersBuilder::Set(TStringBuf name, TStringBuf data) { - Data.emplace_back(name, data); - Headers[Data.back().first] = Data.back().second; -} - -} + case EParseStage::Done: + case EParseStage::Error: { + data.Clear(); + break; + } + default: + Y_FAIL("Invalid processing sequence"); + break; + } + } + TSocketBuffer::Advance(len); +} + +template <> +THttpParser<THttpRequest, TSocketBuffer>::EParseStage THttpParser<THttpRequest, TSocketBuffer>::GetInitialStage() { + return EParseStage::Method; +} + +template <> +THttpParser<THttpResponse, TSocketBuffer>::EParseStage THttpParser<THttpResponse, TSocketBuffer>::GetInitialStage() { + return EParseStage::Protocol; +} + +void THttpResponse::Clear() { + // a dirty little trick + this->~THttpResponse(); // basically, do nothing + new (this) THttpResponse(); // reset all fields +} + +template <> +void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) { + TStringBuf data(Pos(), len); + while (!data.empty()) { + if (Stage != EParseStage::Error) { + LastSuccessStage = Stage; + } + switch (Stage) { + case EParseStage::Protocol: { + if (ProcessData(Protocol, data, '/', MaxProtocolSize)) { + Stage = EParseStage::Version; + } + break; + } + case EParseStage::Version: { + if (ProcessData(Version, data, ' ', MaxVersionSize)) { + Stage = EParseStage::Status; + } + break; + } + case EParseStage::Status: { + if (ProcessData(Status, data, ' ', MaxStatusSize)) { + Stage = EParseStage::Message; + } + break; + } + case EParseStage::Message: { + if (ProcessData(Message, data, "\r\n", MaxMessageSize)) { + Stage = EParseStage::Header; + Headers = TStringBuf(data.data(), size_t(0)); + } + break; + } + case EParseStage::Header: { + if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) { + if (Header.empty()) { + if (HaveBody() && (ContentLength.empty() || ContentLength != "0")) { + Stage = EParseStage::Body; + } else { + Stage = EParseStage::Done; + } + } else { + ProcessHeader(Header); + } + Headers = TStringBuf(Headers.data(), data.data() - Headers.data()); + } + break; + } + case EParseStage::Body: { + if (!ContentLength.empty()) { + if (ProcessData(Body, data, FromString(ContentLength))) { + Stage = EParseStage::Done; + } + } else if (TransferEncoding == "chunked") { + Stage = EParseStage::ChunkLength; + } else { + // Invalid body encoding + Stage = EParseStage::Error; + } + break; + } + case EParseStage::ChunkLength: { + if (ProcessData(Line, data, "\r\n", MaxChunkLengthSize)) { + if (!Line.empty()) { + ChunkLength = ParseHex(Line); + if (ChunkLength <= MaxChunkSize) { + ContentSize = Content.size() + ChunkLength; + if (ContentSize <= MaxChunkContentSize) { + Stage = EParseStage::ChunkData; + Line.Clear(); + } else { + // Invalid chunk content length + Stage = EParseStage::Error; + } + } else { + // Invalid chunk length + Stage = EParseStage::Error; + } + } else { + // Invalid body encoding + Stage = EParseStage::Error; + } + } + break; + } + case EParseStage::ChunkData: { + if (!IsError()) { + if (ProcessData(Content, data, ContentSize)) { + if (ProcessData(Line, data, 2)) { + if (Line == "\r\n") { + if (ChunkLength == 0) { + Body = Content; + Stage = EParseStage::Done; + } else { + Stage = EParseStage::ChunkLength; + } + Line.Clear(); + } else { + // Invalid body encoding + Stage = EParseStage::Error; + } + } + } + } + break; + } + case EParseStage::Done: + case EParseStage::Error: + data.Clear(); + break; + default: + // Invalid processing sequence + Stage = EParseStage::Error; + break; + } + } + TSocketBuffer::Advance(len); +} + +template <> +void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() { + if (Stage == EParseStage::Done) { + return; + } + if (Stage == EParseStage::Body) { + // ? + Stage = EParseStage::Done; + } else { + LastSuccessStage = Stage; + Stage = EParseStage::Error; + } +} + +THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) { + THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this); + response->Append(data); + response->Reparse(); + return response; +} + +THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseOK(TStringBuf body, TStringBuf contentType, TInstant lastModified) { + return CreateResponse("200", "OK", contentType, body, lastModified); +} + +THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseBadRequest(TStringBuf html, TStringBuf contentType) { + if (html.empty() && IsError()) { + contentType = "text/plain"; + html = GetErrorText(); + } + return CreateResponse("400", "Bad Request", contentType, html); +} + +THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseNotFound(TStringBuf html, TStringBuf contentType) { + return CreateResponse("404", "Not Found", contentType, html); +} + +THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseServiceUnavailable(TStringBuf html, TStringBuf contentType) { + return CreateResponse("503", "Service Unavailable", contentType, html); +} + +THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseGatewayTimeout(TStringBuf html, TStringBuf contentType) { + return CreateResponse("504", "Gateway Timeout", contentType, html); +} + +THttpIncomingResponse::THttpIncomingResponse(THttpOutgoingRequestPtr request) + : Request(request) +{} + +THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponse(TStringBuf status, TStringBuf message, TStringBuf contentType, TStringBuf body, TInstant lastModified) { + TStringBuf version = Version; + if (version != "1.0" && version != "1.1") { + version = "1.1"; + } + THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this, "HTTP", version, status, message); + response->Set<&THttpResponse::Connection>(GetConnection()); + if (!WorkerName.empty()) { + response->Set("X-Worker-Name", WorkerName); + } + if (!contentType.empty() && !body.empty()) { + response->Set<&THttpResponse::ContentType>(contentType); + } + if (lastModified) { + response->Set<&THttpResponse::LastModified>(lastModified.FormatGmTime("%a, %d %b %Y %H:%M:%S GMT")); + } + if (response->IsNeedBody() || !body.empty()) { + if (Method == "HEAD") { + response->Set<&THttpResponse::ContentLength>(ToString(body.size())); + } else { + response->Set<&THttpResponse::Body>(body); + } + } + return response; +} + +THttpIncomingRequestPtr THttpIncomingRequest::Duplicate() { + THttpIncomingRequestPtr request = new THttpIncomingRequest(*this); + request->Reparse(); + request->Timer.Reset(); + return request; +} + +THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPtr request) { + THttpIncomingResponsePtr response = new THttpIncomingResponse(*this); + response->Reparse(); + response->Request = request; + return response; +} + +THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) { + THttpOutgoingResponsePtr response = new THttpOutgoingResponse(*this); + response->Reparse(); + response->Request = request; + return response; +} + + +THttpOutgoingResponsePtr THttpIncomingResponse::Reverse(THttpIncomingRequestPtr request) { + THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request); + response->Assign(Data(), Size()); + response->Reparse(); + return response; +} + +THttpOutgoingRequest::THttpOutgoingRequest(TStringBuf method, TStringBuf scheme, TStringBuf host, TStringBuf uri, TStringBuf protocol, TStringBuf version) { + Secure = (scheme == "https"); + TString urie = UrlEscapeRet(uri); + InitRequest(method, urie, protocol, version); + if (host) { + Set<&THttpRequest::Host>(host); + } +} + +THttpOutgoingRequest::THttpOutgoingRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) { + TStringBuf scheme, host, uri; + if (!CrackURL(url, scheme, host, uri)) { + Y_FAIL("Invalid URL specified"); + } + if (!scheme.empty() && scheme != "http" && scheme != "https") { + Y_FAIL("Invalid URL specified"); + } + Secure = (scheme == "https"); + TString urie = UrlEscapeRet(uri); + InitRequest(method, urie, protocol, version); + if (host) { + Set<&THttpRequest::Host>(host); + } +} + +THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestString(const TString& data) { + THttpOutgoingRequestPtr request = new THttpOutgoingRequest(); + request->Assign(data.data(), data.size()); + request->Reparse(); + return request; +} + +THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestGet(TStringBuf url) { + return CreateRequest("GET", url); +} + +THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestGet(TStringBuf host, TStringBuf uri) { + return CreateHttpRequest("GET", host, uri); +} + +THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestPost(TStringBuf url, TStringBuf contentType, TStringBuf body) { + return CreateRequest("POST", url, contentType, body); +} + +THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestPost(TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body) { + return CreateHttpRequest("POST", host, uri, contentType, body); +} + +THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType, TStringBuf body) { + THttpOutgoingRequestPtr request = new THttpOutgoingRequest(method, url, "HTTP", "1.1"); + request->Set<&THttpRequest::Accept>("*/*"); + if (!contentType.empty()) { + request->Set<&THttpRequest::ContentType>(contentType); + request->Set<&THttpRequest::Body>(body); + } + return request; +} + +THttpOutgoingRequestPtr THttpOutgoingRequest::CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body) { + THttpOutgoingRequestPtr request = new THttpOutgoingRequest(method, "http", host, uri, "HTTP", "1.1"); + request->Set<&THttpRequest::Accept>("*/*"); + if (!contentType.empty()) { + request->Set<&THttpRequest::ContentType>(contentType); + request->Set<&THttpRequest::Body>(body); + } + return request; +} + +THttpOutgoingRequestPtr THttpOutgoingRequest::Duplicate() { + THttpOutgoingRequestPtr request = new THttpOutgoingRequest(*this); + request->Reparse(); + return request; +} + +THttpOutgoingResponse::THttpOutgoingResponse(THttpIncomingRequestPtr request) + : Request(request) +{} + +THttpOutgoingResponse::THttpOutgoingResponse(THttpIncomingRequestPtr request, TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message) + : Request(request) +{ + InitResponse(protocol, version, status, message); +} + +const size_t THttpConfig::BUFFER_MIN_STEP; +const TDuration THttpConfig::CONNECTION_TIMEOUT; + +TUrlParameters::TUrlParameters(TStringBuf url) { + TStringBuf base; + TStringBuf params; + if (url.TrySplit('?', base, params)) { + for (TStringBuf param = params.NextTok('&'); !param.empty(); param = params.NextTok('&')) { + TStringBuf name = param.NextTok('='); + Parameters[name] = param; + } + } +} + +TString TUrlParameters::operator [](TStringBuf name) const { + TString value(Get(name)); + CGIUnescape(value); + return value; +} + +bool TUrlParameters::Has(TStringBuf name) const { + return Parameters.count(name) != 0; +} + +TStringBuf TUrlParameters::Get(TStringBuf name) const { + auto it = Parameters.find(name); + if (it != Parameters.end()) { + return it->second; + } + return TStringBuf(); +} + +TString TUrlParameters::Render() const { + TStringBuilder parameters; + for (const std::pair<TStringBuf, TStringBuf> parameter : Parameters) { + if (parameters.empty()) { + parameters << '?'; + } else { + parameters << '&'; + } + parameters << parameter.first; + parameters << '='; + parameters << parameter.second; + } + return parameters; +} + +TCookies::TCookies(TStringBuf cookie) { + for (TStringBuf param = cookie.NextTok(';'); !param.empty(); param = cookie.NextTok(';')) { + param.SkipPrefix(" "); + TStringBuf name = param.NextTok('='); + Cookies[name] = param; + } +} + +TStringBuf TCookies::operator [](TStringBuf name) const { + return Get(name); +} + +bool TCookies::Has(TStringBuf name) const { + return Cookies.count(name) != 0; +} + +TStringBuf TCookies::Get(TStringBuf name) const { + auto it = Cookies.find(name); + if (it != Cookies.end()) { + return it->second; + } + return TStringBuf(); +} + +TString TCookies::Render() const { + TStringBuilder cookies; + for (const std::pair<TStringBuf, TStringBuf> cookie : Cookies) { + if (!cookies.empty()) { + cookies << ' '; + } + cookies << cookie.first; + cookies << '='; + cookies << cookie.second; + cookies << ';'; + } + return cookies; +} + +TCookiesBuilder::TCookiesBuilder() + :TCookies(TStringBuf()) +{} + +void TCookiesBuilder::Set(TStringBuf name, TStringBuf data) { + Data.emplace_back(name, data); + Cookies[Data.back().first] = Data.back().second; +} + +THeaders::THeaders(TStringBuf headers) { + for (TStringBuf param = headers.NextTok("\r\n"); !param.empty(); param = headers.NextTok("\r\n")) { + TStringBuf name = param.NextTok(":"); + param.SkipPrefix(" "); + Headers[name] = param; + } +} + +TStringBuf THeaders::operator [](TStringBuf name) const { + return Get(name); +} + +bool THeaders::Has(TStringBuf name) const { + return Headers.count(name) != 0; +} + +TStringBuf THeaders::Get(TStringBuf name) const { + auto it = Headers.find(name); + if (it != Headers.end()) { + return it->second; + } + return TStringBuf(); +} + +TString THeaders::Render() const { + TStringBuilder headers; + for (const std::pair<TStringBuf, TStringBuf> header : Headers) { + headers << header.first; + headers << ": "; + headers << header.second; + headers << "\r\n"; + } + return headers; +} + +THeadersBuilder::THeadersBuilder() + :THeaders(TStringBuf()) +{} + +THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) { + for (const auto& pr : builder.Headers) { + Set(pr.first, pr.second); + } +} + +void THeadersBuilder::Set(TStringBuf name, TStringBuf data) { + Data.emplace_back(name, data); + Headers[Data.back().first] = Data.back().second; +} + +} diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 96c5c1ec48..a11d6158ee 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -1,703 +1,703 @@ -#pragma once -#include <util/datetime/base.h> -#include <util/string/builder.h> -#include <util/system/thread.h> -#include <util/system/hp_timer.h> -#include <util/generic/hash_set.h> -#include <util/generic/buffer.h> -#include <util/generic/intrlist.h> -#include "http_config.h" - -// TODO(xenoxeno): hide in implementation -template <typename Type> -struct THash<TIntrusivePtr<Type>> { - size_t operator ()(const TIntrusivePtr<Type>& ptr) const { return reinterpret_cast<size_t>(ptr.Get()); } -}; - -template<> -inline void Out<TSockAddrInet6>(IOutputStream& o, const TSockAddrInet6& x) { - o << x.ToString(); -} - -namespace NHttp { - -bool IsIPv6(const TString& host); -bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri); -void CrackAddress(const TString& address, TString& hostname, TIpPort& port); -void TrimBegin(TStringBuf& target, char delim); -void TrimEnd(TStringBuf& target, char delim); -void Trim(TStringBuf& target, char delim); -void TrimEnd(TString& target, char delim); - -struct TLessNoCase { - bool operator()(TStringBuf l, TStringBuf r) const { - auto ll = l.length(); - auto rl = r.length(); - if (ll != rl) { - return ll < rl; - } - return strnicmp(l.data(), r.data(), ll) < 0; - } -}; - -struct TUrlParameters { - THashMap<TStringBuf, TStringBuf> Parameters; - - TUrlParameters(TStringBuf url); - TString operator [](TStringBuf name) const; - bool Has(TStringBuf name) const; - TStringBuf Get(TStringBuf name) const; // raw - TString Render() const; -}; - -struct TCookies { - THashMap<TStringBuf, TStringBuf> Cookies; - - TCookies(TStringBuf cookie); - TCookies(const TCookies&) = delete; - TStringBuf operator [](TStringBuf name) const; - bool Has(TStringBuf name) const; - TStringBuf Get(TStringBuf name) const; // raw - TString Render() const; -}; - -struct TCookiesBuilder : TCookies { +#pragma once +#include <util/datetime/base.h> +#include <util/string/builder.h> +#include <util/system/thread.h> +#include <util/system/hp_timer.h> +#include <util/generic/hash_set.h> +#include <util/generic/buffer.h> +#include <util/generic/intrlist.h> +#include "http_config.h" + +// TODO(xenoxeno): hide in implementation +template <typename Type> +struct THash<TIntrusivePtr<Type>> { + size_t operator ()(const TIntrusivePtr<Type>& ptr) const { return reinterpret_cast<size_t>(ptr.Get()); } +}; + +template<> +inline void Out<TSockAddrInet6>(IOutputStream& o, const TSockAddrInet6& x) { + o << x.ToString(); +} + +namespace NHttp { + +bool IsIPv6(const TString& host); +bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri); +void CrackAddress(const TString& address, TString& hostname, TIpPort& port); +void TrimBegin(TStringBuf& target, char delim); +void TrimEnd(TStringBuf& target, char delim); +void Trim(TStringBuf& target, char delim); +void TrimEnd(TString& target, char delim); + +struct TLessNoCase { + bool operator()(TStringBuf l, TStringBuf r) const { + auto ll = l.length(); + auto rl = r.length(); + if (ll != rl) { + return ll < rl; + } + return strnicmp(l.data(), r.data(), ll) < 0; + } +}; + +struct TUrlParameters { + THashMap<TStringBuf, TStringBuf> Parameters; + + TUrlParameters(TStringBuf url); + TString operator [](TStringBuf name) const; + bool Has(TStringBuf name) const; + TStringBuf Get(TStringBuf name) const; // raw + TString Render() const; +}; + +struct TCookies { + THashMap<TStringBuf, TStringBuf> Cookies; + + TCookies(TStringBuf cookie); + TCookies(const TCookies&) = delete; + TStringBuf operator [](TStringBuf name) const; + bool Has(TStringBuf name) const; + TStringBuf Get(TStringBuf name) const; // raw + TString Render() const; +}; + +struct TCookiesBuilder : TCookies { TDeque<std::pair<TString, TString>> Data; - - TCookiesBuilder(); - void Set(TStringBuf name, TStringBuf data); -}; - -struct THeaders { - TMap<TStringBuf, TStringBuf, TLessNoCase> Headers; - - THeaders() = default; - THeaders(TStringBuf headers); - THeaders(const THeaders&) = delete; - TStringBuf operator [](TStringBuf name) const; - bool Has(TStringBuf name) const; - TStringBuf Get(TStringBuf name) const; // raw - TString Render() const; -}; - -struct THeadersBuilder : THeaders { - TDeque<std::pair<TString, TString>> Data; - - THeadersBuilder(); - THeadersBuilder(const THeadersBuilder& builder); - void Set(TStringBuf name, TStringBuf data); -}; - -class TSocketBuffer : public TBuffer, public THttpConfig { -public: - TSocketBuffer() - : TBuffer(BUFFER_SIZE) - {} - - bool EnsureEnoughSpaceAvailable(size_t need) { - size_t avail = Avail(); - if (avail < need) { - Reserve(Capacity() + std::max(need, BUFFER_MIN_STEP)); - return false; - } - return true; - } -}; - -class THttpRequest { -public: - TStringBuf Method; - TStringBuf URL; - TStringBuf Protocol; - TStringBuf Version; - TStringBuf Headers; - - TStringBuf Host; - TStringBuf Accept; - TStringBuf Connection; - TStringBuf ContentType; - TStringBuf ContentLength; - TStringBuf TransferEncoding; - - TStringBuf Body; - - static const TMap<TStringBuf, TStringBuf THttpRequest::*, TLessNoCase> HeadersLocation; - - template <TStringBuf THttpRequest::* Header> - static TStringBuf GetName(); - void Clear(); -}; - -class THttpResponse { -public: - TStringBuf Protocol; - TStringBuf Version; - TStringBuf Status; - TStringBuf Message; - TStringBuf Headers; - - TStringBuf Connection; - TStringBuf ContentType; - TStringBuf ContentLength; - TStringBuf TransferEncoding; - TStringBuf LastModified; + + TCookiesBuilder(); + void Set(TStringBuf name, TStringBuf data); +}; + +struct THeaders { + TMap<TStringBuf, TStringBuf, TLessNoCase> Headers; + + THeaders() = default; + THeaders(TStringBuf headers); + THeaders(const THeaders&) = delete; + TStringBuf operator [](TStringBuf name) const; + bool Has(TStringBuf name) const; + TStringBuf Get(TStringBuf name) const; // raw + TString Render() const; +}; + +struct THeadersBuilder : THeaders { + TDeque<std::pair<TString, TString>> Data; + + THeadersBuilder(); + THeadersBuilder(const THeadersBuilder& builder); + void Set(TStringBuf name, TStringBuf data); +}; + +class TSocketBuffer : public TBuffer, public THttpConfig { +public: + TSocketBuffer() + : TBuffer(BUFFER_SIZE) + {} + + bool EnsureEnoughSpaceAvailable(size_t need) { + size_t avail = Avail(); + if (avail < need) { + Reserve(Capacity() + std::max(need, BUFFER_MIN_STEP)); + return false; + } + return true; + } +}; + +class THttpRequest { +public: + TStringBuf Method; + TStringBuf URL; + TStringBuf Protocol; + TStringBuf Version; + TStringBuf Headers; + + TStringBuf Host; + TStringBuf Accept; + TStringBuf Connection; + TStringBuf ContentType; + TStringBuf ContentLength; + TStringBuf TransferEncoding; + + TStringBuf Body; + + static const TMap<TStringBuf, TStringBuf THttpRequest::*, TLessNoCase> HeadersLocation; + + template <TStringBuf THttpRequest::* Header> + static TStringBuf GetName(); + void Clear(); +}; + +class THttpResponse { +public: + TStringBuf Protocol; + TStringBuf Version; + TStringBuf Status; + TStringBuf Message; + TStringBuf Headers; + + TStringBuf Connection; + TStringBuf ContentType; + TStringBuf ContentLength; + TStringBuf TransferEncoding; + TStringBuf LastModified; TStringBuf ContentEncoding; - - TStringBuf Body; - - static const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> HeadersLocation; - - template <TStringBuf THttpResponse::* Header> - static TStringBuf GetName(); - void Clear(); -}; - -template <typename HeaderType, typename BufferType> -class THttpParser : public HeaderType, public BufferType { -public: - enum class EParseStage : ui8 { - Method, - URL, - Protocol, - Version, - Status, - Message, - Header, - Body, - ChunkLength, - ChunkData, - Done, - Error, - }; - - static constexpr size_t MaxMethodSize = 6; - static constexpr size_t MaxURLSize = 1024; - static constexpr size_t MaxProtocolSize = 4; - static constexpr size_t MaxVersionSize = 4; - static constexpr size_t MaxStatusSize = 3; - static constexpr size_t MaxMessageSize = 1024; + + TStringBuf Body; + + static const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> HeadersLocation; + + template <TStringBuf THttpResponse::* Header> + static TStringBuf GetName(); + void Clear(); +}; + +template <typename HeaderType, typename BufferType> +class THttpParser : public HeaderType, public BufferType { +public: + enum class EParseStage : ui8 { + Method, + URL, + Protocol, + Version, + Status, + Message, + Header, + Body, + ChunkLength, + ChunkData, + Done, + Error, + }; + + static constexpr size_t MaxMethodSize = 6; + static constexpr size_t MaxURLSize = 1024; + static constexpr size_t MaxProtocolSize = 4; + static constexpr size_t MaxVersionSize = 4; + static constexpr size_t MaxStatusSize = 3; + static constexpr size_t MaxMessageSize = 1024; static constexpr size_t MaxHeaderSize = 8192; - static constexpr size_t MaxChunkLengthSize = 8; - static constexpr size_t MaxChunkSize = 256 * 1024 * 1024; - static constexpr size_t MaxChunkContentSize = 1 * 1024 * 1024 * 1024; - - EParseStage Stage; - EParseStage LastSuccessStage; - TStringBuf Line; - TStringBuf& Header = Line; - size_t ChunkLength = 0; - size_t ContentSize = 0; - TString Content; - - THttpParser(const THttpParser& src) - : HeaderType(src) - , BufferType(src) - , Stage(src.Stage) - , LastSuccessStage(src.LastSuccessStage) - , Line() - , Header(Line) - , ChunkLength(src.ChunkLength) - , ContentSize(src.ContentSize) - , Content(src.Content) - {} - - template <typename StringType> - bool ProcessData(StringType& target, TStringBuf& source, char delim, size_t maxLen) { - TStringBuf maxSource(source.substr(0, maxLen + 1 - target.size())); - size_t pos = maxSource.find(delim); - target += maxSource.substr(0, pos); - source.Skip(pos); - if (target.size() > maxLen) { - Stage = EParseStage::Error; - return false; - } - if (!source.empty() && *source.begin() == delim) { - source.Skip(1); - } - return pos != TStringBuf::npos; - } - - template <typename StringType> - bool ProcessData(StringType& target, TStringBuf& source, TStringBuf delim, size_t maxLen) { - if (delim.empty()) { - return false; - } - if (delim.size() == 1) { - return ProcessData(target, source, delim[0], maxLen); - } + static constexpr size_t MaxChunkLengthSize = 8; + static constexpr size_t MaxChunkSize = 256 * 1024 * 1024; + static constexpr size_t MaxChunkContentSize = 1 * 1024 * 1024 * 1024; + + EParseStage Stage; + EParseStage LastSuccessStage; + TStringBuf Line; + TStringBuf& Header = Line; + size_t ChunkLength = 0; + size_t ContentSize = 0; + TString Content; + + THttpParser(const THttpParser& src) + : HeaderType(src) + , BufferType(src) + , Stage(src.Stage) + , LastSuccessStage(src.LastSuccessStage) + , Line() + , Header(Line) + , ChunkLength(src.ChunkLength) + , ContentSize(src.ContentSize) + , Content(src.Content) + {} + + template <typename StringType> + bool ProcessData(StringType& target, TStringBuf& source, char delim, size_t maxLen) { + TStringBuf maxSource(source.substr(0, maxLen + 1 - target.size())); + size_t pos = maxSource.find(delim); + target += maxSource.substr(0, pos); + source.Skip(pos); + if (target.size() > maxLen) { + Stage = EParseStage::Error; + return false; + } + if (!source.empty() && *source.begin() == delim) { + source.Skip(1); + } + return pos != TStringBuf::npos; + } + + template <typename StringType> + bool ProcessData(StringType& target, TStringBuf& source, TStringBuf delim, size_t maxLen) { + if (delim.empty()) { + return false; + } + if (delim.size() == 1) { + return ProcessData(target, source, delim[0], maxLen); + } if (ProcessData(target, source, delim.back(), maxLen + 1)) { - for (signed i = delim.size() - 2; i >= 0; --i) { - TrimEnd(target, delim[i]); - } - return true; - } - return false; - } - - template <typename StringType> - bool ProcessData(StringType& target, TStringBuf& source, size_t size) { - TStringBuf maxSource(source.substr(0, size - target.size())); - target += maxSource; - source.Skip(maxSource.size()); - if (target.size() > size && !source.empty()) { - Stage = EParseStage::Error; - return false; - } - return target.size() == size; - } - - void ProcessHeader(TStringBuf& header) { - TStringBuf name = header.NextTok(':'); - TrimBegin(name, ' '); - TStringBuf value = header; - Trim(value, ' '); - auto cit = HeaderType::HeadersLocation.find(name); - if (cit != HeaderType::HeadersLocation.end()) { - this->*cit->second = value; - } - header.Clear(); - } - - size_t ParseHex(TStringBuf value) { - size_t result = 0; - for (char ch : value) { - if (ch >= '0' && ch <= '9') { - result *= 16; - result += ch - '0'; - } else if (ch >= 'a' && ch <= 'f') { - result *= 16; - result += 10 + ch - 'a'; - } else if (ch >= 'A' && ch <= 'F') { - result *= 16; - result += 10 + ch - 'A'; - } else if (ch == ';') { - break; - } else if (isspace(ch)) { - continue; - } else { - Stage = EParseStage::Error; - return 0; - } - } - return result; - } - - void Advance(size_t len); - void ConnectionClosed(); - - void Clear() { - BufferType::Clear(); - HeaderType::Clear(); - Stage = GetInitialStage(); - Line.Clear(); - Content.clear(); - } - - bool IsReady() const { - return Stage == EParseStage::Done; - } - - bool IsError() const { - return Stage == EParseStage::Error; - } - - TStringBuf GetErrorText() const { - switch (LastSuccessStage) { - case EParseStage::Method: - return "Invalid http method"; - case EParseStage::URL: - return "Invalid url"; - case EParseStage::Protocol: - return "Invalid http protocol"; - case EParseStage::Version: - return "Invalid http version"; - case EParseStage::Status: - return "Invalid http status"; - case EParseStage::Message: - return "Invalid http message"; - case EParseStage::Header: - return "Invalid http header"; - case EParseStage::Body: - return "Invalid content body"; - case EParseStage::ChunkLength: - case EParseStage::ChunkData: - return "Broken chunked data"; - case EParseStage::Done: - return "Everything is fine"; - case EParseStage::Error: - return "Error on error"; // wat? ...because we don't want to include default label here - } - } - - bool IsDone() const { - return IsReady() || IsError(); - } - - bool HaveBody() const { - return !HeaderType::ContentType.empty() || !HeaderType::ContentLength.empty() || !HeaderType::TransferEncoding.empty(); - } - - bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) { - bool result = BufferType::EnsureEnoughSpaceAvailable(need); - if (!result && !BufferType::Empty()) { - Reparse(); - } - return true; - } - - void Reparse() { - size_t size = BufferType::Size(); - Clear(); - Advance(size); - } - - TStringBuf GetRawData() const { - return TStringBuf(BufferType::Data(), BufferType::Size()); - } - - TString GetObfuscatedData() const { - THeaders headers(HeaderType::Headers); - TStringBuf authorization(headers["Authorization"]); - TStringBuf cookie(headers["Cookie"]); - TStringBuf x_ydb_auth_ticket(headers["x-ydb-auth-ticket"]); - TStringBuf x_yacloud_subjecttoken(headers["x-yacloud-subjecttoken"]); - TString data(GetRawData()); - if (!authorization.empty()) { - auto pos = data.find(authorization); - if (pos != TString::npos) { - data.replace(pos, authorization.size(), TString("<obfuscated>")); - } - } - if (!cookie.empty()) { - auto pos = data.find(cookie); - if (pos != TString::npos) { - data.replace(pos, cookie.size(), TString("<obfuscated>")); - } - } - if (!x_ydb_auth_ticket.empty()) { - auto pos = data.find(x_ydb_auth_ticket); - if (pos != TString::npos) { - data.replace(pos, x_ydb_auth_ticket.size(), TString("<obfuscated>")); - } - } - if (!x_yacloud_subjecttoken.empty()) { - auto pos = data.find(x_yacloud_subjecttoken); - if (pos != TString::npos) { - data.replace(pos, x_yacloud_subjecttoken.size(), TString("<obfuscated>")); - } - } - return data; - } - - static EParseStage GetInitialStage(); - - THttpParser() - : Stage(GetInitialStage()) - , LastSuccessStage(Stage) - {} -}; - -template <typename HeaderType, typename BufferType> -class THttpRenderer : public HeaderType, public BufferType { -public: - enum class ERenderStage { - Init, - Header, - Body, - Done, - Error, - }; - - ERenderStage Stage = ERenderStage::Init; - - void Append(TStringBuf text) { - EnsureEnoughSpaceAvailable(text.size()); - BufferType::Append(text.data(), text.size()); - } - - void Append(char c) { - EnsureEnoughSpaceAvailable(sizeof(c)); - BufferType::Append(c); - } - - template <TStringBuf HeaderType::* string> - void AppendParsedValue(TStringBuf value) { - Append(value); - static_cast<HeaderType*>(this)->*string = TStringBuf(BufferType::Pos() - value.size(), value.size()); - } - - template <TStringBuf HeaderType::* name> - void Set(TStringBuf value) { - Y_VERIFY_DEBUG(Stage == ERenderStage::Header); - Append(HeaderType::template GetName<name>()); - Append(": "); - AppendParsedValue<name>(value); - Append("\r\n"); - HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); - } - - void Set(TStringBuf name, TStringBuf value) { - Y_VERIFY_DEBUG(Stage == ERenderStage::Header); - Append(name); - Append(": "); - Append(value); - Append("\r\n"); - HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); - } - - void Set(const THeaders& headers) { - Y_VERIFY_DEBUG(Stage == ERenderStage::Header); - Append(headers.Render()); - HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); - } - - //THttpRenderer(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version); // request - void InitRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) { - Y_VERIFY_DEBUG(Stage == ERenderStage::Init); - AppendParsedValue<&THttpRequest::Method>(method); - Append(' '); - AppendParsedValue<&THttpRequest::URL>(url); - Append(' '); - AppendParsedValue<&THttpRequest::Protocol>(protocol); - Append('/'); - AppendParsedValue<&THttpRequest::Version>(version); - Append("\r\n"); - Stage = ERenderStage::Header; - HeaderType::Headers = TStringBuf(BufferType::Pos(), size_t(0)); - } - - //THttpRenderer(TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message); // response - void InitResponse(TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message) { - Y_VERIFY_DEBUG(Stage == ERenderStage::Init); - AppendParsedValue<&THttpResponse::Protocol>(protocol); - Append('/'); - AppendParsedValue<&THttpResponse::Version>(version); - Append(' '); - AppendParsedValue<&THttpResponse::Status>(status); - Append(' '); - AppendParsedValue<&THttpResponse::Message>(message); - Append("\r\n"); - Stage = ERenderStage::Header; - HeaderType::Headers = TStringBuf(BufferType::Pos(), size_t(0)); - } - - void FinishHeader() { - Append("\r\n"); - HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); - Stage = ERenderStage::Body; - } - - void SetBody(TStringBuf body) { - Y_VERIFY_DEBUG(Stage == ERenderStage::Header); - if (HeaderType::ContentLength.empty()) { - Set<&HeaderType::ContentLength>(ToString(body.size())); - } - FinishHeader(); - AppendParsedValue<&HeaderType::Body>(body); - Stage = ERenderStage::Done; - } - - bool IsDone() const { - return Stage == ERenderStage::Done; - } - - void Finish() { - switch (Stage) { - case ERenderStage::Header: - FinishHeader(); - break; - default: - break; - } - } - - bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) { - bool result = BufferType::EnsureEnoughSpaceAvailable(need); - if (!result && !BufferType::Empty()) { - Reparse(); - } - return true; - } - - void Clear() { - BufferType::Clear(); - HeaderType::Clear(); - } - - void Reparse() { - // move-magic - size_t size = BufferType::Size(); - THttpParser<HeaderType, BufferType> parser; - // move the buffer to parser - static_cast<BufferType&>(parser) = std::move(static_cast<BufferType&>(*this)); - // reparse - parser.Clear(); - parser.Advance(size); - // move buffer and result back - static_cast<HeaderType&>(*this) = std::move(static_cast<HeaderType&>(parser)); - static_cast<BufferType&>(*this) = std::move(static_cast<BufferType&>(parser)); - switch (parser.Stage) { - case THttpParser<HeaderType, BufferType>::EParseStage::Method: - case THttpParser<HeaderType, BufferType>::EParseStage::URL: - case THttpParser<HeaderType, BufferType>::EParseStage::Protocol: - case THttpParser<HeaderType, BufferType>::EParseStage::Version: - case THttpParser<HeaderType, BufferType>::EParseStage::Status: - case THttpParser<HeaderType, BufferType>::EParseStage::Message: - Stage = ERenderStage::Init; - break; - case THttpParser<HeaderType, BufferType>::EParseStage::Header: - Stage = ERenderStage::Header; + for (signed i = delim.size() - 2; i >= 0; --i) { + TrimEnd(target, delim[i]); + } + return true; + } + return false; + } + + template <typename StringType> + bool ProcessData(StringType& target, TStringBuf& source, size_t size) { + TStringBuf maxSource(source.substr(0, size - target.size())); + target += maxSource; + source.Skip(maxSource.size()); + if (target.size() > size && !source.empty()) { + Stage = EParseStage::Error; + return false; + } + return target.size() == size; + } + + void ProcessHeader(TStringBuf& header) { + TStringBuf name = header.NextTok(':'); + TrimBegin(name, ' '); + TStringBuf value = header; + Trim(value, ' '); + auto cit = HeaderType::HeadersLocation.find(name); + if (cit != HeaderType::HeadersLocation.end()) { + this->*cit->second = value; + } + header.Clear(); + } + + size_t ParseHex(TStringBuf value) { + size_t result = 0; + for (char ch : value) { + if (ch >= '0' && ch <= '9') { + result *= 16; + result += ch - '0'; + } else if (ch >= 'a' && ch <= 'f') { + result *= 16; + result += 10 + ch - 'a'; + } else if (ch >= 'A' && ch <= 'F') { + result *= 16; + result += 10 + ch - 'A'; + } else if (ch == ';') { + break; + } else if (isspace(ch)) { + continue; + } else { + Stage = EParseStage::Error; + return 0; + } + } + return result; + } + + void Advance(size_t len); + void ConnectionClosed(); + + void Clear() { + BufferType::Clear(); + HeaderType::Clear(); + Stage = GetInitialStage(); + Line.Clear(); + Content.clear(); + } + + bool IsReady() const { + return Stage == EParseStage::Done; + } + + bool IsError() const { + return Stage == EParseStage::Error; + } + + TStringBuf GetErrorText() const { + switch (LastSuccessStage) { + case EParseStage::Method: + return "Invalid http method"; + case EParseStage::URL: + return "Invalid url"; + case EParseStage::Protocol: + return "Invalid http protocol"; + case EParseStage::Version: + return "Invalid http version"; + case EParseStage::Status: + return "Invalid http status"; + case EParseStage::Message: + return "Invalid http message"; + case EParseStage::Header: + return "Invalid http header"; + case EParseStage::Body: + return "Invalid content body"; + case EParseStage::ChunkLength: + case EParseStage::ChunkData: + return "Broken chunked data"; + case EParseStage::Done: + return "Everything is fine"; + case EParseStage::Error: + return "Error on error"; // wat? ...because we don't want to include default label here + } + } + + bool IsDone() const { + return IsReady() || IsError(); + } + + bool HaveBody() const { + return !HeaderType::ContentType.empty() || !HeaderType::ContentLength.empty() || !HeaderType::TransferEncoding.empty(); + } + + bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) { + bool result = BufferType::EnsureEnoughSpaceAvailable(need); + if (!result && !BufferType::Empty()) { + Reparse(); + } + return true; + } + + void Reparse() { + size_t size = BufferType::Size(); + Clear(); + Advance(size); + } + + TStringBuf GetRawData() const { + return TStringBuf(BufferType::Data(), BufferType::Size()); + } + + TString GetObfuscatedData() const { + THeaders headers(HeaderType::Headers); + TStringBuf authorization(headers["Authorization"]); + TStringBuf cookie(headers["Cookie"]); + TStringBuf x_ydb_auth_ticket(headers["x-ydb-auth-ticket"]); + TStringBuf x_yacloud_subjecttoken(headers["x-yacloud-subjecttoken"]); + TString data(GetRawData()); + if (!authorization.empty()) { + auto pos = data.find(authorization); + if (pos != TString::npos) { + data.replace(pos, authorization.size(), TString("<obfuscated>")); + } + } + if (!cookie.empty()) { + auto pos = data.find(cookie); + if (pos != TString::npos) { + data.replace(pos, cookie.size(), TString("<obfuscated>")); + } + } + if (!x_ydb_auth_ticket.empty()) { + auto pos = data.find(x_ydb_auth_ticket); + if (pos != TString::npos) { + data.replace(pos, x_ydb_auth_ticket.size(), TString("<obfuscated>")); + } + } + if (!x_yacloud_subjecttoken.empty()) { + auto pos = data.find(x_yacloud_subjecttoken); + if (pos != TString::npos) { + data.replace(pos, x_yacloud_subjecttoken.size(), TString("<obfuscated>")); + } + } + return data; + } + + static EParseStage GetInitialStage(); + + THttpParser() + : Stage(GetInitialStage()) + , LastSuccessStage(Stage) + {} +}; + +template <typename HeaderType, typename BufferType> +class THttpRenderer : public HeaderType, public BufferType { +public: + enum class ERenderStage { + Init, + Header, + Body, + Done, + Error, + }; + + ERenderStage Stage = ERenderStage::Init; + + void Append(TStringBuf text) { + EnsureEnoughSpaceAvailable(text.size()); + BufferType::Append(text.data(), text.size()); + } + + void Append(char c) { + EnsureEnoughSpaceAvailable(sizeof(c)); + BufferType::Append(c); + } + + template <TStringBuf HeaderType::* string> + void AppendParsedValue(TStringBuf value) { + Append(value); + static_cast<HeaderType*>(this)->*string = TStringBuf(BufferType::Pos() - value.size(), value.size()); + } + + template <TStringBuf HeaderType::* name> + void Set(TStringBuf value) { + Y_VERIFY_DEBUG(Stage == ERenderStage::Header); + Append(HeaderType::template GetName<name>()); + Append(": "); + AppendParsedValue<name>(value); + Append("\r\n"); + HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); + } + + void Set(TStringBuf name, TStringBuf value) { + Y_VERIFY_DEBUG(Stage == ERenderStage::Header); + Append(name); + Append(": "); + Append(value); + Append("\r\n"); + HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); + } + + void Set(const THeaders& headers) { + Y_VERIFY_DEBUG(Stage == ERenderStage::Header); + Append(headers.Render()); + HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); + } + + //THttpRenderer(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version); // request + void InitRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) { + Y_VERIFY_DEBUG(Stage == ERenderStage::Init); + AppendParsedValue<&THttpRequest::Method>(method); + Append(' '); + AppendParsedValue<&THttpRequest::URL>(url); + Append(' '); + AppendParsedValue<&THttpRequest::Protocol>(protocol); + Append('/'); + AppendParsedValue<&THttpRequest::Version>(version); + Append("\r\n"); + Stage = ERenderStage::Header; + HeaderType::Headers = TStringBuf(BufferType::Pos(), size_t(0)); + } + + //THttpRenderer(TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message); // response + void InitResponse(TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message) { + Y_VERIFY_DEBUG(Stage == ERenderStage::Init); + AppendParsedValue<&THttpResponse::Protocol>(protocol); + Append('/'); + AppendParsedValue<&THttpResponse::Version>(version); + Append(' '); + AppendParsedValue<&THttpResponse::Status>(status); + Append(' '); + AppendParsedValue<&THttpResponse::Message>(message); + Append("\r\n"); + Stage = ERenderStage::Header; + HeaderType::Headers = TStringBuf(BufferType::Pos(), size_t(0)); + } + + void FinishHeader() { + Append("\r\n"); + HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data()); + Stage = ERenderStage::Body; + } + + void SetBody(TStringBuf body) { + Y_VERIFY_DEBUG(Stage == ERenderStage::Header); + if (HeaderType::ContentLength.empty()) { + Set<&HeaderType::ContentLength>(ToString(body.size())); + } + FinishHeader(); + AppendParsedValue<&HeaderType::Body>(body); + Stage = ERenderStage::Done; + } + + bool IsDone() const { + return Stage == ERenderStage::Done; + } + + void Finish() { + switch (Stage) { + case ERenderStage::Header: + FinishHeader(); break; - case THttpParser<HeaderType, BufferType>::EParseStage::Body: - case THttpParser<HeaderType, BufferType>::EParseStage::ChunkLength: - case THttpParser<HeaderType, BufferType>::EParseStage::ChunkData: - Stage = ERenderStage::Body; - break; - case THttpParser<HeaderType, BufferType>::EParseStage::Done: - Stage = ERenderStage::Done; - break; - case THttpParser<HeaderType, BufferType>::EParseStage::Error: - Stage = ERenderStage::Error; - break; - } - Y_VERIFY(size == BufferType::Size()); - } - - TStringBuf GetRawData() const { - return TStringBuf(BufferType::Data(), BufferType::Size()); - } -}; - -template <> -template <> -inline void THttpRenderer<THttpResponse, TSocketBuffer>::Set<&THttpResponse::Body>(TStringBuf value) { - SetBody(value); -} - -template <> -template <> -inline void THttpRenderer<THttpRequest, TSocketBuffer>::Set<&THttpRequest::Body>(TStringBuf value) { - SetBody(value); -} - -class THttpIncomingRequest; -using THttpIncomingRequestPtr = TIntrusivePtr<THttpIncomingRequest>; - -class THttpOutgoingResponse; -using THttpOutgoingResponsePtr = TIntrusivePtr<THttpOutgoingResponse>; - -class THttpIncomingRequest : - public THttpParser<THttpRequest, TSocketBuffer>, - public TRefCounted<THttpIncomingRequest, TAtomicCounter> { -public: - THttpConfig::SocketAddressType Address; - TString WorkerName; - THPTimer Timer; - bool Secure = false; - - bool IsConnectionClose() const { - if (Connection.empty()) { - return Version == "1.0"; - } else { - return Connection == "close"; - } - } - - TStringBuf GetConnection() const { - if (!Connection.empty()) { - return Connection; - } - return Version == "1.0" ? "close" : "keep-alive"; - } - - THttpOutgoingResponsePtr CreateResponseOK(TStringBuf body, TStringBuf contentType = "text/html", TInstant lastModified = TInstant()); - THttpOutgoingResponsePtr CreateResponseString(TStringBuf data); - THttpOutgoingResponsePtr CreateResponseBadRequest(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 400 - THttpOutgoingResponsePtr CreateResponseNotFound(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 404 - THttpOutgoingResponsePtr CreateResponseServiceUnavailable(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 503 - THttpOutgoingResponsePtr CreateResponseGatewayTimeout(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 504 - THttpOutgoingResponsePtr CreateResponse( - TStringBuf status, - TStringBuf message, - TStringBuf contentType = TStringBuf(), - TStringBuf body = TStringBuf(), - TInstant lastModified = TInstant()); - - THttpIncomingRequestPtr Duplicate(); -}; - -class THttpIncomingResponse; -using THttpIncomingResponsePtr = TIntrusivePtr<THttpIncomingResponse>; - -class THttpOutgoingRequest; -using THttpOutgoingRequestPtr = TIntrusivePtr<THttpOutgoingRequest>; - -class THttpIncomingResponse : - public THttpParser<THttpResponse, TSocketBuffer>, - public TRefCounted<THttpIncomingResponse, TAtomicCounter> { -public: - THttpIncomingResponse(THttpOutgoingRequestPtr request); - - THttpOutgoingRequestPtr GetRequest() const { - return Request; - } - - THttpIncomingResponsePtr Duplicate(THttpOutgoingRequestPtr request); - THttpOutgoingResponsePtr Reverse(THttpIncomingRequestPtr request); - -protected: - THttpOutgoingRequestPtr Request; -}; - -class THttpOutgoingRequest : - public THttpRenderer<THttpRequest, TSocketBuffer>, - public TRefCounted<THttpOutgoingRequest, TAtomicCounter> { -public: - THPTimer Timer; + default: + break; + } + } + + bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) { + bool result = BufferType::EnsureEnoughSpaceAvailable(need); + if (!result && !BufferType::Empty()) { + Reparse(); + } + return true; + } + + void Clear() { + BufferType::Clear(); + HeaderType::Clear(); + } + + void Reparse() { + // move-magic + size_t size = BufferType::Size(); + THttpParser<HeaderType, BufferType> parser; + // move the buffer to parser + static_cast<BufferType&>(parser) = std::move(static_cast<BufferType&>(*this)); + // reparse + parser.Clear(); + parser.Advance(size); + // move buffer and result back + static_cast<HeaderType&>(*this) = std::move(static_cast<HeaderType&>(parser)); + static_cast<BufferType&>(*this) = std::move(static_cast<BufferType&>(parser)); + switch (parser.Stage) { + case THttpParser<HeaderType, BufferType>::EParseStage::Method: + case THttpParser<HeaderType, BufferType>::EParseStage::URL: + case THttpParser<HeaderType, BufferType>::EParseStage::Protocol: + case THttpParser<HeaderType, BufferType>::EParseStage::Version: + case THttpParser<HeaderType, BufferType>::EParseStage::Status: + case THttpParser<HeaderType, BufferType>::EParseStage::Message: + Stage = ERenderStage::Init; + break; + case THttpParser<HeaderType, BufferType>::EParseStage::Header: + Stage = ERenderStage::Header; + break; + case THttpParser<HeaderType, BufferType>::EParseStage::Body: + case THttpParser<HeaderType, BufferType>::EParseStage::ChunkLength: + case THttpParser<HeaderType, BufferType>::EParseStage::ChunkData: + Stage = ERenderStage::Body; + break; + case THttpParser<HeaderType, BufferType>::EParseStage::Done: + Stage = ERenderStage::Done; + break; + case THttpParser<HeaderType, BufferType>::EParseStage::Error: + Stage = ERenderStage::Error; + break; + } + Y_VERIFY(size == BufferType::Size()); + } + + TStringBuf GetRawData() const { + return TStringBuf(BufferType::Data(), BufferType::Size()); + } +}; + +template <> +template <> +inline void THttpRenderer<THttpResponse, TSocketBuffer>::Set<&THttpResponse::Body>(TStringBuf value) { + SetBody(value); +} + +template <> +template <> +inline void THttpRenderer<THttpRequest, TSocketBuffer>::Set<&THttpRequest::Body>(TStringBuf value) { + SetBody(value); +} + +class THttpIncomingRequest; +using THttpIncomingRequestPtr = TIntrusivePtr<THttpIncomingRequest>; + +class THttpOutgoingResponse; +using THttpOutgoingResponsePtr = TIntrusivePtr<THttpOutgoingResponse>; + +class THttpIncomingRequest : + public THttpParser<THttpRequest, TSocketBuffer>, + public TRefCounted<THttpIncomingRequest, TAtomicCounter> { +public: + THttpConfig::SocketAddressType Address; + TString WorkerName; + THPTimer Timer; bool Secure = false; - - THttpOutgoingRequest() = default; - THttpOutgoingRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version); - THttpOutgoingRequest(TStringBuf method, TStringBuf scheme, TStringBuf host, TStringBuf uri, TStringBuf protocol, TStringBuf version); - static THttpOutgoingRequestPtr CreateRequestString(TStringBuf data); - static THttpOutgoingRequestPtr CreateRequestString(const TString& data); - static THttpOutgoingRequestPtr CreateRequestGet(TStringBuf url); - static THttpOutgoingRequestPtr CreateRequestGet(TStringBuf host, TStringBuf uri); // http only - static THttpOutgoingRequestPtr CreateRequestPost(TStringBuf url, TStringBuf contentType = {}, TStringBuf body = {}); - static THttpOutgoingRequestPtr CreateRequestPost(TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body); // http only - static THttpOutgoingRequestPtr CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf()); - static THttpOutgoingRequestPtr CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf()); - THttpOutgoingRequestPtr Duplicate(); -}; - -class THttpOutgoingResponse : - public THttpRenderer<THttpResponse, TSocketBuffer>, - public TRefCounted<THttpOutgoingResponse, TAtomicCounter> { -public: - THttpOutgoingResponse(THttpIncomingRequestPtr request); - THttpOutgoingResponse(THttpIncomingRequestPtr request, TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message); - - bool IsConnectionClose() const { - if (!Connection.empty()) { - return Connection == "close"; - } else { - return Request->IsConnectionClose(); - } - } - - bool IsNeedBody() const { - return Status != "204"; - } - - THttpIncomingRequestPtr GetRequest() const { - return Request; - } - - THttpOutgoingResponsePtr Duplicate(THttpIncomingRequestPtr request); - -// it's temporary accessible for cleanup -//protected: - THttpIncomingRequestPtr Request; -}; - -} + + bool IsConnectionClose() const { + if (Connection.empty()) { + return Version == "1.0"; + } else { + return Connection == "close"; + } + } + + TStringBuf GetConnection() const { + if (!Connection.empty()) { + return Connection; + } + return Version == "1.0" ? "close" : "keep-alive"; + } + + THttpOutgoingResponsePtr CreateResponseOK(TStringBuf body, TStringBuf contentType = "text/html", TInstant lastModified = TInstant()); + THttpOutgoingResponsePtr CreateResponseString(TStringBuf data); + THttpOutgoingResponsePtr CreateResponseBadRequest(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 400 + THttpOutgoingResponsePtr CreateResponseNotFound(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 404 + THttpOutgoingResponsePtr CreateResponseServiceUnavailable(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 503 + THttpOutgoingResponsePtr CreateResponseGatewayTimeout(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 504 + THttpOutgoingResponsePtr CreateResponse( + TStringBuf status, + TStringBuf message, + TStringBuf contentType = TStringBuf(), + TStringBuf body = TStringBuf(), + TInstant lastModified = TInstant()); + + THttpIncomingRequestPtr Duplicate(); +}; + +class THttpIncomingResponse; +using THttpIncomingResponsePtr = TIntrusivePtr<THttpIncomingResponse>; + +class THttpOutgoingRequest; +using THttpOutgoingRequestPtr = TIntrusivePtr<THttpOutgoingRequest>; + +class THttpIncomingResponse : + public THttpParser<THttpResponse, TSocketBuffer>, + public TRefCounted<THttpIncomingResponse, TAtomicCounter> { +public: + THttpIncomingResponse(THttpOutgoingRequestPtr request); + + THttpOutgoingRequestPtr GetRequest() const { + return Request; + } + + THttpIncomingResponsePtr Duplicate(THttpOutgoingRequestPtr request); + THttpOutgoingResponsePtr Reverse(THttpIncomingRequestPtr request); + +protected: + THttpOutgoingRequestPtr Request; +}; + +class THttpOutgoingRequest : + public THttpRenderer<THttpRequest, TSocketBuffer>, + public TRefCounted<THttpOutgoingRequest, TAtomicCounter> { +public: + THPTimer Timer; + bool Secure = false; + + THttpOutgoingRequest() = default; + THttpOutgoingRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version); + THttpOutgoingRequest(TStringBuf method, TStringBuf scheme, TStringBuf host, TStringBuf uri, TStringBuf protocol, TStringBuf version); + static THttpOutgoingRequestPtr CreateRequestString(TStringBuf data); + static THttpOutgoingRequestPtr CreateRequestString(const TString& data); + static THttpOutgoingRequestPtr CreateRequestGet(TStringBuf url); + static THttpOutgoingRequestPtr CreateRequestGet(TStringBuf host, TStringBuf uri); // http only + static THttpOutgoingRequestPtr CreateRequestPost(TStringBuf url, TStringBuf contentType = {}, TStringBuf body = {}); + static THttpOutgoingRequestPtr CreateRequestPost(TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body); // http only + static THttpOutgoingRequestPtr CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf()); + static THttpOutgoingRequestPtr CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf()); + THttpOutgoingRequestPtr Duplicate(); +}; + +class THttpOutgoingResponse : + public THttpRenderer<THttpResponse, TSocketBuffer>, + public TRefCounted<THttpOutgoingResponse, TAtomicCounter> { +public: + THttpOutgoingResponse(THttpIncomingRequestPtr request); + THttpOutgoingResponse(THttpIncomingRequestPtr request, TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message); + + bool IsConnectionClose() const { + if (!Connection.empty()) { + return Connection == "close"; + } else { + return Request->IsConnectionClose(); + } + } + + bool IsNeedBody() const { + return Status != "204"; + } + + THttpIncomingRequestPtr GetRequest() const { + return Request; + } + + THttpOutgoingResponsePtr Duplicate(THttpIncomingRequestPtr request); + +// it's temporary accessible for cleanup +//protected: + THttpIncomingRequestPtr Request; +}; + +} diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp index 27c4eeb6f3..8fe6783260 100644 --- a/library/cpp/actors/http/http_cache.cpp +++ b/library/cpp/actors/http/http_cache.cpp @@ -1,4 +1,4 @@ -#include "http.h" +#include "http.h" #include "http_proxy.h" #include "http_cache.h" #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -6,594 +6,594 @@ #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/core/scheduler_basic.h> #include <library/cpp/actors/http/http.h> -#include <library/cpp/digest/md5/md5.h> -#include <util/digest/multi.h> -#include <util/generic/queue.h> +#include <library/cpp/digest/md5/md5.h> +#include <util/digest/multi.h> +#include <util/generic/queue.h> #include <util/string/cast.h> - -namespace NHttp { - -class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig { -public: - using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>; + +namespace NHttp { + +class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig { +public: + using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>; NActors::TActorId HttpProxyId; - TGetCachePolicy GetCachePolicy; - static constexpr TDuration RefreshTimeout = TDuration::Seconds(1); - - struct TCacheKey { - TString Host; - TString URL; - TString Headers; - - operator size_t() const { - return MultiHash(Host, URL, Headers); - } - - TString GetId() const { - return MD5::Calc(Host + ':' + URL + ':' + Headers); - } - }; - - struct TCacheRecord { - TInstant RefreshTime; - TInstant DeathTime; - TCachePolicy CachePolicy; - NHttp::THttpOutgoingRequestPtr Request; - NHttp::THttpOutgoingRequestPtr OutgoingRequest; - TDuration Timeout; - NHttp::THttpIncomingResponsePtr Response; - TString Error; - TVector<NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr> Waiters; - - TCacheRecord(const TCachePolicy cachePolicy) - : CachePolicy(cachePolicy) - {} - - bool IsValid() const { - return Response != nullptr || !Error.empty(); - } - - void UpdateResponse(NHttp::THttpIncomingResponsePtr response, const TString& error, TInstant now) { - if (error.empty() || Response == nullptr || !CachePolicy.KeepOnError) { - Response = response; - Error = error; - } - RefreshTime = now + CachePolicy.TimeToRefresh; - if (CachePolicy.PaceToRefresh) { - RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds()); - } - } - - TString GetName() const { - return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL; - } - }; - - struct TRefreshRecord { - TCacheKey Key; - TInstant RefreshTime; - - bool operator <(const TRefreshRecord& b) const { - return RefreshTime > b.RefreshTime; - } - }; - - THashMap<TCacheKey, TCacheRecord> Cache; - TPriorityQueue<TRefreshRecord> RefreshQueue; - THashMap<THttpOutgoingRequest*, TCacheKey> OutgoingRequests; - - THttpOutgoingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy) - : HttpProxyId(httpProxyId) - , GetCachePolicy(std::move(getCachePolicy)) - {} - - void Bootstrap(const NActors::TActorContext&) { - // - Become(&THttpOutgoingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup()); - } - - static TString GetCacheHeadersKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) { - TStringBuilder key; - if (!policy.HeadersToCacheKey.empty()) { - NHttp::THeaders headers(request->Headers); - for (const TString& header : policy.HeadersToCacheKey) { - key << headers[header]; - } - } - return key; - } - - static TCacheKey GetCacheKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) { - return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) }; - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { - ctx.Send(event->Forward(HttpProxyId)); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { - ctx.Send(event->Forward(HttpProxyId)); - } - - void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { - ctx.Send(event->Forward(HttpProxyId)); - } - - void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) { - ctx.Send(event->Forward(HttpProxyId)); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { - NHttp::THttpOutgoingRequestPtr request(event->Get()->Request); - NHttp::THttpIncomingResponsePtr response(event->Get()->Response); - auto itRequests = OutgoingRequests.find(request.Get()); - if (itRequests == OutgoingRequests.end()) { - LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL); - return; - } - auto key = itRequests->second; - OutgoingRequests.erase(itRequests); - auto it = Cache.find(key); - if (it == Cache.end()) { - LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL); - return; - } - TCacheRecord& cacheRecord = it->second; - cacheRecord.OutgoingRequest.Reset(); - for (auto& waiter : cacheRecord.Waiters) { - NHttp::THttpIncomingResponsePtr response2; - TString error2; - if (response != nullptr) { - response2 = response->Duplicate(waiter->Get()->Request); - } - if (!event->Get()->Error.empty()) { - error2 = event->Get()->Error; - } - ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(waiter->Get()->Request, response2, error2)); - } - cacheRecord.Waiters.clear(); - TString error; - if (event->Get()->Error.empty()) { - if (event->Get()->Response != nullptr && event->Get()->Response->Status != "200") { - error = event->Get()->Response->Message; - } - } else { - error = event->Get()->Error; - } - if (!error.empty()) { - LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error); - } - LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName()); - cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now()); - RefreshQueue.push({it->first, it->second.RefreshTime}); - LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { - const NHttp::THttpOutgoingRequest* request = event->Get()->Request.Get(); - auto policy = GetCachePolicy(request); - if (policy.TimeToExpire == TDuration()) { - ctx.Send(event->Forward(HttpProxyId)); - return; - } - auto key = GetCacheKey(request, policy); - auto it = Cache.find(key); - if (it != Cache.end()) { - if (it->second.IsValid()) { - LOG_DEBUG_S(ctx, HttpLog, "OutgoingRespond " - << it->second.GetName() - << " (" - << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error")) - << ")"); - NHttp::THttpIncomingResponsePtr response = it->second.Response; - if (response != nullptr) { - response = response->Duplicate(event->Get()->Request); - } - ctx.Send(event->Sender, - new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(event->Get()->Request, - response, - it->second.Error)); - it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; // prolong active cache items - return; - } - } else { - it = Cache.emplace(key, policy).first; - it->second.Request = event->Get()->Request; - it->second.Timeout = event->Get()->Timeout; - it->second.OutgoingRequest = it->second.Request->Duplicate(); - OutgoingRequests[it->second.OutgoingRequest.Get()] = key; - LOG_DEBUG_S(ctx, HttpLog, "OutgoingInitiate " << it->second.GetName()); - ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout)); - } - it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; - it->second.Waiters.emplace_back(std::move(event)); - } - - void HandleRefresh(const NActors::TActorContext& ctx) { - while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) { - TRefreshRecord rrec = RefreshQueue.top(); - RefreshQueue.pop(); - auto it = Cache.find(rrec.Key); - if (it != Cache.end()) { - if (it->second.DeathTime > ctx.Now()) { - LOG_DEBUG_S(ctx, HttpLog, "OutgoingRefresh " << it->second.GetName()); - it->second.OutgoingRequest = it->second.Request->Duplicate(); - OutgoingRequests[it->second.OutgoingRequest.Get()] = it->first; - ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout)); - } else { - LOG_DEBUG_S(ctx, HttpLog, "OutgoingForget " << it->second.GetName()); - if (it->second.OutgoingRequest) { - OutgoingRequests.erase(it->second.OutgoingRequest.Get()); - } - Cache.erase(it); - } - } - } - ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup()); - } - - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); - HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle); - HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle); - HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle); - HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); - HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); - CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh); - } - } -}; - -const TDuration THttpOutgoingCacheActor::RefreshTimeout; - -class THttpIncomingCacheActor : public NActors::TActorBootstrapped<THttpIncomingCacheActor>, THttpConfig { -public: - using TBase = NActors::TActorBootstrapped<THttpIncomingCacheActor>; - NActors::TActorId HttpProxyId; - TGetCachePolicy GetCachePolicy; - static constexpr TDuration RefreshTimeout = TDuration::Seconds(1); - THashMap<TString, TActorId> Handlers; - - struct TCacheKey { - TString Host; - TString URL; - TString Headers; - - operator size_t() const { - return MultiHash(Host, URL, Headers); - } - - TString GetId() const { - return MD5::Calc(Host + ':' + URL + ':' + Headers); - } - }; - - struct TCacheRecord { - TInstant RefreshTime; - TInstant DeathTime; - TCachePolicy CachePolicy; - TString CacheId; - NHttp::THttpIncomingRequestPtr Request; - TDuration Timeout; - NHttp::THttpOutgoingResponsePtr Response; - TVector<NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr> Waiters; - ui32 Retries = 0; - bool Enqueued = false; - - TCacheRecord(const TCachePolicy cachePolicy) - : CachePolicy(cachePolicy) - {} - - bool IsValid() const { - return Response != nullptr; - } - - void InitRequest(NHttp::THttpIncomingRequestPtr request) { - Request = request; - if (CachePolicy.TimeToExpire) { - DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire; - } - } - - void UpdateResponse(NHttp::THttpOutgoingResponsePtr response, const TString& error, TInstant now) { - if (error.empty() || !CachePolicy.KeepOnError) { - Response = response; - } - Retries = 0; - if (CachePolicy.TimeToRefresh) { - RefreshTime = now + CachePolicy.TimeToRefresh; - if (CachePolicy.PaceToRefresh) { - RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds()); - } - } - } - - void UpdateExpireTime() { - if (CachePolicy.TimeToExpire) { - DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire; - } - } - - TString GetName() const { - return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL - << " (" << CacheId << ")"; - } - }; - - struct TRefreshRecord { - TCacheKey Key; - TInstant RefreshTime; - - bool operator <(const TRefreshRecord& b) const { - return RefreshTime > b.RefreshTime; - } - }; - - THashMap<TCacheKey, TCacheRecord> Cache; - TPriorityQueue<TRefreshRecord> RefreshQueue; - THashMap<THttpIncomingRequest*, TCacheKey> IncomingRequests; - - THttpIncomingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy) - : HttpProxyId(httpProxyId) - , GetCachePolicy(std::move(getCachePolicy)) - {} - - void Bootstrap(const NActors::TActorContext&) { - // - Become(&THttpIncomingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup()); - } - - static TString GetCacheHeadersKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) { - TStringBuilder key; - if (!policy.HeadersToCacheKey.empty()) { - NHttp::THeaders headers(request->Headers); - for (const TString& header : policy.HeadersToCacheKey) { - key << headers[header]; - } - } - return key; - } - - static TCacheKey GetCacheKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) { - return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) }; - } - - TActorId GetRequestHandler(NHttp::THttpIncomingRequestPtr request) { - TStringBuf url = request->URL.Before('?'); - THashMap<TString, TActorId>::iterator it; - while (!url.empty()) { - it = Handlers.find(url); - if (it != Handlers.end()) { - return it->second; - } else { - if (url.EndsWith('/')) { - url.Trunc(url.size() - 1); - } - size_t pos = url.rfind('/'); - if (pos == TStringBuf::npos) { - break; - } else { - url = url.substr(0, pos + 1); - } - } - } - return {}; - } - - void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) { - cacheRecord.Request = cacheRecord.Request->Duplicate(); - IncomingRequests[cacheRecord.Request.Get()] = cacheKey; - TActorId handler = GetRequestHandler(cacheRecord.Request); - if (handler) { - Send(handler, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(cacheRecord.Request)); - } else { - LOG_ERROR_S(ctx, HttpLog, "Can't find cache handler for " << cacheRecord.GetName()); - } - } - - void DropCacheRecord(THashMap<TCacheKey, TCacheRecord>::iterator it) { - if (it->second.Request) { - IncomingRequests.erase(it->second.Request.Get()); - } - for (auto& waiter : it->second.Waiters) { - NHttp::THttpOutgoingResponsePtr response; - response = waiter->Get()->Request->CreateResponseGatewayTimeout("Timeout", "text/plain"); - Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); - } - Cache.erase(it); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { - ctx.Send(event->Forward(HttpProxyId)); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { - ctx.Send(event->Forward(HttpProxyId)); - } - - void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { - ctx.Send(event->Forward(HttpProxyId)); - } - - void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) { - Handlers[event->Get()->Path] = event->Get()->Handler; - ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvRegisterHandler(event->Get()->Path, ctx.SelfID)); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { - NHttp::THttpIncomingRequestPtr request(event->Get()->Response->GetRequest()); - NHttp::THttpOutgoingResponsePtr response(event->Get()->Response); - auto itRequests = IncomingRequests.find(request.Get()); - if (itRequests == IncomingRequests.end()) { - LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL); - return; - } - - TCacheKey key = itRequests->second; - auto it = Cache.find(key); - if (it == Cache.end()) { - LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL); - return; - } - - IncomingRequests.erase(itRequests); - TCacheRecord& cacheRecord = it->second; - TStringBuf status; - TString error; - - if (event->Get()->Response != nullptr) { - status = event->Get()->Response->Status; - if (!status.StartsWith("2")) { - error = event->Get()->Response->Message; - } - } - if (cacheRecord.CachePolicy.RetriesCount > 0) { - auto itStatusToRetry = std::find(cacheRecord.CachePolicy.StatusesToRetry.begin(), cacheRecord.CachePolicy.StatusesToRetry.end(), status); - if (itStatusToRetry != cacheRecord.CachePolicy.StatusesToRetry.end()) { - if (cacheRecord.Retries < cacheRecord.CachePolicy.RetriesCount) { - ++cacheRecord.Retries; - LOG_WARN_S(ctx, HttpLog, "IncomingRetry " << cacheRecord.GetName() << ": " << status << " " << error); - SendCacheRequest(key, cacheRecord, ctx); - return; - } - } - } - for (auto& waiter : cacheRecord.Waiters) { - NHttp::THttpOutgoingResponsePtr response2; - response2 = response->Duplicate(waiter->Get()->Request); - ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response2)); - } - cacheRecord.Waiters.clear(); - if (!error.empty()) { - LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error); - if (!cacheRecord.Response) { - LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscard " << cacheRecord.GetName()); - DropCacheRecord(it); - return; - } - } - if (cacheRecord.CachePolicy.TimeToRefresh) { - LOG_DEBUG_S(ctx, HttpLog, "IncomingUpdate " << cacheRecord.GetName()); - cacheRecord.UpdateResponse(response, error, ctx.Now()); - if (!cacheRecord.Enqueued) { - RefreshQueue.push({it->first, it->second.RefreshTime}); - cacheRecord.Enqueued = true; - } - LOG_DEBUG_S(ctx, HttpLog, "IncomingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime); - } else { - LOG_DEBUG_S(ctx, HttpLog, "IncomingDrop " << cacheRecord.GetName()); - DropCacheRecord(it); - } - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { - const NHttp::THttpIncomingRequest* request = event->Get()->Request.Get(); - TCachePolicy policy = GetCachePolicy(request); - if (policy.TimeToExpire == TDuration() && policy.RetriesCount == 0) { - TActorId handler = GetRequestHandler(event->Get()->Request); - if (handler) { - ctx.Send(event->Forward(handler)); - } - return; - } - auto key = GetCacheKey(request, policy); - auto it = Cache.find(key); - if (it != Cache.end() && !policy.DiscardCache) { - it->second.UpdateExpireTime(); - if (it->second.IsValid()) { - LOG_DEBUG_S(ctx, HttpLog, "IncomingRespond " - << it->second.GetName() - << " (" - << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error")) - << ")"); - NHttp::THttpOutgoingResponsePtr response = it->second.Response; - if (response != nullptr) { - response = response->Duplicate(event->Get()->Request); - } - ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); - return; - } - } else { - it = Cache.emplace(key, policy).first; - it->second.CacheId = key.GetId(); // for debugging - it->second.InitRequest(event->Get()->Request); - if (policy.DiscardCache) { - LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscardCache " << it->second.GetName()); - } - LOG_DEBUG_S(ctx, HttpLog, "IncomingInitiate " << it->second.GetName()); - SendCacheRequest(key, it->second, ctx); - } - it->second.Waiters.emplace_back(std::move(event)); - } - - void HandleRefresh(const NActors::TActorContext& ctx) { - while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) { - TRefreshRecord rrec = RefreshQueue.top(); - RefreshQueue.pop(); - auto it = Cache.find(rrec.Key); - if (it != Cache.end()) { - it->second.Enqueued = false; - if (it->second.DeathTime > ctx.Now()) { - LOG_DEBUG_S(ctx, HttpLog, "IncomingRefresh " << it->second.GetName()); - SendCacheRequest(it->first, it->second, ctx); - } else { - LOG_DEBUG_S(ctx, HttpLog, "IncomingForget " << it->second.GetName()); - DropCacheRecord(it); - } - } - } - ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup()); - } - - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); - HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle); - HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle); - HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle); - HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); - HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); - CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh); - } - } -}; - -TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& defaultPolicy) { - TCachePolicy policy = defaultPolicy; - THeaders headers(request->Headers); - TStringBuf cacheControl(headers["Cache-Control"]); - while (TStringBuf cacheItem = cacheControl.NextTok(',')) { - Trim(cacheItem, ' '); - if (cacheItem == "no-store" || cacheItem == "no-cache") { - policy.DiscardCache = true; - } - TStringBuf itemName = cacheItem.NextTok('='); - TrimEnd(itemName, ' '); - TrimBegin(cacheItem, ' '); - if (itemName == "max-age") { - policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem)); - } - if (itemName == "min-fresh") { - policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem)); - } - if (itemName == "stale-if-error") { - policy.KeepOnError = true; - } - } - return policy; -} - + TGetCachePolicy GetCachePolicy; + static constexpr TDuration RefreshTimeout = TDuration::Seconds(1); + + struct TCacheKey { + TString Host; + TString URL; + TString Headers; + + operator size_t() const { + return MultiHash(Host, URL, Headers); + } + + TString GetId() const { + return MD5::Calc(Host + ':' + URL + ':' + Headers); + } + }; + + struct TCacheRecord { + TInstant RefreshTime; + TInstant DeathTime; + TCachePolicy CachePolicy; + NHttp::THttpOutgoingRequestPtr Request; + NHttp::THttpOutgoingRequestPtr OutgoingRequest; + TDuration Timeout; + NHttp::THttpIncomingResponsePtr Response; + TString Error; + TVector<NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr> Waiters; + + TCacheRecord(const TCachePolicy cachePolicy) + : CachePolicy(cachePolicy) + {} + + bool IsValid() const { + return Response != nullptr || !Error.empty(); + } + + void UpdateResponse(NHttp::THttpIncomingResponsePtr response, const TString& error, TInstant now) { + if (error.empty() || Response == nullptr || !CachePolicy.KeepOnError) { + Response = response; + Error = error; + } + RefreshTime = now + CachePolicy.TimeToRefresh; + if (CachePolicy.PaceToRefresh) { + RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds()); + } + } + + TString GetName() const { + return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL; + } + }; + + struct TRefreshRecord { + TCacheKey Key; + TInstant RefreshTime; + + bool operator <(const TRefreshRecord& b) const { + return RefreshTime > b.RefreshTime; + } + }; + + THashMap<TCacheKey, TCacheRecord> Cache; + TPriorityQueue<TRefreshRecord> RefreshQueue; + THashMap<THttpOutgoingRequest*, TCacheKey> OutgoingRequests; + + THttpOutgoingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy) + : HttpProxyId(httpProxyId) + , GetCachePolicy(std::move(getCachePolicy)) + {} + + void Bootstrap(const NActors::TActorContext&) { + // + Become(&THttpOutgoingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup()); + } + + static TString GetCacheHeadersKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) { + TStringBuilder key; + if (!policy.HeadersToCacheKey.empty()) { + NHttp::THeaders headers(request->Headers); + for (const TString& header : policy.HeadersToCacheKey) { + key << headers[header]; + } + } + return key; + } + + static TCacheKey GetCacheKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) { + return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) }; + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { + NHttp::THttpOutgoingRequestPtr request(event->Get()->Request); + NHttp::THttpIncomingResponsePtr response(event->Get()->Response); + auto itRequests = OutgoingRequests.find(request.Get()); + if (itRequests == OutgoingRequests.end()) { + LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL); + return; + } + auto key = itRequests->second; + OutgoingRequests.erase(itRequests); + auto it = Cache.find(key); + if (it == Cache.end()) { + LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL); + return; + } + TCacheRecord& cacheRecord = it->second; + cacheRecord.OutgoingRequest.Reset(); + for (auto& waiter : cacheRecord.Waiters) { + NHttp::THttpIncomingResponsePtr response2; + TString error2; + if (response != nullptr) { + response2 = response->Duplicate(waiter->Get()->Request); + } + if (!event->Get()->Error.empty()) { + error2 = event->Get()->Error; + } + ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(waiter->Get()->Request, response2, error2)); + } + cacheRecord.Waiters.clear(); + TString error; + if (event->Get()->Error.empty()) { + if (event->Get()->Response != nullptr && event->Get()->Response->Status != "200") { + error = event->Get()->Response->Message; + } + } else { + error = event->Get()->Error; + } + if (!error.empty()) { + LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error); + } + LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName()); + cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now()); + RefreshQueue.push({it->first, it->second.RefreshTime}); + LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + const NHttp::THttpOutgoingRequest* request = event->Get()->Request.Get(); + auto policy = GetCachePolicy(request); + if (policy.TimeToExpire == TDuration()) { + ctx.Send(event->Forward(HttpProxyId)); + return; + } + auto key = GetCacheKey(request, policy); + auto it = Cache.find(key); + if (it != Cache.end()) { + if (it->second.IsValid()) { + LOG_DEBUG_S(ctx, HttpLog, "OutgoingRespond " + << it->second.GetName() + << " (" + << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error")) + << ")"); + NHttp::THttpIncomingResponsePtr response = it->second.Response; + if (response != nullptr) { + response = response->Duplicate(event->Get()->Request); + } + ctx.Send(event->Sender, + new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(event->Get()->Request, + response, + it->second.Error)); + it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; // prolong active cache items + return; + } + } else { + it = Cache.emplace(key, policy).first; + it->second.Request = event->Get()->Request; + it->second.Timeout = event->Get()->Timeout; + it->second.OutgoingRequest = it->second.Request->Duplicate(); + OutgoingRequests[it->second.OutgoingRequest.Get()] = key; + LOG_DEBUG_S(ctx, HttpLog, "OutgoingInitiate " << it->second.GetName()); + ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout)); + } + it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; + it->second.Waiters.emplace_back(std::move(event)); + } + + void HandleRefresh(const NActors::TActorContext& ctx) { + while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) { + TRefreshRecord rrec = RefreshQueue.top(); + RefreshQueue.pop(); + auto it = Cache.find(rrec.Key); + if (it != Cache.end()) { + if (it->second.DeathTime > ctx.Now()) { + LOG_DEBUG_S(ctx, HttpLog, "OutgoingRefresh " << it->second.GetName()); + it->second.OutgoingRequest = it->second.Request->Duplicate(); + OutgoingRequests[it->second.OutgoingRequest.Get()] = it->first; + ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout)); + } else { + LOG_DEBUG_S(ctx, HttpLog, "OutgoingForget " << it->second.GetName()); + if (it->second.OutgoingRequest) { + OutgoingRequests.erase(it->second.OutgoingRequest.Get()); + } + Cache.erase(it); + } + } + } + ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup()); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle); + HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle); + HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); + CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh); + } + } +}; + +const TDuration THttpOutgoingCacheActor::RefreshTimeout; + +class THttpIncomingCacheActor : public NActors::TActorBootstrapped<THttpIncomingCacheActor>, THttpConfig { +public: + using TBase = NActors::TActorBootstrapped<THttpIncomingCacheActor>; + NActors::TActorId HttpProxyId; + TGetCachePolicy GetCachePolicy; + static constexpr TDuration RefreshTimeout = TDuration::Seconds(1); + THashMap<TString, TActorId> Handlers; + + struct TCacheKey { + TString Host; + TString URL; + TString Headers; + + operator size_t() const { + return MultiHash(Host, URL, Headers); + } + + TString GetId() const { + return MD5::Calc(Host + ':' + URL + ':' + Headers); + } + }; + + struct TCacheRecord { + TInstant RefreshTime; + TInstant DeathTime; + TCachePolicy CachePolicy; + TString CacheId; + NHttp::THttpIncomingRequestPtr Request; + TDuration Timeout; + NHttp::THttpOutgoingResponsePtr Response; + TVector<NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr> Waiters; + ui32 Retries = 0; + bool Enqueued = false; + + TCacheRecord(const TCachePolicy cachePolicy) + : CachePolicy(cachePolicy) + {} + + bool IsValid() const { + return Response != nullptr; + } + + void InitRequest(NHttp::THttpIncomingRequestPtr request) { + Request = request; + if (CachePolicy.TimeToExpire) { + DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire; + } + } + + void UpdateResponse(NHttp::THttpOutgoingResponsePtr response, const TString& error, TInstant now) { + if (error.empty() || !CachePolicy.KeepOnError) { + Response = response; + } + Retries = 0; + if (CachePolicy.TimeToRefresh) { + RefreshTime = now + CachePolicy.TimeToRefresh; + if (CachePolicy.PaceToRefresh) { + RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds()); + } + } + } + + void UpdateExpireTime() { + if (CachePolicy.TimeToExpire) { + DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire; + } + } + + TString GetName() const { + return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL + << " (" << CacheId << ")"; + } + }; + + struct TRefreshRecord { + TCacheKey Key; + TInstant RefreshTime; + + bool operator <(const TRefreshRecord& b) const { + return RefreshTime > b.RefreshTime; + } + }; + + THashMap<TCacheKey, TCacheRecord> Cache; + TPriorityQueue<TRefreshRecord> RefreshQueue; + THashMap<THttpIncomingRequest*, TCacheKey> IncomingRequests; + + THttpIncomingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy) + : HttpProxyId(httpProxyId) + , GetCachePolicy(std::move(getCachePolicy)) + {} + + void Bootstrap(const NActors::TActorContext&) { + // + Become(&THttpIncomingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup()); + } + + static TString GetCacheHeadersKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) { + TStringBuilder key; + if (!policy.HeadersToCacheKey.empty()) { + NHttp::THeaders headers(request->Headers); + for (const TString& header : policy.HeadersToCacheKey) { + key << headers[header]; + } + } + return key; + } + + static TCacheKey GetCacheKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) { + return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) }; + } + + TActorId GetRequestHandler(NHttp::THttpIncomingRequestPtr request) { + TStringBuf url = request->URL.Before('?'); + THashMap<TString, TActorId>::iterator it; + while (!url.empty()) { + it = Handlers.find(url); + if (it != Handlers.end()) { + return it->second; + } else { + if (url.EndsWith('/')) { + url.Trunc(url.size() - 1); + } + size_t pos = url.rfind('/'); + if (pos == TStringBuf::npos) { + break; + } else { + url = url.substr(0, pos + 1); + } + } + } + return {}; + } + + void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) { + cacheRecord.Request = cacheRecord.Request->Duplicate(); + IncomingRequests[cacheRecord.Request.Get()] = cacheKey; + TActorId handler = GetRequestHandler(cacheRecord.Request); + if (handler) { + Send(handler, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(cacheRecord.Request)); + } else { + LOG_ERROR_S(ctx, HttpLog, "Can't find cache handler for " << cacheRecord.GetName()); + } + } + + void DropCacheRecord(THashMap<TCacheKey, TCacheRecord>::iterator it) { + if (it->second.Request) { + IncomingRequests.erase(it->second.Request.Get()); + } + for (auto& waiter : it->second.Waiters) { + NHttp::THttpOutgoingResponsePtr response; + response = waiter->Get()->Request->CreateResponseGatewayTimeout("Timeout", "text/plain"); + Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + } + Cache.erase(it); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(HttpProxyId)); + } + + void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) { + Handlers[event->Get()->Path] = event->Get()->Handler; + ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvRegisterHandler(event->Get()->Path, ctx.SelfID)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { + NHttp::THttpIncomingRequestPtr request(event->Get()->Response->GetRequest()); + NHttp::THttpOutgoingResponsePtr response(event->Get()->Response); + auto itRequests = IncomingRequests.find(request.Get()); + if (itRequests == IncomingRequests.end()) { + LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL); + return; + } + + TCacheKey key = itRequests->second; + auto it = Cache.find(key); + if (it == Cache.end()) { + LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL); + return; + } + + IncomingRequests.erase(itRequests); + TCacheRecord& cacheRecord = it->second; + TStringBuf status; + TString error; + + if (event->Get()->Response != nullptr) { + status = event->Get()->Response->Status; + if (!status.StartsWith("2")) { + error = event->Get()->Response->Message; + } + } + if (cacheRecord.CachePolicy.RetriesCount > 0) { + auto itStatusToRetry = std::find(cacheRecord.CachePolicy.StatusesToRetry.begin(), cacheRecord.CachePolicy.StatusesToRetry.end(), status); + if (itStatusToRetry != cacheRecord.CachePolicy.StatusesToRetry.end()) { + if (cacheRecord.Retries < cacheRecord.CachePolicy.RetriesCount) { + ++cacheRecord.Retries; + LOG_WARN_S(ctx, HttpLog, "IncomingRetry " << cacheRecord.GetName() << ": " << status << " " << error); + SendCacheRequest(key, cacheRecord, ctx); + return; + } + } + } + for (auto& waiter : cacheRecord.Waiters) { + NHttp::THttpOutgoingResponsePtr response2; + response2 = response->Duplicate(waiter->Get()->Request); + ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response2)); + } + cacheRecord.Waiters.clear(); + if (!error.empty()) { + LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error); + if (!cacheRecord.Response) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscard " << cacheRecord.GetName()); + DropCacheRecord(it); + return; + } + } + if (cacheRecord.CachePolicy.TimeToRefresh) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingUpdate " << cacheRecord.GetName()); + cacheRecord.UpdateResponse(response, error, ctx.Now()); + if (!cacheRecord.Enqueued) { + RefreshQueue.push({it->first, it->second.RefreshTime}); + cacheRecord.Enqueued = true; + } + LOG_DEBUG_S(ctx, HttpLog, "IncomingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime); + } else { + LOG_DEBUG_S(ctx, HttpLog, "IncomingDrop " << cacheRecord.GetName()); + DropCacheRecord(it); + } + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { + const NHttp::THttpIncomingRequest* request = event->Get()->Request.Get(); + TCachePolicy policy = GetCachePolicy(request); + if (policy.TimeToExpire == TDuration() && policy.RetriesCount == 0) { + TActorId handler = GetRequestHandler(event->Get()->Request); + if (handler) { + ctx.Send(event->Forward(handler)); + } + return; + } + auto key = GetCacheKey(request, policy); + auto it = Cache.find(key); + if (it != Cache.end() && !policy.DiscardCache) { + it->second.UpdateExpireTime(); + if (it->second.IsValid()) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingRespond " + << it->second.GetName() + << " (" + << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error")) + << ")"); + NHttp::THttpOutgoingResponsePtr response = it->second.Response; + if (response != nullptr) { + response = response->Duplicate(event->Get()->Request); + } + ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + return; + } + } else { + it = Cache.emplace(key, policy).first; + it->second.CacheId = key.GetId(); // for debugging + it->second.InitRequest(event->Get()->Request); + if (policy.DiscardCache) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscardCache " << it->second.GetName()); + } + LOG_DEBUG_S(ctx, HttpLog, "IncomingInitiate " << it->second.GetName()); + SendCacheRequest(key, it->second, ctx); + } + it->second.Waiters.emplace_back(std::move(event)); + } + + void HandleRefresh(const NActors::TActorContext& ctx) { + while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) { + TRefreshRecord rrec = RefreshQueue.top(); + RefreshQueue.pop(); + auto it = Cache.find(rrec.Key); + if (it != Cache.end()) { + it->second.Enqueued = false; + if (it->second.DeathTime > ctx.Now()) { + LOG_DEBUG_S(ctx, HttpLog, "IncomingRefresh " << it->second.GetName()); + SendCacheRequest(it->first, it->second, ctx); + } else { + LOG_DEBUG_S(ctx, HttpLog, "IncomingForget " << it->second.GetName()); + DropCacheRecord(it); + } + } + } + ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup()); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle); + HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle); + HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle); + CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh); + } + } +}; + +TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& defaultPolicy) { + TCachePolicy policy = defaultPolicy; + THeaders headers(request->Headers); + TStringBuf cacheControl(headers["Cache-Control"]); + while (TStringBuf cacheItem = cacheControl.NextTok(',')) { + Trim(cacheItem, ' '); + if (cacheItem == "no-store" || cacheItem == "no-cache") { + policy.DiscardCache = true; + } + TStringBuf itemName = cacheItem.NextTok('='); + TrimEnd(itemName, ' '); + TrimBegin(cacheItem, ' '); + if (itemName == "max-age") { + policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem)); + } + if (itemName == "min-fresh") { + policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem)); + } + if (itemName == "stale-if-error") { + policy.KeepOnError = true; + } + } + return policy; +} + NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { - return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy)); -} - -NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { - return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy)); -} - -NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { - return new THttpIncomingCacheActor(httpProxyId, std::move(cachePolicy)); -} - -} + return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy)); +} + +NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { + return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy)); +} + +NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) { + return new THttpIncomingCacheActor(httpProxyId, std::move(cachePolicy)); +} + +} diff --git a/library/cpp/actors/http/http_cache.h b/library/cpp/actors/http/http_cache.h index ac38bdcac8..567a8105f0 100644 --- a/library/cpp/actors/http/http_cache.h +++ b/library/cpp/actors/http/http_cache.h @@ -1,27 +1,27 @@ -#pragma once +#pragma once #include <library/cpp/actors/core/actor.h> -#include "http.h" - -namespace NHttp { - -struct TCachePolicy { - TDuration TimeToExpire; - TDuration TimeToRefresh; - TDuration PaceToRefresh; - bool KeepOnError = false; - bool DiscardCache = false; - TArrayRef<TString> HeadersToCacheKey; - TArrayRef<TString> StatusesToRetry; - ui32 RetriesCount = 0; - - TCachePolicy() = default; -}; - -using TGetCachePolicy = std::function<TCachePolicy(const THttpRequest*)>; - +#include "http.h" + +namespace NHttp { + +struct TCachePolicy { + TDuration TimeToExpire; + TDuration TimeToRefresh; + TDuration PaceToRefresh; + bool KeepOnError = false; + bool DiscardCache = false; + TArrayRef<TString> HeadersToCacheKey; + TArrayRef<TString> StatusesToRetry; + ui32 RetriesCount = 0; + + TCachePolicy() = default; +}; + +using TGetCachePolicy = std::function<TCachePolicy(const THttpRequest*)>; + NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); -NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); -NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); -TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& policy = TCachePolicy()); - -} +NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); +NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy); +TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& policy = TCachePolicy()); + +} diff --git a/library/cpp/actors/http/http_config.h b/library/cpp/actors/http/http_config.h index faeff79449..eeafd2a019 100644 --- a/library/cpp/actors/http/http_config.h +++ b/library/cpp/actors/http/http_config.h @@ -1,19 +1,19 @@ -#pragma once -#include <util/network/sock.h> +#pragma once +#include <util/network/sock.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/protos/services_common.pb.h> - -namespace NHttp { - -struct THttpConfig { - static constexpr NActors::NLog::EComponent HttpLog = NActorsServices::EServiceCommon::HTTP; - static constexpr size_t BUFFER_SIZE = 64 * 1024; - static constexpr size_t BUFFER_MIN_STEP = 10 * 1024; - static constexpr int LISTEN_QUEUE = 10; - static constexpr TDuration SOCKET_TIMEOUT = TDuration::MilliSeconds(60000); - static constexpr TDuration CONNECTION_TIMEOUT = TDuration::MilliSeconds(60000); - using SocketType = TInet6StreamSocket; - using SocketAddressType = TSockAddrInet6; -}; - -} + +namespace NHttp { + +struct THttpConfig { + static constexpr NActors::NLog::EComponent HttpLog = NActorsServices::EServiceCommon::HTTP; + static constexpr size_t BUFFER_SIZE = 64 * 1024; + static constexpr size_t BUFFER_MIN_STEP = 10 * 1024; + static constexpr int LISTEN_QUEUE = 10; + static constexpr TDuration SOCKET_TIMEOUT = TDuration::MilliSeconds(60000); + static constexpr TDuration CONNECTION_TIMEOUT = TDuration::MilliSeconds(60000); + using SocketType = TInet6StreamSocket; + using SocketAddressType = TSockAddrInet6; +}; + +} diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp index 36c6855d93..2217838624 100644 --- a/library/cpp/actors/http/http_proxy.cpp +++ b/library/cpp/actors/http/http_proxy.cpp @@ -1,314 +1,314 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/monlib/metrics/metric_registry.h> -#include "http_proxy.h" - -namespace NHttp { - -class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig { -public: - IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { - IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller); +#include "http_proxy.h" + +namespace NHttp { + +class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig { +public: + IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller); TActorId acceptorId = ctx.Register(listeningSocket); - ctx.Send(event->Forward(acceptorId)); - Acceptors.emplace_back(acceptorId); - return listeningSocket; - } - - IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) { - IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller); + ctx.Send(event->Forward(acceptorId)); + Acceptors.emplace_back(acceptorId); + return listeningSocket; + } + + IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) { + IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller); TActorId connectionId = ctx.Register(connectionSocket); - Connections.emplace(connectionId); - return connectionSocket; - } - - void Bootstrap(const NActors::TActorContext& ctx) { - Poller = ctx.Register(NActors::CreatePollerActor()); - Become(&THttpProxy::StateWork); - } - + Connections.emplace(connectionId); + return connectionSocket; + } + + void Bootstrap(const NActors::TActorContext& ctx) { + Poller = ctx.Register(NActors::CreatePollerActor()); + Become(&THttpProxy::StateWork); + } + THttpProxy(NMonitoring::TMetricRegistry& sensors) - : Sensors(sensors) - {} - -protected: - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvAddListeningPort, Handle); - HFunc(TEvHttpProxy::TEvRegisterHandler, Handle); - HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle); - HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle); - HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle); - HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle); - HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle); - HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); - HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle); - HFunc(TEvHttpProxy::TEvReportSensors, Handle); + : Sensors(sensors) + {} + +protected: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvAddListeningPort, Handle); + HFunc(TEvHttpProxy::TEvRegisterHandler, Handle); + HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle); + HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle); + HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle); + HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle); + HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle); + HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); + HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle); + HFunc(TEvHttpProxy::TEvReportSensors, Handle); HFunc(NActors::TEvents::TEvPoison, Handle); - } - } - + } + } + void PassAway() override { Send(Poller, new NActors::TEvents::TEvPoisonPill()); for (const NActors::TActorId& connection : Connections) { Send(connection, new NActors::TEvents::TEvPoisonPill()); - } + } for (const NActors::TActorId& acceptor : Acceptors) { Send(acceptor, new NActors::TEvents::TEvPoisonPill()); - } + } NActors::TActorBootstrapped<THttpProxy>::PassAway(); - } - - void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { - TStringBuf url = event->Get()->Request->URL.Before('?'); + } + + void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { + TStringBuf url = event->Get()->Request->URL.Before('?'); THashMap<TString, TActorId>::iterator it; - while (!url.empty()) { - it = Handlers.find(url); - if (it != Handlers.end()) { - ctx.Send(event->Forward(it->second)); - return; - } else { - if (url.EndsWith('/')) { - url.Trunc(url.size() - 1); - } - size_t pos = url.rfind('/'); - if (pos == TStringBuf::npos) { - break; - } else { - url = url.substr(0, pos + 1); - } - } - } - ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound())); - } - - void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { - Y_UNUSED(event); - Y_UNUSED(ctx); - Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly"); - } - - void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { - Y_UNUSED(event); - Y_UNUSED(ctx); - Y_FAIL("This event shouldn't be there, it should go to the http connection directly"); - } - - void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { - TStringBuf host(event->Get()->Request->Host); - bool secure(event->Get()->Request->Secure); - NActors::IActor* actor = AddOutgoingConnection(TString(host), secure, ctx); - ctx.Send(event->Forward(actor->SelfId())); - } - - void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { - AddListeningPort(event, ctx); - } - - void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) { - for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) { - if (*it == event->Get()->ConnectionID) { - Acceptors.erase(it); - break; - } - } - } - - void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { - Connections.erase(event->Get()->ConnectionID); - } - - void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) { - Handlers[event->Get()->Path] = event->Get()->Handler; - } - - void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) { - const TString& host(event->Get()->Host); - auto it = Hosts.find(host); - if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) { - TString addressPart; - TIpPort portPart = 0; - CrackAddress(host, addressPart, portPart); - if (IsIPv6(addressPart)) { - TSockAddrInet6 address(addressPart.c_str(), portPart); - if (it == Hosts.end()) { - it = Hosts.emplace(host, THostEntry()).first; - } - it->second.Address = address; - it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; - } else { - // TODO(xenoxeno): move to another, possible blocking actor - try { - const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart)); - if (result != nullptr) { - auto pAddr = result->Addr.Begin(); - while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET6) { - ++pAddr; - } - if (pAddr == result->Addr.End()) { - ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved")); - return; - } - TSockAddrInet6 address = {}; - static_cast<sockaddr_in6&>(address) = *reinterpret_cast<sockaddr_in6*>(pAddr->ai_addr); - LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address.ToString()); - if (it == Hosts.end()) { - it = Hosts.emplace(host, THostEntry()).first; - } - it->second.Address = address; - it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; - } else { - ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host")); - return; - } - } - catch (const yexception& e) { - ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what())); - return; - } - } - } - ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address)); - } - - void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) { - const TEvHttpProxy::TEvReportSensors& sensors(*event->Get()); + while (!url.empty()) { + it = Handlers.find(url); + if (it != Handlers.end()) { + ctx.Send(event->Forward(it->second)); + return; + } else { + if (url.EndsWith('/')) { + url.Trunc(url.size() - 1); + } + size_t pos = url.rfind('/'); + if (pos == TStringBuf::npos) { + break; + } else { + url = url.substr(0, pos + 1); + } + } + } + ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound())); + } + + void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) { + Y_UNUSED(event); + Y_UNUSED(ctx); + Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly"); + } + + void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) { + Y_UNUSED(event); + Y_UNUSED(ctx); + Y_FAIL("This event shouldn't be there, it should go to the http connection directly"); + } + + void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + TStringBuf host(event->Get()->Request->Host); + bool secure(event->Get()->Request->Secure); + NActors::IActor* actor = AddOutgoingConnection(TString(host), secure, ctx); + ctx.Send(event->Forward(actor->SelfId())); + } + + void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + AddListeningPort(event, ctx); + } + + void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) { + for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) { + if (*it == event->Get()->ConnectionID) { + Acceptors.erase(it); + break; + } + } + } + + void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { + Connections.erase(event->Get()->ConnectionID); + } + + void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) { + Handlers[event->Get()->Path] = event->Get()->Handler; + } + + void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) { + const TString& host(event->Get()->Host); + auto it = Hosts.find(host); + if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) { + TString addressPart; + TIpPort portPart = 0; + CrackAddress(host, addressPart, portPart); + if (IsIPv6(addressPart)) { + TSockAddrInet6 address(addressPart.c_str(), portPart); + if (it == Hosts.end()) { + it = Hosts.emplace(host, THostEntry()).first; + } + it->second.Address = address; + it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; + } else { + // TODO(xenoxeno): move to another, possible blocking actor + try { + const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart)); + if (result != nullptr) { + auto pAddr = result->Addr.Begin(); + while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET6) { + ++pAddr; + } + if (pAddr == result->Addr.End()) { + ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved")); + return; + } + TSockAddrInet6 address = {}; + static_cast<sockaddr_in6&>(address) = *reinterpret_cast<sockaddr_in6*>(pAddr->ai_addr); + LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address.ToString()); + if (it == Hosts.end()) { + it = Hosts.emplace(host, THostEntry()).first; + } + it->second.Address = address; + it->second.DeadlineTime = ctx.Now() + HostsTimeToLive; + } else { + ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host")); + return; + } + } + catch (const yexception& e) { + ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what())); + return; + } + } + } + ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address)); + } + + void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) { + const TEvHttpProxy::TEvReportSensors& sensors(*event->Get()); const static TString urlNotFound = "not-found"; const TString& url = (sensors.Status == "404" ? urlNotFound : sensors.Url); - Sensors.Rate({ - {"sensor", "count"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, + Sensors.Rate({ + {"sensor", "count"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, {"url", url}, - {"status", sensors.Status} - })->Inc(); - Sensors.HistogramRate({ - {"sensor", "time_us"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, + {"status", sensors.Status} + })->Inc(); + Sensors.HistogramRate({ + {"sensor", "time_us"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, {"url", url}, - {"status", sensors.Status} - }, - NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds()); - Sensors.HistogramRate({ - {"sensor", "time_ms"}, - {"direction", sensors.Direction}, - {"peer", sensors.Host}, + {"status", sensors.Status} + }, + NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds()); + Sensors.HistogramRate({ + {"sensor", "time_ms"}, + {"direction", sensors.Direction}, + {"peer", sensors.Host}, {"url", url}, - {"status", sensors.Status} - }, - NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds()); - } - + {"status", sensors.Status} + }, + NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds()); + } + void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) { PassAway(); } NActors::TActorId Poller; TVector<TActorId> Acceptors; - - struct THostEntry { - TSockAddrInet6 Address; - TInstant DeadlineTime; - }; - - static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60); - - THashMap<TString, THostEntry> Hosts; + + struct THostEntry { + TSockAddrInet6 Address; + TInstant DeadlineTime; + }; + + static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60); + + THashMap<TString, THostEntry> Hosts; THashMap<TString, TActorId> Handlers; THashSet<TActorId> Connections; // outgoing NMonitoring::TMetricRegistry& Sensors; -}; - -TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) { - return new TEvHttpProxy::TEvReportSensors( - "out", - request->Host, - request->URL.Before('?'), - response ? response->Status : "504", +}; + +TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) { + return new TEvHttpProxy::TEvReportSensors( + "out", + request->Host, + request->URL.Before('?'), + response ? response->Status : "504", TDuration::Seconds(std::abs(request->Timer.Passed())) - ); -} - -TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) { - return new TEvHttpProxy::TEvReportSensors( - "in", - request->Host, - request->URL.Before('?'), - response->Status, + ); +} + +TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) { + return new TEvHttpProxy::TEvReportSensors( + "in", + request->Host, + request->URL.Before('?'), + response->Status, TDuration::Seconds(std::abs(request->Timer.Passed())) - ); -} - + ); +} + NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) { - return new THttpProxy(sensors); -} - -bool IsIPv6(const TString& host) { - return host.find_first_not_of(":0123456789abcdef") == TString::npos; -} - -bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) { - url.TrySplit("://", scheme, url); - auto pos = url.find('/'); - if (pos == TStringBuf::npos) { - host = url; - } else { - host = url.substr(0, pos); - uri = url.substr(pos); - } - return true; -} - -void CrackAddress(const TString& address, TString& hostname, TIpPort& port) { - size_t first_colon_pos = address.find(':'); - if (first_colon_pos != TString::npos) { - size_t last_colon_pos = address.rfind(':'); - if (last_colon_pos == first_colon_pos) { - // only one colon, simple case - port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0); - hostname = address.substr(0, first_colon_pos); - } else { - // ipv6? - size_t closing_bracket_pos = address.rfind(']'); - if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) { - // whole address is ipv6 host - hostname = address; - } else { - port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0); - hostname = address.substr(0, last_colon_pos); - } - if (hostname.StartsWith('[') && hostname.EndsWith(']')) { - hostname = hostname.substr(1, hostname.size() - 2); - } - } - } else { - hostname = address; - } -} - - -void TrimBegin(TStringBuf& target, char delim) { - while (!target.empty() && *target.begin() == delim) { - target.Skip(1); - } -} - -void TrimEnd(TStringBuf& target, char delim) { - while (!target.empty() && target.back() == delim) { - target.Trunc(target.size() - 1); - } -} - -void Trim(TStringBuf& target, char delim) { - TrimBegin(target, delim); - TrimEnd(target, delim); -} - -void TrimEnd(TString& target, char delim) { - while (!target.empty() && target.back() == delim) { - target.resize(target.size() - 1); - } -} - -} + return new THttpProxy(sensors); +} + +bool IsIPv6(const TString& host) { + return host.find_first_not_of(":0123456789abcdef") == TString::npos; +} + +bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) { + url.TrySplit("://", scheme, url); + auto pos = url.find('/'); + if (pos == TStringBuf::npos) { + host = url; + } else { + host = url.substr(0, pos); + uri = url.substr(pos); + } + return true; +} + +void CrackAddress(const TString& address, TString& hostname, TIpPort& port) { + size_t first_colon_pos = address.find(':'); + if (first_colon_pos != TString::npos) { + size_t last_colon_pos = address.rfind(':'); + if (last_colon_pos == first_colon_pos) { + // only one colon, simple case + port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0); + hostname = address.substr(0, first_colon_pos); + } else { + // ipv6? + size_t closing_bracket_pos = address.rfind(']'); + if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) { + // whole address is ipv6 host + hostname = address; + } else { + port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0); + hostname = address.substr(0, last_colon_pos); + } + if (hostname.StartsWith('[') && hostname.EndsWith(']')) { + hostname = hostname.substr(1, hostname.size() - 2); + } + } + } else { + hostname = address; + } +} + + +void TrimBegin(TStringBuf& target, char delim) { + while (!target.empty() && *target.begin() == delim) { + target.Skip(1); + } +} + +void TrimEnd(TStringBuf& target, char delim) { + while (!target.empty() && target.back() == delim) { + target.Trunc(target.size() - 1); + } +} + +void Trim(TStringBuf& target, char delim) { + TrimBegin(target, delim); + TrimEnd(target, delim); +} + +void TrimEnd(TString& target, char delim) { + while (!target.empty() && target.back() == delim) { + target.resize(target.size() - 1); + } +} + +} diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h index afd0170997..ce77bf8d5f 100644 --- a/library/cpp/actors/http/http_proxy.h +++ b/library/cpp/actors/http/http_proxy.h @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/hfunc.h> @@ -9,231 +9,231 @@ #include <library/cpp/actors/interconnect/poller_actor.h> #include <library/cpp/dns/cache.h> #include <library/cpp/monlib/metrics/metric_registry.h> -#include <util/generic/variant.h> -#include "http.h" -#include "http_proxy_ssl.h" - -namespace NHttp { - -struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig { - SocketType Socket; - - int GetDescriptor() override { - return static_cast<SOCKET>(Socket); - } -}; - -struct TEvHttpProxy { - enum EEv { +#include <util/generic/variant.h> +#include "http.h" +#include "http_proxy_ssl.h" + +namespace NHttp { + +struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig { + SocketType Socket; + + int GetDescriptor() override { + return static_cast<SOCKET>(Socket); + } +}; + +struct TEvHttpProxy { + enum EEv { EvAddListeningPort = EventSpaceBegin(NActors::TEvents::ES_HTTP), - EvConfirmListen, - EvRegisterHandler, - EvHttpIncomingRequest, - EvHttpOutgoingRequest, - EvHttpIncomingResponse, - EvHttpOutgoingResponse, - EvHttpConnectionOpened, - EvHttpConnectionClosed, - EvHttpAcceptorClosed, - EvResolveHostRequest, - EvResolveHostResponse, - EvReportSensors, - EvEnd - }; - + EvConfirmListen, + EvRegisterHandler, + EvHttpIncomingRequest, + EvHttpOutgoingRequest, + EvHttpIncomingResponse, + EvHttpOutgoingResponse, + EvHttpConnectionOpened, + EvHttpConnectionClosed, + EvHttpAcceptorClosed, + EvResolveHostRequest, + EvResolveHostResponse, + EvReportSensors, + EvEnd + }; + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_HTTP), "ES_HTTP event space is too small."); - - struct TEvAddListeningPort : NActors::TEventLocal<TEvAddListeningPort, EvAddListeningPort> { - TIpPort Port; - TString WorkerName; - bool Secure = false; - TString CertificateFile; - TString PrivateKeyFile; + + struct TEvAddListeningPort : NActors::TEventLocal<TEvAddListeningPort, EvAddListeningPort> { + TIpPort Port; + TString WorkerName; + bool Secure = false; + TString CertificateFile; + TString PrivateKeyFile; TString SslCertificatePem; - - TEvAddListeningPort(TIpPort port) - : Port(port) - {} - - TEvAddListeningPort(TIpPort port, const TString& workerName) - : Port(port) - , WorkerName(workerName) - {} - }; - - struct TEvConfirmListen : NActors::TEventLocal<TEvConfirmListen, EvConfirmListen> { - THttpConfig::SocketAddressType Address; - - TEvConfirmListen(const THttpConfig::SocketAddressType& address) - : Address(address) - {} - }; - - struct TEvRegisterHandler : NActors::TEventLocal<TEvRegisterHandler, EvRegisterHandler> { - TString Path; + + TEvAddListeningPort(TIpPort port) + : Port(port) + {} + + TEvAddListeningPort(TIpPort port, const TString& workerName) + : Port(port) + , WorkerName(workerName) + {} + }; + + struct TEvConfirmListen : NActors::TEventLocal<TEvConfirmListen, EvConfirmListen> { + THttpConfig::SocketAddressType Address; + + TEvConfirmListen(const THttpConfig::SocketAddressType& address) + : Address(address) + {} + }; + + struct TEvRegisterHandler : NActors::TEventLocal<TEvRegisterHandler, EvRegisterHandler> { + TString Path; TActorId Handler; - + TEvRegisterHandler(const TString& path, const TActorId& handler) - : Path(path) - , Handler(handler) - {} - }; - - struct TEvHttpIncomingRequest : NActors::TEventLocal<TEvHttpIncomingRequest, EvHttpIncomingRequest> { - THttpIncomingRequestPtr Request; - - TEvHttpIncomingRequest(THttpIncomingRequestPtr request) - : Request(std::move(request)) - {} - }; - - struct TEvHttpOutgoingRequest : NActors::TEventLocal<TEvHttpOutgoingRequest, EvHttpOutgoingRequest> { - THttpOutgoingRequestPtr Request; - TDuration Timeout; - - TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request) - : Request(std::move(request)) - {} - - TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request, TDuration timeout) - : Request(std::move(request)) - , Timeout(timeout) - {} - }; - - struct TEvHttpIncomingResponse : NActors::TEventLocal<TEvHttpIncomingResponse, EvHttpIncomingResponse> { - THttpOutgoingRequestPtr Request; - THttpIncomingResponsePtr Response; - TString Error; - - TEvHttpIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response, const TString& error) - : Request(std::move(request)) - , Response(std::move(response)) - , Error(error) - {} - - TEvHttpIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response) - : Request(std::move(request)) - , Response(std::move(response)) - {} - - TString GetError() const { - TStringBuilder error; - if (Response != nullptr && !Response->Status.StartsWith('2')) { - error << Response->Status << ' ' << Response->Message; - } - if (!Error.empty()) { - if (!error.empty()) { - error << ';'; - } - error << Error; - } - return error; - } - }; - - struct TEvHttpOutgoingResponse : NActors::TEventLocal<TEvHttpOutgoingResponse, EvHttpOutgoingResponse> { - THttpOutgoingResponsePtr Response; - - TEvHttpOutgoingResponse(THttpOutgoingResponsePtr response) - : Response(std::move(response)) - {} - }; - - struct TEvHttpConnectionOpened : NActors::TEventLocal<TEvHttpConnectionOpened, EvHttpConnectionOpened> { - TString PeerAddress; + : Path(path) + , Handler(handler) + {} + }; + + struct TEvHttpIncomingRequest : NActors::TEventLocal<TEvHttpIncomingRequest, EvHttpIncomingRequest> { + THttpIncomingRequestPtr Request; + + TEvHttpIncomingRequest(THttpIncomingRequestPtr request) + : Request(std::move(request)) + {} + }; + + struct TEvHttpOutgoingRequest : NActors::TEventLocal<TEvHttpOutgoingRequest, EvHttpOutgoingRequest> { + THttpOutgoingRequestPtr Request; + TDuration Timeout; + + TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request) + : Request(std::move(request)) + {} + + TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request, TDuration timeout) + : Request(std::move(request)) + , Timeout(timeout) + {} + }; + + struct TEvHttpIncomingResponse : NActors::TEventLocal<TEvHttpIncomingResponse, EvHttpIncomingResponse> { + THttpOutgoingRequestPtr Request; + THttpIncomingResponsePtr Response; + TString Error; + + TEvHttpIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response, const TString& error) + : Request(std::move(request)) + , Response(std::move(response)) + , Error(error) + {} + + TEvHttpIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response) + : Request(std::move(request)) + , Response(std::move(response)) + {} + + TString GetError() const { + TStringBuilder error; + if (Response != nullptr && !Response->Status.StartsWith('2')) { + error << Response->Status << ' ' << Response->Message; + } + if (!Error.empty()) { + if (!error.empty()) { + error << ';'; + } + error << Error; + } + return error; + } + }; + + struct TEvHttpOutgoingResponse : NActors::TEventLocal<TEvHttpOutgoingResponse, EvHttpOutgoingResponse> { + THttpOutgoingResponsePtr Response; + + TEvHttpOutgoingResponse(THttpOutgoingResponsePtr response) + : Response(std::move(response)) + {} + }; + + struct TEvHttpConnectionOpened : NActors::TEventLocal<TEvHttpConnectionOpened, EvHttpConnectionOpened> { + TString PeerAddress; TActorId ConnectionID; - + TEvHttpConnectionOpened(const TString& peerAddress, const TActorId& connectionID) - : PeerAddress(peerAddress) - , ConnectionID(connectionID) - {} - }; - - struct TEvHttpConnectionClosed : NActors::TEventLocal<TEvHttpConnectionClosed, EvHttpConnectionClosed> { + : PeerAddress(peerAddress) + , ConnectionID(connectionID) + {} + }; + + struct TEvHttpConnectionClosed : NActors::TEventLocal<TEvHttpConnectionClosed, EvHttpConnectionClosed> { TActorId ConnectionID; - TDeque<THttpIncomingRequestPtr> RecycledRequests; - + TDeque<THttpIncomingRequestPtr> RecycledRequests; + TEvHttpConnectionClosed(const TActorId& connectionID) - : ConnectionID(connectionID) - {} - + : ConnectionID(connectionID) + {} + TEvHttpConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests) - : ConnectionID(connectionID) - , RecycledRequests(std::move(recycledRequests)) - {} - }; - - struct TEvHttpAcceptorClosed : NActors::TEventLocal<TEvHttpAcceptorClosed, EvHttpAcceptorClosed> { + : ConnectionID(connectionID) + , RecycledRequests(std::move(recycledRequests)) + {} + }; + + struct TEvHttpAcceptorClosed : NActors::TEventLocal<TEvHttpAcceptorClosed, EvHttpAcceptorClosed> { TActorId ConnectionID; - + TEvHttpAcceptorClosed(const TActorId& connectionID) - : ConnectionID(connectionID) - {} - }; - - struct TEvResolveHostRequest : NActors::TEventLocal<TEvResolveHostRequest, EvResolveHostRequest> { - TString Host; - - TEvResolveHostRequest(const TString& host) - : Host(host) - {} - }; - - struct TEvResolveHostResponse : NActors::TEventLocal<TEvResolveHostResponse, EvResolveHostResponse> { - TString Host; - TSockAddrInet6 Address; - TString Error; - - TEvResolveHostResponse(const TString& host, const TSockAddrInet6& address) - : Host(host) - , Address(address) - {} - - TEvResolveHostResponse(const TString& error) - : Error(error) - {} - }; - - struct TEvReportSensors : NActors::TEventLocal<TEvReportSensors, EvReportSensors> { - TString Direction; - TString Host; - TString Url; - TString Status; - TDuration Time; - - TEvReportSensors( - TStringBuf direction, - TStringBuf host, - TStringBuf url, - TStringBuf status, - TDuration time) - : Direction(direction) - , Host(host) - , Url(url) - , Status(status) - , Time(time) - {} - }; -}; - -struct TEndpointInfo { + : ConnectionID(connectionID) + {} + }; + + struct TEvResolveHostRequest : NActors::TEventLocal<TEvResolveHostRequest, EvResolveHostRequest> { + TString Host; + + TEvResolveHostRequest(const TString& host) + : Host(host) + {} + }; + + struct TEvResolveHostResponse : NActors::TEventLocal<TEvResolveHostResponse, EvResolveHostResponse> { + TString Host; + TSockAddrInet6 Address; + TString Error; + + TEvResolveHostResponse(const TString& host, const TSockAddrInet6& address) + : Host(host) + , Address(address) + {} + + TEvResolveHostResponse(const TString& error) + : Error(error) + {} + }; + + struct TEvReportSensors : NActors::TEventLocal<TEvReportSensors, EvReportSensors> { + TString Direction; + TString Host; + TString Url; + TString Status; + TDuration Time; + + TEvReportSensors( + TStringBuf direction, + TStringBuf host, + TStringBuf url, + TStringBuf status, + TDuration time) + : Direction(direction) + , Host(host) + , Url(url) + , Status(status) + , Time(time) + {} + }; +}; + +struct TEndpointInfo { TActorId Proxy; TActorId Owner; - TString WorkerName; - bool Secure; - TSslHelpers::TSslHolder<SSL_CTX> SecureContext; -}; - + TString WorkerName; + bool Secure; + TSslHelpers::TSslHolder<SSL_CTX> SecureContext; +}; + NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors); NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller); NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller); -NActors::IActor* CreateIncomingConnectionActor( - const TEndpointInfo& endpoint, - TIntrusivePtr<TSocketDescriptor> socket, - THttpConfig::SocketAddressType address, - THttpIncomingRequestPtr recycledRequest = nullptr); -TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response); -TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response); - -} +NActors::IActor* CreateIncomingConnectionActor( + const TEndpointInfo& endpoint, + TIntrusivePtr<TSocketDescriptor> socket, + THttpConfig::SocketAddressType address, + THttpIncomingRequestPtr recycledRequest = nullptr); +TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response); +TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response); + +} diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp index 9780541b71..3d2557c6fe 100644 --- a/library/cpp/actors/http/http_proxy_acceptor.cpp +++ b/library/cpp/actors/http/http_proxy_acceptor.cpp @@ -1,135 +1,135 @@ -#include <util/network/sock.h> -#include "http_proxy.h" -#include "http_proxy_ssl.h" - -namespace NHttp { - -class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig { -public: - using TBase = NActors::TActor<TAcceptorActor>; +#include <util/network/sock.h> +#include "http_proxy.h" +#include "http_proxy_ssl.h" + +namespace NHttp { + +class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig { +public: + using TBase = NActors::TActor<TAcceptorActor>; const TActorId Owner; const TActorId Poller; - TIntrusivePtr<TSocketDescriptor> Socket; + TIntrusivePtr<TSocketDescriptor> Socket; NActors::TPollerToken::TPtr PollerToken; THashSet<TActorId> Connections; - TDeque<THttpIncomingRequestPtr> RecycledRequests; - TEndpointInfo Endpoint; - + TDeque<THttpIncomingRequestPtr> RecycledRequests; + TEndpointInfo Endpoint; + TAcceptorActor(const TActorId& owner, const TActorId& poller) - : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit) - , Owner(owner) - , Poller(poller) - , Socket(new TSocketDescriptor()) - { - // for unit tests :( - CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEADDR, (int)true, "reuse address"); -#ifdef SO_REUSEPORT - CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEPORT, (int)true, "reuse port"); -#endif - } - -protected: - STFUNC(StateListening) { - switch (ev->GetTypeRewrite()) { + : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit) + , Owner(owner) + , Poller(poller) + , Socket(new TSocketDescriptor()) + { + // for unit tests :( + CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEADDR, (int)true, "reuse address"); +#ifdef SO_REUSEPORT + CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEPORT, (int)true, "reuse port"); +#endif + } + +protected: + STFUNC(StateListening) { + switch (ev->GetTypeRewrite()) { HFunc(NActors::TEvPollerRegisterResult, Handle); HFunc(NActors::TEvPollerReady, Handle); - HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); - HFunc(TEvHttpProxy::TEvReportSensors, Handle); - } - } - - STFUNC(StateInit) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvAddListeningPort, HandleInit); - } - } - - void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { - SocketAddressType bindAddress("::", event->Get()->Port); - Endpoint.Owner = ctx.SelfID; - Endpoint.Proxy = Owner; - Endpoint.WorkerName = event->Get()->WorkerName; - Endpoint.Secure = event->Get()->Secure; - int err = 0; - if (Endpoint.Secure) { + HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); + HFunc(TEvHttpProxy::TEvReportSensors, Handle); + } + } + + STFUNC(StateInit) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvAddListeningPort, HandleInit); + } + } + + void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { + SocketAddressType bindAddress("::", event->Get()->Port); + Endpoint.Owner = ctx.SelfID; + Endpoint.Proxy = Owner; + Endpoint.WorkerName = event->Get()->WorkerName; + Endpoint.Secure = event->Get()->Secure; + int err = 0; + if (Endpoint.Secure) { if (!event->Get()->SslCertificatePem.empty()) { Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem); } else { Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile); } - if (Endpoint.SecureContext == nullptr) { - err = -1; - LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context"); - } - } - if (err == 0) { - err = Socket->Socket.Bind(&bindAddress); - } - if (err == 0) { - err = Socket->Socket.Listen(LISTEN_QUEUE); - if (err == 0) { - LOG_INFO_S(ctx, HttpLog, "Listening on " << bindAddress.ToString()); - SetNonBlock(Socket->Socket); + if (Endpoint.SecureContext == nullptr) { + err = -1; + LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context"); + } + } + if (err == 0) { + err = Socket->Socket.Bind(&bindAddress); + } + if (err == 0) { + err = Socket->Socket.Listen(LISTEN_QUEUE); + if (err == 0) { + LOG_INFO_S(ctx, HttpLog, "Listening on " << bindAddress.ToString()); + SetNonBlock(Socket->Socket); ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId())); - TBase::Become(&TAcceptorActor::StateListening); - ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress), 0, event->Cookie); - return; - } - } - LOG_WARN_S(ctx, HttpLog, "Failed to listen on " << bindAddress.ToString() << " - retrying..."); + TBase::Become(&TAcceptorActor::StateListening); + ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress), 0, event->Cookie); + return; + } + } + LOG_WARN_S(ctx, HttpLog, "Failed to listen on " << bindAddress.ToString() << " - retrying..."); ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release()); - } - - void Die(const NActors::TActorContext& ctx) override { - ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID)); + } + + void Die(const NActors::TActorContext& ctx) override { + ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID)); for (const NActors::TActorId& connection : Connections) { - ctx.Send(connection, new NActors::TEvents::TEvPoisonPill()); - } - } - + ctx.Send(connection, new NActors::TEvents::TEvPoisonPill()); + } + } + void Handle(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& /*ctx*/) { PollerToken = std::move(ev->Get()->PollerToken); PollerToken->Request(true, false); // request read polling } void Handle(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { - TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor(); - SocketAddressType addr; + TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor(); + SocketAddressType addr; int err; while ((err = Socket->Socket.Accept(&socket->Socket, &addr)) == 0) { - NActors::IActor* connectionSocket = nullptr; - if (RecycledRequests.empty()) { - connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr); - } else { - connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front())); - RecycledRequests.pop_front(); - } + NActors::IActor* connectionSocket = nullptr; + if (RecycledRequests.empty()) { + connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr); + } else { + connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front())); + RecycledRequests.pop_front(); + } NActors::TActorId connectionId = ctx.Register(connectionSocket); ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId)); - Connections.emplace(connectionId); - socket = new TSocketDescriptor(); - } + Connections.emplace(connectionId); + socket = new TSocketDescriptor(); + } if (err == -EAGAIN || err == -EWOULDBLOCK) { // request poller for further connection polling Y_VERIFY(PollerToken); PollerToken->Request(true, false); } - } - - void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { - Connections.erase(event->Get()->ConnectionID); - for (auto& req : event->Get()->RecycledRequests) { - req->Clear(); - RecycledRequests.push_back(std::move(req)); - } - } - - void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext& ctx) { - ctx.Send(event->Forward(Owner)); - } -}; - + } + + void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { + Connections.erase(event->Get()->ConnectionID); + for (auto& req : event->Get()->RecycledRequests) { + req->Clear(); + RecycledRequests.push_back(std::move(req)); + } + } + + void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext& ctx) { + ctx.Send(event->Forward(Owner)); + } +}; + NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) { - return new TAcceptorActor(owner, poller); -} - -} + return new TAcceptorActor(owner, poller); +} + +} diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp index 80fe2af53d..0608e0e25b 100644 --- a/library/cpp/actors/http/http_proxy_incoming.cpp +++ b/library/cpp/actors/http/http_proxy_incoming.cpp @@ -1,80 +1,80 @@ -#include "http_proxy.h" -#include "http_proxy_sock_impl.h" - -namespace NHttp { - +#include "http_proxy.h" +#include "http_proxy_sock_impl.h" + +namespace NHttp { + using namespace NActors; -template <typename TSocketImpl> -class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig { -public: - using TBase = TActor<TIncomingConnectionActor<TSocketImpl>>; - static constexpr bool RecycleRequests = true; - - const TEndpointInfo& Endpoint; - SocketAddressType Address; - TList<THttpIncomingRequestPtr> Requests; - THashMap<THttpIncomingRequestPtr, THttpOutgoingResponsePtr> Responses; - THttpIncomingRequestPtr CurrentRequest; - THttpOutgoingResponsePtr CurrentResponse; - TDeque<THttpIncomingRequestPtr> RecycledRequests; - +template <typename TSocketImpl> +class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig { +public: + using TBase = TActor<TIncomingConnectionActor<TSocketImpl>>; + static constexpr bool RecycleRequests = true; + + const TEndpointInfo& Endpoint; + SocketAddressType Address; + TList<THttpIncomingRequestPtr> Requests; + THashMap<THttpIncomingRequestPtr, THttpOutgoingResponsePtr> Responses; + THttpIncomingRequestPtr CurrentRequest; + THttpOutgoingResponsePtr CurrentResponse; + TDeque<THttpIncomingRequestPtr> RecycledRequests; + THPTimer InactivityTimer; static constexpr TDuration InactivityTimeout = TDuration::Minutes(2); TEvPollerReady* InactivityEvent = nullptr; TPollerToken::TPtr PollerToken; - TIncomingConnectionActor( - const TEndpointInfo& endpoint, - TIntrusivePtr<TSocketDescriptor> socket, - SocketAddressType address, - THttpIncomingRequestPtr recycledRequest = nullptr) - : TBase(&TIncomingConnectionActor::StateAccepting) - , TSocketImpl(std::move(socket)) - , Endpoint(endpoint) - , Address(address) - { - if (recycledRequest != nullptr) { - RecycledRequests.emplace_back(std::move(recycledRequest)); - } - TSocketImpl::SetNonBlock(); - } - - void CleanupRequest(THttpIncomingRequestPtr& request) { - if (RecycleRequests) { - request->Clear(); - RecycledRequests.push_back(std::move(request)); - } else { - request = nullptr; - } - } - - void CleanupResponse(THttpOutgoingResponsePtr& response) { - CleanupRequest(response->Request); - // TODO: maybe recycle too? - response = nullptr; - } - + TIncomingConnectionActor( + const TEndpointInfo& endpoint, + TIntrusivePtr<TSocketDescriptor> socket, + SocketAddressType address, + THttpIncomingRequestPtr recycledRequest = nullptr) + : TBase(&TIncomingConnectionActor::StateAccepting) + , TSocketImpl(std::move(socket)) + , Endpoint(endpoint) + , Address(address) + { + if (recycledRequest != nullptr) { + RecycledRequests.emplace_back(std::move(recycledRequest)); + } + TSocketImpl::SetNonBlock(); + } + + void CleanupRequest(THttpIncomingRequestPtr& request) { + if (RecycleRequests) { + request->Clear(); + RecycledRequests.push_back(std::move(request)); + } else { + request = nullptr; + } + } + + void CleanupResponse(THttpOutgoingResponsePtr& response) { + CleanupRequest(response->Request); + // TODO: maybe recycle too? + response = nullptr; + } + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override { return new IEventHandle(self, parent, new TEvents::TEvBootstrap()); } void Die(const TActorContext& ctx) override { - ctx.Send(Endpoint.Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests))); - TSocketImpl::Shutdown(); - TBase::Die(ctx); - } - -protected: + ctx.Send(Endpoint.Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests))); + TSocketImpl::Shutdown(); + TBase::Die(ctx); + } + +protected: void Bootstrap(const TActorContext& ctx) { InactivityTimer.Reset(); ctx.Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") incoming connection opened"); - OnAccept(ctx); + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") incoming connection opened"); + OnAccept(ctx); } - void OnAccept(const NActors::TActorContext& ctx) { + void OnAccept(const NActors::TActorContext& ctx) { int res; bool read = false, write = false; if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) { @@ -86,21 +86,21 @@ protected: } else { LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Accept: " << strerror(-res)); return Die(ctx); - } - } - TBase::Become(&TIncomingConnectionActor::StateConnected); + } + } + TBase::Become(&TIncomingConnectionActor::StateConnected); ctx.Send(ctx.SelfID, new TEvPollerReady(nullptr, true, true)); - } - + } + void HandleAccepting(TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { PollerToken = std::move(ev->Get()->PollerToken); - OnAccept(ctx); - } - + OnAccept(ctx); + } + void HandleAccepting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { - OnAccept(ctx); - } - + OnAccept(ctx); + } + void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) { if (event->Get()->Read) { for (;;) { @@ -114,7 +114,7 @@ protected: CurrentRequest->Address = Address; CurrentRequest->WorkerName = Endpoint.WorkerName; CurrentRequest->Secure = Endpoint.Secure; - } + } if (!CurrentRequest->EnsureEnoughSpaceAvailable()) { LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - not enough space available"); return Die(ctx); @@ -134,13 +134,13 @@ protected: CurrentRequest = nullptr; } else if (CurrentRequest->IsError()) { LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")"); - bool success = Respond(CurrentRequest->CreateResponseBadRequest(), ctx); - if (!success) { - return; - } + bool success = Respond(CurrentRequest->CreateResponseBadRequest(), ctx); + if (!success) { + return; + } CurrentRequest = nullptr; } - } + } } else if (-res == EAGAIN || -res == EWOULDBLOCK) { if (PollerToken) { if (!read && !write) { @@ -148,18 +148,18 @@ protected: } PollerToken->Request(read, write); } - break; + break; } else if (-res == EINTR) { continue; } else if (!res) { // connection closed - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); - return Die(ctx); + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); + return Die(ctx); } else { - LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Receive: " << strerror(-res)); - return Die(ctx); - } - } + LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Receive: " << strerror(-res)); + return Die(ctx); + } + } if (event->Get() == InactivityEvent) { const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed())); if (passed >= InactivityTimeout) { @@ -173,83 +173,83 @@ protected: if (event->Get()->Write) { FlushOutput(ctx); } - } - + } + void HandleConnected(TEvPollerRegisterResult::TPtr ev, const TActorContext& /*ctx*/) { PollerToken = std::move(ev->Get()->PollerToken); PollerToken->Request(true, true); - } - - void HandleConnected(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const TActorContext& ctx) { - Respond(event->Get()->Response, ctx); - } - - bool Respond(THttpOutgoingResponsePtr response, const TActorContext& ctx) { - THttpIncomingRequestPtr request = response->GetRequest(); - response->Finish(); - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << response->Status << " " << response->Message << ")"); - if (response->Status != "200" && response->Status != "404") { - static constexpr size_t MAX_LOGGED_SIZE = 1024; - LOG_DEBUG_S(ctx, HttpLog, - "(#" - << TSocketImpl::GetRawSocket() - << "," - << Address - << ") Request: " - << request->GetObfuscatedData().substr(0, MAX_LOGGED_SIZE)); - LOG_DEBUG_S(ctx, HttpLog, - "(#" - << TSocketImpl::GetRawSocket() - << "," - << Address - << ") Response: " - << TString(response->GetRawData()).substr(0, MAX_LOGGED_SIZE)); - } + } + + void HandleConnected(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const TActorContext& ctx) { + Respond(event->Get()->Response, ctx); + } + + bool Respond(THttpOutgoingResponsePtr response, const TActorContext& ctx) { + THttpIncomingRequestPtr request = response->GetRequest(); + response->Finish(); + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << response->Status << " " << response->Message << ")"); + if (response->Status != "200" && response->Status != "404") { + static constexpr size_t MAX_LOGGED_SIZE = 1024; + LOG_DEBUG_S(ctx, HttpLog, + "(#" + << TSocketImpl::GetRawSocket() + << "," + << Address + << ") Request: " + << request->GetObfuscatedData().substr(0, MAX_LOGGED_SIZE)); + LOG_DEBUG_S(ctx, HttpLog, + "(#" + << TSocketImpl::GetRawSocket() + << "," + << Address + << ") Response: " + << TString(response->GetRawData()).substr(0, MAX_LOGGED_SIZE)); + } THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildIncomingRequestSensors(request, response)); - ctx.Send(Endpoint.Owner, sensors.Release()); - if (request == Requests.front() && CurrentResponse == nullptr) { - CurrentResponse = response; - return FlushOutput(ctx); - } else { - // we are ahead of our pipeline - Responses.emplace(request, response); - return true; - } - } - - bool FlushOutput(const TActorContext& ctx) { - while (CurrentResponse != nullptr) { - size_t size = CurrentResponse->Size(); - if (size == 0) { - Y_VERIFY(Requests.front() == CurrentResponse->GetRequest()); - bool close = CurrentResponse->IsConnectionClose(); - Requests.pop_front(); - CleanupResponse(CurrentResponse); - if (!Requests.empty()) { - auto it = Responses.find(Requests.front()); - if (it != Responses.end()) { - CurrentResponse = it->second; - Responses.erase(it); - continue; - } else { - LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - FlushOutput request not found"); - Die(ctx); - return false; - } - } else { - if (close) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); - Die(ctx); - return false; - } else { - continue; - } - } - } + ctx.Send(Endpoint.Owner, sensors.Release()); + if (request == Requests.front() && CurrentResponse == nullptr) { + CurrentResponse = response; + return FlushOutput(ctx); + } else { + // we are ahead of our pipeline + Responses.emplace(request, response); + return true; + } + } + + bool FlushOutput(const TActorContext& ctx) { + while (CurrentResponse != nullptr) { + size_t size = CurrentResponse->Size(); + if (size == 0) { + Y_VERIFY(Requests.front() == CurrentResponse->GetRequest()); + bool close = CurrentResponse->IsConnectionClose(); + Requests.pop_front(); + CleanupResponse(CurrentResponse); + if (!Requests.empty()) { + auto it = Responses.find(Requests.front()); + if (it != Responses.end()) { + CurrentResponse = it->second; + Responses.erase(it); + continue; + } else { + LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - FlushOutput request not found"); + Die(ctx); + return false; + } + } else { + if (close) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); + Die(ctx); + return false; + } else { + continue; + } + } + } bool read = false, write = false; ssize_t res = TSocketImpl::Send(CurrentResponse->Data(), size, read, write); - if (res > 0) { - CurrentResponse->ChopHead(res); + if (res > 0) { + CurrentResponse->ChopHead(res); } else if (-res == EINTR) { continue; } else if (-res == EAGAIN || -res == EWOULDBLOCK) { @@ -258,45 +258,45 @@ protected: write = true; } PollerToken->Request(read, write); - } - break; + } + break; } else { CleanupResponse(CurrentResponse); LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in FlushOutput: " << strerror(-res)); - Die(ctx); - return false; - } - } - return true; - } - - STFUNC(StateAccepting) { - switch (ev->GetTypeRewrite()) { + Die(ctx); + return false; + } + } + return true; + } + + STFUNC(StateAccepting) { + switch (ev->GetTypeRewrite()) { CFunc(TEvents::TEvBootstrap::EventType, Bootstrap); HFunc(TEvPollerReady, HandleAccepting); HFunc(TEvPollerRegisterResult, HandleAccepting); - } - } - - STFUNC(StateConnected) { - switch (ev->GetTypeRewrite()) { + } + } + + STFUNC(StateConnected) { + switch (ev->GetTypeRewrite()) { HFunc(TEvPollerReady, HandleConnected); - HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, HandleConnected); + HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, HandleConnected); HFunc(TEvPollerRegisterResult, HandleConnected); - } - } -}; - -IActor* CreateIncomingConnectionActor( - const TEndpointInfo& endpoint, - TIntrusivePtr<TSocketDescriptor> socket, - THttpConfig::SocketAddressType address, - THttpIncomingRequestPtr recycledRequest) { - if (endpoint.Secure) { - return new TIncomingConnectionActor<TSecureSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest)); - } else { - return new TIncomingConnectionActor<TPlainSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest)); - } -} - -} + } + } +}; + +IActor* CreateIncomingConnectionActor( + const TEndpointInfo& endpoint, + TIntrusivePtr<TSocketDescriptor> socket, + THttpConfig::SocketAddressType address, + THttpIncomingRequestPtr recycledRequest) { + if (endpoint.Secure) { + return new TIncomingConnectionActor<TSecureSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest)); + } else { + return new TIncomingConnectionActor<TPlainSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest)); + } +} + +} diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp index d9189dba8a..5b3d07c614 100644 --- a/library/cpp/actors/http/http_proxy_outgoing.cpp +++ b/library/cpp/actors/http/http_proxy_outgoing.cpp @@ -1,92 +1,92 @@ -#include "http_proxy.h" -#include "http_proxy_sock_impl.h" - -namespace NHttp { - -template <typename TSocketImpl> -class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig { -public: - using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>; - using TSelf = TOutgoingConnectionActor<TSocketImpl>; +#include "http_proxy.h" +#include "http_proxy_sock_impl.h" + +namespace NHttp { + +template <typename TSocketImpl> +class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig { +public: + using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>; + using TSelf = TOutgoingConnectionActor<TSocketImpl>; const TActorId Owner; const TActorId Poller; - SocketAddressType Address; - TString Host; + SocketAddressType Address; + TString Host; TActorId RequestOwner; - THttpOutgoingRequestPtr Request; - THttpIncomingResponsePtr Response; - TInstant LastActivity; - TDuration ConnectionTimeout = CONNECTION_TIMEOUT; + THttpOutgoingRequestPtr Request; + THttpIncomingResponsePtr Response; + TInstant LastActivity; + TDuration ConnectionTimeout = CONNECTION_TIMEOUT; NActors::TPollerToken::TPtr PollerToken; - + TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller) - : TBase(&TSelf::StateWaiting) - , Owner(owner) - , Poller(poller) - , Host(host) - { - TSocketImpl::SetNonBlock(); - TSocketImpl::SetTimeout(SOCKET_TIMEOUT); - } - + : TBase(&TSelf::StateWaiting) + , Owner(owner) + , Poller(poller) + , Host(host) + { + TSocketImpl::SetNonBlock(); + TSocketImpl::SetTimeout(SOCKET_TIMEOUT); + } + void Die(const NActors::TActorContext& ctx) override { - ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID)); - TSocketImpl::Shutdown(); // to avoid errors when connection already closed - TBase::Die(ctx); - } - - void ReplyAndDie(const NActors::TActorContext& ctx) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")"); - ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response)); + ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID)); + TSocketImpl::Shutdown(); // to avoid errors when connection already closed + TBase::Die(ctx); + } + + void ReplyAndDie(const NActors::TActorContext& ctx) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")"); + ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response)); RequestOwner = TActorId(); THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response)); - ctx.Send(Owner, sensors.Release()); - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); - Die(ctx); - } - - void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) { - LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error); - if (RequestOwner) { - ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error)); + ctx.Send(Owner, sensors.Release()); + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); + Die(ctx); + } + + void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) { + LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error); + if (RequestOwner) { + ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error)); RequestOwner = TActorId(); THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response)); - ctx.Send(Owner, sensors.Release()); - Die(ctx); - } - } - -protected: - void FailConnection(const NActors::TActorContext& ctx, const TString& error) { - if (Request) { - return ReplyErrorAndDie(ctx, error); - } - return TBase::Become(&TOutgoingConnectionActor::StateFailed); - } - - void Connect(const NActors::TActorContext& ctx) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connecting"); - int res = TSocketImpl::Connect(Address); - RegisterPoller(ctx); - switch (-res) { - case 0: - return OnConnect(ctx); - case EINPROGRESS: - case EAGAIN: - return TBase::Become(&TOutgoingConnectionActor::StateConnecting); - default: - return ReplyErrorAndDie(ctx, strerror(-res)); - } - } - - void FlushOutput(const NActors::TActorContext& ctx) { - if (Request != nullptr) { - Request->Finish(); + ctx.Send(Owner, sensors.Release()); + Die(ctx); + } + } + +protected: + void FailConnection(const NActors::TActorContext& ctx, const TString& error) { + if (Request) { + return ReplyErrorAndDie(ctx, error); + } + return TBase::Become(&TOutgoingConnectionActor::StateFailed); + } + + void Connect(const NActors::TActorContext& ctx) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connecting"); + int res = TSocketImpl::Connect(Address); + RegisterPoller(ctx); + switch (-res) { + case 0: + return OnConnect(ctx); + case EINPROGRESS: + case EAGAIN: + return TBase::Become(&TOutgoingConnectionActor::StateConnecting); + default: + return ReplyErrorAndDie(ctx, strerror(-res)); + } + } + + void FlushOutput(const NActors::TActorContext& ctx) { + if (Request != nullptr) { + Request->Finish(); while (auto size = Request->Size()) { bool read = false, write = false; ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write); - if (res > 0) { - Request->ChopHead(res); + if (res > 0) { + Request->ChopHead(res); } else if (-res == EINTR) { continue; } else if (-res == EAGAIN || -res == EWOULDBLOCK) { @@ -97,30 +97,30 @@ protected: PollerToken->Request(read, write); } break; - } else { + } else { if (!res) { - ReplyAndDie(ctx); + ReplyAndDie(ctx); } else { - ReplyErrorAndDie(ctx, strerror(-res)); - } + ReplyErrorAndDie(ctx, strerror(-res)); + } break; - } - } - } - } - - void PullInput(const NActors::TActorContext& ctx) { + } + } + } + } + + void PullInput(const NActors::TActorContext& ctx) { for (;;) { - if (Response == nullptr) { - Response = new THttpIncomingResponse(Request); - } - if (!Response->EnsureEnoughSpaceAvailable()) { - return ReplyErrorAndDie(ctx, "Not enough space in socket buffer"); - } + if (Response == nullptr) { + Response = new THttpIncomingResponse(Request); + } + if (!Response->EnsureEnoughSpaceAvailable()) { + return ReplyErrorAndDie(ctx, "Not enough space in socket buffer"); + } bool read = false, write = false; ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write); - if (res > 0) { - Response->Advance(res); + if (res > 0) { + Response->Advance(res); if (Response->IsDone() && Response->IsReady()) { return ReplyAndDie(ctx); } @@ -130,169 +130,169 @@ protected: if (PollerToken) { if (!read && !write) { read = true; - } + } PollerToken->Request(read, write); - } + } return; - } else { + } else { if (!res) { - Response->ConnectionClosed(); - } + Response->ConnectionClosed(); + } if (Response->IsDone() && Response->IsReady()) { return ReplyAndDie(ctx); } return ReplyErrorAndDie(ctx, strerror(-res)); - } + } } - } - - void RegisterPoller(const NActors::TActorContext& ctx) { + } + + void RegisterPoller(const NActors::TActorContext& ctx) { ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID)); - } - - void OnConnect(const NActors::TActorContext& ctx) { + } + + void OnConnect(const NActors::TActorContext& ctx) { bool read = false, write = false; if (int res = TSocketImpl::OnConnect(read, write); res != 1) { if (-res == EAGAIN) { if (PollerToken) { PollerToken->Request(read, write); } - return; + return; } else { return ReplyErrorAndDie(ctx, strerror(-res)); - } - } - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened"); - TBase::Become(&TOutgoingConnectionActor::StateConnected); - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")"); + } + } + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened"); + TBase::Become(&TOutgoingConnectionActor::StateConnected); + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")"); ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true)); - } - - void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) { - LastActivity = ctx.Now(); - if (!event->Get()->Error.empty()) { - return FailConnection(ctx, event->Get()->Error); - } - Address = event->Get()->Address; - if (Address.GetPort() == 0) { - Address.SetPort(Request->Secure ? 443 : 80); - } - Connect(ctx); - } - + } + + void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) { + LastActivity = ctx.Now(); + if (!event->Get()->Error.empty()) { + return FailConnection(ctx, event->Get()->Error); + } + Address = event->Get()->Address; + if (Address.GetPort() == 0) { + Address.SetPort(Request->Secure ? 443 : 80); + } + Connect(ctx); + } + void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { - LastActivity = ctx.Now(); - int res = TSocketImpl::GetError(); - if (res == 0) { - OnConnect(ctx); - } else { - FailConnection(ctx, TStringBuilder() << strerror(res)); - } - } - + LastActivity = ctx.Now(); + int res = TSocketImpl::GetError(); + if (res == 0) { + OnConnect(ctx); + } else { + FailConnection(ctx, TStringBuilder() << strerror(res)); + } + } + void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { PollerToken = std::move(ev->Get()->PollerToken); - LastActivity = ctx.Now(); - int res = TSocketImpl::GetError(); - if (res == 0) { - OnConnect(ctx); - } else { - FailConnection(ctx, TStringBuilder() << strerror(res)); - } - } - - void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { - LastActivity = ctx.Now(); - Request = std::move(event->Get()->Request); - Host = Request->Host; - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << ") resolving " << Host); - Request->Timer.Reset(); - RequestOwner = event->Sender; - ctx.Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(Host)); - if (event->Get()->Timeout) { - ConnectionTimeout = event->Get()->Timeout; - TSocketImpl::SetTimeout(ConnectionTimeout); - } - ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup()); - LastActivity = ctx.Now(); - TBase::Become(&TOutgoingConnectionActor::StateResolving); - } - + LastActivity = ctx.Now(); + int res = TSocketImpl::GetError(); + if (res == 0) { + OnConnect(ctx); + } else { + FailConnection(ctx, TStringBuilder() << strerror(res)); + } + } + + void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + LastActivity = ctx.Now(); + Request = std::move(event->Get()->Request); + Host = Request->Host; + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << ") resolving " << Host); + Request->Timer.Reset(); + RequestOwner = event->Sender; + ctx.Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(Host)); + if (event->Get()->Timeout) { + ConnectionTimeout = event->Get()->Timeout; + TSocketImpl::SetTimeout(ConnectionTimeout); + } + ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup()); + LastActivity = ctx.Now(); + TBase::Become(&TOutgoingConnectionActor::StateResolving); + } + void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) { - LastActivity = ctx.Now(); + LastActivity = ctx.Now(); if (event->Get()->Read) { PullInput(ctx); - } + } if (event->Get()->Write) { FlushOutput(ctx); } - } - + } + void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { PollerToken = std::move(ev->Get()->PollerToken); - LastActivity = ctx.Now(); + LastActivity = ctx.Now(); PullInput(ctx); - FlushOutput(ctx); - } - - void HandleFailed(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { - Request = std::move(event->Get()->Request); - RequestOwner = event->Sender; - ReplyErrorAndDie(ctx, "Failed"); - } - - void HandleTimeout(const NActors::TActorContext& ctx) { - TDuration inactivityTime = ctx.Now() - LastActivity; - if (inactivityTime >= ConnectionTimeout) { - FailConnection(ctx, "Connection timed out"); - } else { - ctx.Schedule(Min(ConnectionTimeout - inactivityTime, TDuration::MilliSeconds(100)), new NActors::TEvents::TEvWakeup()); - } - } - - STFUNC(StateWaiting) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting); - CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); - } - } - - STFUNC(StateResolving) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving); - CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); - } - } - - STFUNC(StateConnecting) { - switch (ev->GetTypeRewrite()) { + FlushOutput(ctx); + } + + void HandleFailed(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + Request = std::move(event->Get()->Request); + RequestOwner = event->Sender; + ReplyErrorAndDie(ctx, "Failed"); + } + + void HandleTimeout(const NActors::TActorContext& ctx) { + TDuration inactivityTime = ctx.Now() - LastActivity; + if (inactivityTime >= ConnectionTimeout) { + FailConnection(ctx, "Connection timed out"); + } else { + ctx.Schedule(Min(ConnectionTimeout - inactivityTime, TDuration::MilliSeconds(100)), new NActors::TEvents::TEvWakeup()); + } + } + + STFUNC(StateWaiting) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting); + CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); + } + } + + STFUNC(StateResolving) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving); + CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); + } + } + + STFUNC(StateConnecting) { + switch (ev->GetTypeRewrite()) { HFunc(NActors::TEvPollerReady, HandleConnecting); - CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); + CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); HFunc(NActors::TEvPollerRegisterResult, HandleConnecting); - } - } - - STFUNC(StateConnected) { - switch (ev->GetTypeRewrite()) { + } + } + + STFUNC(StateConnected) { + switch (ev->GetTypeRewrite()) { HFunc(NActors::TEvPollerReady, HandleConnected); - CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); + CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); HFunc(NActors::TEvPollerRegisterResult, HandleConnected); - } - } - - STFUNC(StateFailed) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleFailed); - } - } -}; - + } + } + + STFUNC(StateFailed) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleFailed); + } + } +}; + NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller) { - if (secure) { - return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller); - } else { - return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, host, poller); - } -} - -} + if (secure) { + return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller); + } else { + return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, host, poller); + } +} + +} diff --git a/library/cpp/actors/http/http_proxy_sock_impl.h b/library/cpp/actors/http/http_proxy_sock_impl.h index bf8c71d05a..e7812cc5e1 100644 --- a/library/cpp/actors/http/http_proxy_sock_impl.h +++ b/library/cpp/actors/http/http_proxy_sock_impl.h @@ -1,262 +1,262 @@ -#pragma once - -#include "http.h" -#include "http_proxy.h" - -namespace NHttp { - -struct TPlainSocketImpl : virtual public THttpConfig { - TIntrusivePtr<TSocketDescriptor> Socket; - - TPlainSocketImpl() - : Socket(new TSocketDescriptor()) - {} - - TPlainSocketImpl(TIntrusivePtr<TSocketDescriptor> socket) - : Socket(std::move(socket)) - {} - - SOCKET GetRawSocket() const { - return static_cast<SOCKET>(Socket->Socket); - } - - void SetNonBlock(bool nonBlock = true) noexcept { - try { - ::SetNonBlock(Socket->Socket, nonBlock); - } - catch (const yexception&) { - } - } - - void SetTimeout(TDuration timeout) noexcept { - try { - ::SetSocketTimeout(Socket->Socket, timeout.Seconds(), timeout.MilliSecondsOfSecond()); - } - catch (const yexception&) { - } - } - - void Shutdown() { - //Socket->Socket.ShutDown(SHUT_RDWR); // KIKIMR-3895 - ::shutdown(Socket->Socket, SHUT_RDWR); - } - - int Connect(const SocketAddressType& address) { - return Socket->Socket.Connect(&address); - } - +#pragma once + +#include "http.h" +#include "http_proxy.h" + +namespace NHttp { + +struct TPlainSocketImpl : virtual public THttpConfig { + TIntrusivePtr<TSocketDescriptor> Socket; + + TPlainSocketImpl() + : Socket(new TSocketDescriptor()) + {} + + TPlainSocketImpl(TIntrusivePtr<TSocketDescriptor> socket) + : Socket(std::move(socket)) + {} + + SOCKET GetRawSocket() const { + return static_cast<SOCKET>(Socket->Socket); + } + + void SetNonBlock(bool nonBlock = true) noexcept { + try { + ::SetNonBlock(Socket->Socket, nonBlock); + } + catch (const yexception&) { + } + } + + void SetTimeout(TDuration timeout) noexcept { + try { + ::SetSocketTimeout(Socket->Socket, timeout.Seconds(), timeout.MilliSecondsOfSecond()); + } + catch (const yexception&) { + } + } + + void Shutdown() { + //Socket->Socket.ShutDown(SHUT_RDWR); // KIKIMR-3895 + ::shutdown(Socket->Socket, SHUT_RDWR); + } + + int Connect(const SocketAddressType& address) { + return Socket->Socket.Connect(&address); + } + static constexpr int OnConnect(bool&, bool&) { return 1; - } - + } + static constexpr int OnAccept(const TEndpointInfo&, bool&, bool&) { return 1; - } - - bool IsGood() { - int res; - GetSockOpt(Socket->Socket, SOL_SOCKET, SO_ERROR, res); - return res == 0; - } - - int GetError() { - int res; - GetSockOpt(Socket->Socket, SOL_SOCKET, SO_ERROR, res); - return res; - } - + } + + bool IsGood() { + int res; + GetSockOpt(Socket->Socket, SOL_SOCKET, SO_ERROR, res); + return res == 0; + } + + int GetError() { + int res; + GetSockOpt(Socket->Socket, SOL_SOCKET, SO_ERROR, res); + return res; + } + ssize_t Send(const void* data, size_t size, bool&, bool&) { - return Socket->Socket.Send(data, size); - } - + return Socket->Socket.Send(data, size); + } + ssize_t Recv(void* data, size_t size, bool&, bool&) { - return Socket->Socket.Recv(data, size); - } -}; - -struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers { - static TSecureSocketImpl* IO(BIO* bio) noexcept { - return static_cast<TSecureSocketImpl*>(BIO_get_data(bio)); - } - - static int IoWrite(BIO* bio, const char* data, int dlen) noexcept { - BIO_clear_retry_flags(bio); - int res = IO(bio)->Socket->Socket.Send(data, dlen); - if (-res == EAGAIN) { - BIO_set_retry_write(bio); - } - return res; - } - - static int IoRead(BIO* bio, char* data, int dlen) noexcept { - BIO_clear_retry_flags(bio); - int res = IO(bio)->Socket->Socket.Recv(data, dlen); - if (-res == EAGAIN) { - BIO_set_retry_read(bio); - } - return res; - } - - static int IoPuts(BIO* bio, const char* buf) noexcept { - Y_UNUSED(bio); - Y_UNUSED(buf); - return -2; - } - - static int IoGets(BIO* bio, char* buf, int size) noexcept { - Y_UNUSED(bio); - Y_UNUSED(buf); - Y_UNUSED(size); - return -2; - } - - static long IoCtrl(BIO* bio, int cmd, long larg, void* parg) noexcept { - Y_UNUSED(larg); - Y_UNUSED(parg); - - if (cmd == BIO_CTRL_FLUSH) { - IO(bio)->Flush(); - return 1; - } - - return -2; - } - - static int IoCreate(BIO* bio) noexcept { - BIO_set_data(bio, nullptr); - BIO_set_init(bio, 1); - return 1; - } - - static int IoDestroy(BIO* bio) noexcept { - BIO_set_data(bio, nullptr); - BIO_set_init(bio, 0); - return 1; - } - - static BIO_METHOD* CreateIoMethod() { - BIO_METHOD* method = BIO_meth_new(BIO_get_new_index() | BIO_TYPE_SOURCE_SINK, "SecureSocketImpl"); - BIO_meth_set_write(method, IoWrite); - BIO_meth_set_read(method, IoRead); - BIO_meth_set_puts(method, IoPuts); - BIO_meth_set_gets(method, IoGets); - BIO_meth_set_ctrl(method, IoCtrl); - BIO_meth_set_create(method, IoCreate); - BIO_meth_set_destroy(method, IoDestroy); - return method; - } - - static BIO_METHOD* IoMethod() { - static BIO_METHOD* method = CreateIoMethod(); - return method; - } - - TSslHolder<BIO> Bio; - TSslHolder<SSL_CTX> Ctx; - TSslHolder<SSL> Ssl; - - TSecureSocketImpl() = default; - - TSecureSocketImpl(TIntrusivePtr<TSocketDescriptor> socket) - : TPlainSocketImpl(std::move(socket)) - {} - - void InitClientSsl() { + return Socket->Socket.Recv(data, size); + } +}; + +struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers { + static TSecureSocketImpl* IO(BIO* bio) noexcept { + return static_cast<TSecureSocketImpl*>(BIO_get_data(bio)); + } + + static int IoWrite(BIO* bio, const char* data, int dlen) noexcept { + BIO_clear_retry_flags(bio); + int res = IO(bio)->Socket->Socket.Send(data, dlen); + if (-res == EAGAIN) { + BIO_set_retry_write(bio); + } + return res; + } + + static int IoRead(BIO* bio, char* data, int dlen) noexcept { + BIO_clear_retry_flags(bio); + int res = IO(bio)->Socket->Socket.Recv(data, dlen); + if (-res == EAGAIN) { + BIO_set_retry_read(bio); + } + return res; + } + + static int IoPuts(BIO* bio, const char* buf) noexcept { + Y_UNUSED(bio); + Y_UNUSED(buf); + return -2; + } + + static int IoGets(BIO* bio, char* buf, int size) noexcept { + Y_UNUSED(bio); + Y_UNUSED(buf); + Y_UNUSED(size); + return -2; + } + + static long IoCtrl(BIO* bio, int cmd, long larg, void* parg) noexcept { + Y_UNUSED(larg); + Y_UNUSED(parg); + + if (cmd == BIO_CTRL_FLUSH) { + IO(bio)->Flush(); + return 1; + } + + return -2; + } + + static int IoCreate(BIO* bio) noexcept { + BIO_set_data(bio, nullptr); + BIO_set_init(bio, 1); + return 1; + } + + static int IoDestroy(BIO* bio) noexcept { + BIO_set_data(bio, nullptr); + BIO_set_init(bio, 0); + return 1; + } + + static BIO_METHOD* CreateIoMethod() { + BIO_METHOD* method = BIO_meth_new(BIO_get_new_index() | BIO_TYPE_SOURCE_SINK, "SecureSocketImpl"); + BIO_meth_set_write(method, IoWrite); + BIO_meth_set_read(method, IoRead); + BIO_meth_set_puts(method, IoPuts); + BIO_meth_set_gets(method, IoGets); + BIO_meth_set_ctrl(method, IoCtrl); + BIO_meth_set_create(method, IoCreate); + BIO_meth_set_destroy(method, IoDestroy); + return method; + } + + static BIO_METHOD* IoMethod() { + static BIO_METHOD* method = CreateIoMethod(); + return method; + } + + TSslHolder<BIO> Bio; + TSslHolder<SSL_CTX> Ctx; + TSslHolder<SSL> Ssl; + + TSecureSocketImpl() = default; + + TSecureSocketImpl(TIntrusivePtr<TSocketDescriptor> socket) + : TPlainSocketImpl(std::move(socket)) + {} + + void InitClientSsl() { Bio.Reset(BIO_new(IoMethod())); - BIO_set_data(Bio.Get(), this); - BIO_set_nbio(Bio.Get(), 1); - Ctx = CreateClientContext(); - Ssl = ConstructSsl(Ctx.Get(), Bio.Get()); - SSL_set_connect_state(Ssl.Get()); - } - - void InitServerSsl(SSL_CTX* ctx) { + BIO_set_data(Bio.Get(), this); + BIO_set_nbio(Bio.Get(), 1); + Ctx = CreateClientContext(); + Ssl = ConstructSsl(Ctx.Get(), Bio.Get()); + SSL_set_connect_state(Ssl.Get()); + } + + void InitServerSsl(SSL_CTX* ctx) { Bio.Reset(BIO_new(IoMethod())); - BIO_set_data(Bio.Get(), this); - BIO_set_nbio(Bio.Get(), 1); - Ssl = ConstructSsl(ctx, Bio.Get()); - SSL_set_accept_state(Ssl.Get()); - } - - void Flush() {} - + BIO_set_data(Bio.Get(), this); + BIO_set_nbio(Bio.Get(), 1); + Ssl = ConstructSsl(ctx, Bio.Get()); + SSL_set_accept_state(Ssl.Get()); + } + + void Flush() {} + ssize_t Send(const void* data, size_t size, bool& read, bool& write) { - ssize_t res = SSL_write(Ssl.Get(), data, size); - if (res < 0) { - res = SSL_get_error(Ssl.Get(), res); - switch(res) { - case SSL_ERROR_WANT_READ: + ssize_t res = SSL_write(Ssl.Get(), data, size); + if (res < 0) { + res = SSL_get_error(Ssl.Get(), res); + switch(res) { + case SSL_ERROR_WANT_READ: read = true; return -EAGAIN; - case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_WRITE: write = true; - return -EAGAIN; - default: - return -EIO; - } - } - return res; - } - + return -EAGAIN; + default: + return -EIO; + } + } + return res; + } + ssize_t Recv(void* data, size_t size, bool& read, bool& write) { - ssize_t res = SSL_read(Ssl.Get(), data, size); - if (res < 0) { - res = SSL_get_error(Ssl.Get(), res); - switch(res) { - case SSL_ERROR_WANT_READ: + ssize_t res = SSL_read(Ssl.Get(), data, size); + if (res < 0) { + res = SSL_get_error(Ssl.Get(), res); + switch(res) { + case SSL_ERROR_WANT_READ: read = true; return -EAGAIN; - case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_WRITE: write = true; - return -EAGAIN; - default: - return -EIO; - } - } - return res; - } - + return -EAGAIN; + default: + return -EIO; + } + } + return res; + } + int OnConnect(bool& read, bool& write) { - if (!Ssl) { - InitClientSsl(); - } - int res = SSL_connect(Ssl.Get()); + if (!Ssl) { + InitClientSsl(); + } + int res = SSL_connect(Ssl.Get()); if (res <= 0) { - res = SSL_get_error(Ssl.Get(), res); - switch(res) { - case SSL_ERROR_WANT_READ: + res = SSL_get_error(Ssl.Get(), res); + switch(res) { + case SSL_ERROR_WANT_READ: read = true; return -EAGAIN; - case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_WRITE: write = true; - return -EAGAIN; - default: - return -EIO; - } - } - return res; - } - + return -EAGAIN; + default: + return -EIO; + } + } + return res; + } + int OnAccept(const TEndpointInfo& endpoint, bool& read, bool& write) { - if (!Ssl) { - InitServerSsl(endpoint.SecureContext.Get()); - } - int res = SSL_accept(Ssl.Get()); + if (!Ssl) { + InitServerSsl(endpoint.SecureContext.Get()); + } + int res = SSL_accept(Ssl.Get()); if (res <= 0) { - res = SSL_get_error(Ssl.Get(), res); - switch(res) { - case SSL_ERROR_WANT_READ: + res = SSL_get_error(Ssl.Get(), res); + switch(res) { + case SSL_ERROR_WANT_READ: read = true; return -EAGAIN; - case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_WANT_WRITE: write = true; - return -EAGAIN; - default: - return -EIO; - } - } - return res; - } -}; - -} + return -EAGAIN; + default: + return -EIO; + } + } + return res; + } +}; + +} diff --git a/library/cpp/actors/http/http_proxy_ssl.h b/library/cpp/actors/http/http_proxy_ssl.h index ffce12997f..12fb372b3c 100644 --- a/library/cpp/actors/http/http_proxy_ssl.h +++ b/library/cpp/actors/http/http_proxy_ssl.h @@ -1,22 +1,22 @@ -#pragma once - -#include <openssl/bio.h> -#include <openssl/ssl.h> -#include <openssl/err.h> -#include <openssl/tls1.h> - -namespace NHttp { - -struct TSslHelpers { - struct TSslDestroy { - static void Destroy(SSL_CTX* ctx) noexcept { - SSL_CTX_free(ctx); - } - - static void Destroy(SSL* ssl) noexcept { - SSL_free(ssl); - } - +#pragma once + +#include <openssl/bio.h> +#include <openssl/ssl.h> +#include <openssl/err.h> +#include <openssl/tls1.h> + +namespace NHttp { + +struct TSslHelpers { + struct TSslDestroy { + static void Destroy(SSL_CTX* ctx) noexcept { + SSL_CTX_free(ctx); + } + + static void Destroy(SSL* ssl) noexcept { + SSL_free(ssl); + } + static void Destroy(X509* cert) noexcept { X509_free(cert); } @@ -25,48 +25,48 @@ struct TSslHelpers { EVP_PKEY_free(pkey); } - static void Destroy(BIO* bio) noexcept { - BIO_free(bio); - } - }; - - template <typename T> - using TSslHolder = THolder<T, TSslDestroy>; - - static TSslHolder<SSL_CTX> CreateSslCtx(const SSL_METHOD* method) { - TSslHolder<SSL_CTX> ctx(SSL_CTX_new(method)); - - if (ctx) { - SSL_CTX_set_options(ctx.Get(), SSL_OP_NO_SSLv2); - SSL_CTX_set_options(ctx.Get(), SSL_OP_NO_SSLv3); - SSL_CTX_set_options(ctx.Get(), SSL_OP_MICROSOFT_SESS_ID_BUG); - SSL_CTX_set_options(ctx.Get(), SSL_OP_NETSCAPE_CHALLENGE_BUG); - } - - return ctx; - } - - static TSslHolder<SSL_CTX> CreateClientContext() { - return CreateSslCtx(SSLv23_client_method()); - } - - static TSslHolder<SSL_CTX> CreateServerContext(const TString& certificate, const TString& key) { - TSslHolder<SSL_CTX> ctx = CreateSslCtx(SSLv23_server_method()); - SSL_CTX_set_ecdh_auto(ctx.Get(), 1); - int res; - res = SSL_CTX_use_certificate_chain_file(ctx.Get(), certificate.c_str()); - if (res < 0) { - // TODO(xenoxeno): more diagnostics? - return nullptr; - } - res = SSL_CTX_use_PrivateKey_file(ctx.Get(), key.c_str(), SSL_FILETYPE_PEM); - if (res < 0) { - // TODO(xenoxeno): more diagnostics? - return nullptr; - } - return ctx; - } - + static void Destroy(BIO* bio) noexcept { + BIO_free(bio); + } + }; + + template <typename T> + using TSslHolder = THolder<T, TSslDestroy>; + + static TSslHolder<SSL_CTX> CreateSslCtx(const SSL_METHOD* method) { + TSslHolder<SSL_CTX> ctx(SSL_CTX_new(method)); + + if (ctx) { + SSL_CTX_set_options(ctx.Get(), SSL_OP_NO_SSLv2); + SSL_CTX_set_options(ctx.Get(), SSL_OP_NO_SSLv3); + SSL_CTX_set_options(ctx.Get(), SSL_OP_MICROSOFT_SESS_ID_BUG); + SSL_CTX_set_options(ctx.Get(), SSL_OP_NETSCAPE_CHALLENGE_BUG); + } + + return ctx; + } + + static TSslHolder<SSL_CTX> CreateClientContext() { + return CreateSslCtx(SSLv23_client_method()); + } + + static TSslHolder<SSL_CTX> CreateServerContext(const TString& certificate, const TString& key) { + TSslHolder<SSL_CTX> ctx = CreateSslCtx(SSLv23_server_method()); + SSL_CTX_set_ecdh_auto(ctx.Get(), 1); + int res; + res = SSL_CTX_use_certificate_chain_file(ctx.Get(), certificate.c_str()); + if (res < 0) { + // TODO(xenoxeno): more diagnostics? + return nullptr; + } + res = SSL_CTX_use_PrivateKey_file(ctx.Get(), key.c_str(), SSL_FILETYPE_PEM); + if (res < 0) { + // TODO(xenoxeno): more diagnostics? + return nullptr; + } + return ctx; + } + static bool LoadX509Chain(TSslHolder<SSL_CTX>& ctx, const TString& pem) { TSslHolder<BIO> bio(BIO_new_mem_buf(pem.c_str(), pem.size())); if (bio == nullptr) { @@ -116,16 +116,16 @@ struct TSslHelpers { return ctx; } - static TSslHolder<SSL> ConstructSsl(SSL_CTX* ctx, BIO* bio) { - TSslHolder<SSL> ssl(SSL_new(ctx)); - - if (ssl) { - BIO_up_ref(bio); // SSL_set_bio consumes only one reference if rbio and wbio are the same - SSL_set_bio(ssl.Get(), bio, bio); - } - - return ssl; - } -}; - -} + static TSslHolder<SSL> ConstructSsl(SSL_CTX* ctx, BIO* bio) { + TSslHolder<SSL> ssl(SSL_new(ctx)); + + if (ssl) { + BIO_up_ref(bio); // SSL_set_bio consumes only one reference if rbio and wbio are the same + SSL_set_bio(ssl.Get(), bio, bio); + } + + return ssl; + } +}; + +} diff --git a/library/cpp/actors/http/http_static.cpp b/library/cpp/actors/http/http_static.cpp index c075c5f693..452b0a8498 100644 --- a/library/cpp/actors/http/http_static.cpp +++ b/library/cpp/actors/http/http_static.cpp @@ -5,91 +5,91 @@ #include <library/cpp/actors/core/scheduler_basic.h> #include <library/cpp/actors/http/http.h> #include <library/cpp/resource/resource.h> -#include <util/folder/path.h> -#include <util/stream/file.h> - -namespace NHttp { - -class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandler> { -public: - using TBase = NActors::TActor<THttpStaticContentHandler>; - const TFsPath URL; - const TFsPath FilePath; - const TFsPath ResourcePath; - const TFsPath Index; - - THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) - : TBase(&THttpStaticContentHandler::StateWork) - , URL(url) - , FilePath(filePath) - , ResourcePath(resourcePath) - , Index(index) - {} - - static TInstant GetCompileTime() { - tm compileTime; - strptime(__DATE__ " " __TIME__, "%B %d %Y %H:%M:%S", &compileTime); - return TInstant::Seconds(mktime(&compileTime)); - } - - void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { - THttpOutgoingResponsePtr response; - if (event->Get()->Request->Method != "GET") { - response = event->Get()->Request->CreateResponseBadRequest("Wrong request"); - ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); - return; - } - TFsPath url(event->Get()->Request->URL.Before('?')); - if (!url.IsAbsolute()) { - response = event->Get()->Request->CreateResponseBadRequest("Completely wrong URL"); - ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); - return; - } - if (url.GetPath().EndsWith('/') && Index.IsDefined()) { - url /= Index; - } - url = url.RelativeTo(URL); - try { - // TODO: caching? - TString contentType = mimetypeByExt(url.GetExtension().c_str()); - TString data; - TFileStat filestat; - TFsPath resourcename(ResourcePath / url); - if (NResource::FindExact(resourcename.GetPath(), &data)) { - static TInstant compileTime(GetCompileTime()); - filestat.MTime = compileTime.Seconds(); - } else { - TFsPath filename(FilePath / url); - if (!filename.IsSubpathOf(FilePath) && filename != FilePath) { - response = event->Get()->Request->CreateResponseBadRequest("Wrong URL"); - ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); - return; - } - if (filename.Stat(filestat) && filestat.IsFile()) { - data = TUnbufferedFileInput(filename).ReadAll(); - } - } - if (!filestat.IsNull()) { - response = event->Get()->Request->CreateResponseOK(data, contentType, TInstant::Seconds(filestat.MTime)); - } else { - response = event->Get()->Request->CreateResponseNotFound("File not found"); - } - } - catch (const yexception&) { - response = event->Get()->Request->CreateResponseServiceUnavailable("Not available"); - } - ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); - } - - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); - } - } -}; - -NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) { - return new THttpStaticContentHandler(url, filePath, resourcePath, index); -} - -} +#include <util/folder/path.h> +#include <util/stream/file.h> + +namespace NHttp { + +class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandler> { +public: + using TBase = NActors::TActor<THttpStaticContentHandler>; + const TFsPath URL; + const TFsPath FilePath; + const TFsPath ResourcePath; + const TFsPath Index; + + THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) + : TBase(&THttpStaticContentHandler::StateWork) + , URL(url) + , FilePath(filePath) + , ResourcePath(resourcePath) + , Index(index) + {} + + static TInstant GetCompileTime() { + tm compileTime; + strptime(__DATE__ " " __TIME__, "%B %d %Y %H:%M:%S", &compileTime); + return TInstant::Seconds(mktime(&compileTime)); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) { + THttpOutgoingResponsePtr response; + if (event->Get()->Request->Method != "GET") { + response = event->Get()->Request->CreateResponseBadRequest("Wrong request"); + ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + return; + } + TFsPath url(event->Get()->Request->URL.Before('?')); + if (!url.IsAbsolute()) { + response = event->Get()->Request->CreateResponseBadRequest("Completely wrong URL"); + ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + return; + } + if (url.GetPath().EndsWith('/') && Index.IsDefined()) { + url /= Index; + } + url = url.RelativeTo(URL); + try { + // TODO: caching? + TString contentType = mimetypeByExt(url.GetExtension().c_str()); + TString data; + TFileStat filestat; + TFsPath resourcename(ResourcePath / url); + if (NResource::FindExact(resourcename.GetPath(), &data)) { + static TInstant compileTime(GetCompileTime()); + filestat.MTime = compileTime.Seconds(); + } else { + TFsPath filename(FilePath / url); + if (!filename.IsSubpathOf(FilePath) && filename != FilePath) { + response = event->Get()->Request->CreateResponseBadRequest("Wrong URL"); + ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + return; + } + if (filename.Stat(filestat) && filestat.IsFile()) { + data = TUnbufferedFileInput(filename).ReadAll(); + } + } + if (!filestat.IsNull()) { + response = event->Get()->Request->CreateResponseOK(data, contentType, TInstant::Seconds(filestat.MTime)); + } else { + response = event->Get()->Request->CreateResponseNotFound("File not found"); + } + } + catch (const yexception&) { + response = event->Get()->Request->CreateResponseServiceUnavailable("Not available"); + } + ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + } + } +}; + +NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) { + return new THttpStaticContentHandler(url, filePath, resourcePath, index); +} + +} diff --git a/library/cpp/actors/http/http_static.h b/library/cpp/actors/http/http_static.h index f91e15dfb1..f2ee13d003 100644 --- a/library/cpp/actors/http/http_static.h +++ b/library/cpp/actors/http/http_static.h @@ -1,9 +1,9 @@ -#pragma once +#pragma once #include <library/cpp/actors/core/actor.h> -#include "http.h" - -namespace NHttp { - -NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index = TString()); - -} +#include "http.h" + +namespace NHttp { + +NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index = TString()); + +} diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp index 4c922f8d0f..209f61f4de 100644 --- a/library/cpp/actors/http/http_ut.cpp +++ b/library/cpp/actors/http/http_ut.cpp @@ -3,86 +3,86 @@ #include <library/cpp/actors/core/executor_pool_basic.h> #include <library/cpp/actors/core/scheduler_basic.h> #include <library/cpp/actors/testlib/test_runtime.h> -#include <util/system/tempfile.h> -#include "http.h" -#include "http_proxy.h" - - - -enum EService : NActors::NLog::EComponent { - MIN, - Logger, - MVP, - MAX -}; - -namespace { - -template <typename HttpType> -void EatWholeString(TIntrusivePtr<HttpType>& request, const TString& data) { - request->EnsureEnoughSpaceAvailable(data.size()); - auto size = std::min(request->Avail(), data.size()); - memcpy(request->Pos(), data.data(), size); - request->Advance(size); -} - -template <typename HttpType> -void EatPartialString(TIntrusivePtr<HttpType>& request, const TString& data) { - for (char c : data) { - request->EnsureEnoughSpaceAvailable(1); - memcpy(request->Pos(), &c, 1); - request->Advance(1); - } -} - -} - +#include <util/system/tempfile.h> +#include "http.h" +#include "http_proxy.h" + + + +enum EService : NActors::NLog::EComponent { + MIN, + Logger, + MVP, + MAX +}; + +namespace { + +template <typename HttpType> +void EatWholeString(TIntrusivePtr<HttpType>& request, const TString& data) { + request->EnsureEnoughSpaceAvailable(data.size()); + auto size = std::min(request->Avail(), data.size()); + memcpy(request->Pos(), data.data(), size); + request->Advance(size); +} + +template <typename HttpType> +void EatPartialString(TIntrusivePtr<HttpType>& request, const TString& data) { + for (char c : data) { + request->EnsureEnoughSpaceAvailable(1); + memcpy(request->Pos(), &c, 1); + request->Advance(1); + } +} + +} + Y_UNIT_TEST_SUITE(HttpProxy) { Y_UNIT_TEST(BasicParsing) { - NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); - EatWholeString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n"); - UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); - UNIT_ASSERT_EQUAL(request->Method, "GET"); - UNIT_ASSERT_EQUAL(request->URL, "/test"); - UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); - UNIT_ASSERT_EQUAL(request->Version, "1.1"); - UNIT_ASSERT_EQUAL(request->Host, "test"); - UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); - } - + NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); + EatWholeString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n"); + UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); + UNIT_ASSERT_EQUAL(request->Method, "GET"); + UNIT_ASSERT_EQUAL(request->URL, "/test"); + UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); + UNIT_ASSERT_EQUAL(request->Version, "1.1"); + UNIT_ASSERT_EQUAL(request->Host, "test"); + UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); + } + Y_UNIT_TEST(BasicParsingChunkedBody) { - NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); - NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); - EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); - UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done); - UNIT_ASSERT_EQUAL(response->Status, "200"); - UNIT_ASSERT_EQUAL(response->Connection, "close"); - UNIT_ASSERT_EQUAL(response->Protocol, "HTTP"); - UNIT_ASSERT_EQUAL(response->Version, "1.1"); - UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked"); - UNIT_ASSERT_EQUAL(response->Body, "this is test."); - } - - Y_UNIT_TEST(InvalidParsingChunkedBody) { - NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); - NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); - EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); - UNIT_ASSERT(response->IsError()); - } - - Y_UNIT_TEST(AdvancedParsingChunkedBody) { - NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); - NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); - EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\nthis\r\n\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); - UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done); - UNIT_ASSERT_EQUAL(response->Status, "200"); - UNIT_ASSERT_EQUAL(response->Connection, "close"); - UNIT_ASSERT_EQUAL(response->Protocol, "HTTP"); - UNIT_ASSERT_EQUAL(response->Version, "1.1"); - UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked"); - UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test."); - } - + NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); + NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); + EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); + UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done); + UNIT_ASSERT_EQUAL(response->Status, "200"); + UNIT_ASSERT_EQUAL(response->Connection, "close"); + UNIT_ASSERT_EQUAL(response->Protocol, "HTTP"); + UNIT_ASSERT_EQUAL(response->Version, "1.1"); + UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked"); + UNIT_ASSERT_EQUAL(response->Body, "this is test."); + } + + Y_UNIT_TEST(InvalidParsingChunkedBody) { + NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); + NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); + EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); + UNIT_ASSERT(response->IsError()); + } + + Y_UNIT_TEST(AdvancedParsingChunkedBody) { + NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); + NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); + EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\nthis\r\n\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); + UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done); + UNIT_ASSERT_EQUAL(response->Status, "200"); + UNIT_ASSERT_EQUAL(response->Connection, "close"); + UNIT_ASSERT_EQUAL(response->Protocol, "HTTP"); + UNIT_ASSERT_EQUAL(response->Version, "1.1"); + UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked"); + UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test."); + } + Y_UNIT_TEST(CreateRepsonseWithCompressedBody) { NHttp::THttpIncomingRequestPtr request = nullptr; NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK"); @@ -95,264 +95,264 @@ Y_UNIT_TEST_SUITE(HttpProxy) { } Y_UNIT_TEST(BasicPartialParsing) { - NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); - EatPartialString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n"); - UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); - UNIT_ASSERT_EQUAL(request->Method, "GET"); - UNIT_ASSERT_EQUAL(request->URL, "/test"); - UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); - UNIT_ASSERT_EQUAL(request->Version, "1.1"); - UNIT_ASSERT_EQUAL(request->Host, "test"); - UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); - } - + NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); + EatPartialString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n"); + UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); + UNIT_ASSERT_EQUAL(request->Method, "GET"); + UNIT_ASSERT_EQUAL(request->URL, "/test"); + UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); + UNIT_ASSERT_EQUAL(request->Version, "1.1"); + UNIT_ASSERT_EQUAL(request->Host, "test"); + UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); + } + Y_UNIT_TEST(BasicPartialParsingChunkedBody) { - NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); - NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); - EatPartialString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); - UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done); - UNIT_ASSERT_EQUAL(response->Status, "200"); - UNIT_ASSERT_EQUAL(response->Connection, "close"); - UNIT_ASSERT_EQUAL(response->Protocol, "HTTP"); - UNIT_ASSERT_EQUAL(response->Version, "1.1"); - UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked"); - UNIT_ASSERT_EQUAL(response->Body, "this is test."); - } - + NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest(); + NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request); + EatPartialString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n"); + UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done); + UNIT_ASSERT_EQUAL(response->Status, "200"); + UNIT_ASSERT_EQUAL(response->Connection, "close"); + UNIT_ASSERT_EQUAL(response->Protocol, "HTTP"); + UNIT_ASSERT_EQUAL(response->Version, "1.1"); + UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked"); + UNIT_ASSERT_EQUAL(response->Body, "this is test."); + } + Y_UNIT_TEST(AdvancedParsing) { - NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); - EatWholeString(request, "GE"); - EatWholeString(request, "T"); - EatWholeString(request, " "); - EatWholeString(request, "/test"); - EatWholeString(request, " HTTP/1.1\r"); - EatWholeString(request, "\nHo"); - EatWholeString(request, "st: test"); - EatWholeString(request, "\r\n"); - EatWholeString(request, "Some-Header: 32344\r\n\r"); - EatWholeString(request, "\n"); - UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); - UNIT_ASSERT_EQUAL(request->Method, "GET"); - UNIT_ASSERT_EQUAL(request->URL, "/test"); - UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); - UNIT_ASSERT_EQUAL(request->Version, "1.1"); - UNIT_ASSERT_EQUAL(request->Host, "test"); - UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); - } - + NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); + EatWholeString(request, "GE"); + EatWholeString(request, "T"); + EatWholeString(request, " "); + EatWholeString(request, "/test"); + EatWholeString(request, " HTTP/1.1\r"); + EatWholeString(request, "\nHo"); + EatWholeString(request, "st: test"); + EatWholeString(request, "\r\n"); + EatWholeString(request, "Some-Header: 32344\r\n\r"); + EatWholeString(request, "\n"); + UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); + UNIT_ASSERT_EQUAL(request->Method, "GET"); + UNIT_ASSERT_EQUAL(request->URL, "/test"); + UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); + UNIT_ASSERT_EQUAL(request->Version, "1.1"); + UNIT_ASSERT_EQUAL(request->Host, "test"); + UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); + } + Y_UNIT_TEST(AdvancedPartialParsing) { - NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); - EatPartialString(request, "GE"); - EatPartialString(request, "T"); - EatPartialString(request, " "); - EatPartialString(request, "/test"); - EatPartialString(request, " HTTP/1.1\r"); - EatPartialString(request, "\nHo"); - EatPartialString(request, "st: test"); - EatPartialString(request, "\r\n"); - EatPartialString(request, "Some-Header: 32344\r\n\r"); - EatPartialString(request, "\n"); - UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); - UNIT_ASSERT_EQUAL(request->Method, "GET"); - UNIT_ASSERT_EQUAL(request->URL, "/test"); - UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); - UNIT_ASSERT_EQUAL(request->Version, "1.1"); - UNIT_ASSERT_EQUAL(request->Host, "test"); - UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); - } - - Y_UNIT_TEST(BasicRenderBodyWithHeadersAndCookies) { - NHttp::THttpOutgoingRequestPtr request = NHttp::THttpOutgoingRequest::CreateRequestGet("http://www.yandex.ru/data/url"); - NHttp::THeadersBuilder headers; - NHttp::TCookiesBuilder cookies; - cookies.Set("cookie1", "123456"); - cookies.Set("cookie2", "45678"); - headers.Set("Cookie", cookies.Render()); - request->Set(headers); - TString requestData; - request->AsString(requestData); - UNIT_ASSERT_VALUES_EQUAL(requestData, "GET /data/url HTTP/1.1\r\nHost: www.yandex.ru\r\nAccept: */*\r\nCookie: cookie1=123456; cookie2=45678;\r\n"); - } - + NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest(); + EatPartialString(request, "GE"); + EatPartialString(request, "T"); + EatPartialString(request, " "); + EatPartialString(request, "/test"); + EatPartialString(request, " HTTP/1.1\r"); + EatPartialString(request, "\nHo"); + EatPartialString(request, "st: test"); + EatPartialString(request, "\r\n"); + EatPartialString(request, "Some-Header: 32344\r\n\r"); + EatPartialString(request, "\n"); + UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done); + UNIT_ASSERT_EQUAL(request->Method, "GET"); + UNIT_ASSERT_EQUAL(request->URL, "/test"); + UNIT_ASSERT_EQUAL(request->Protocol, "HTTP"); + UNIT_ASSERT_EQUAL(request->Version, "1.1"); + UNIT_ASSERT_EQUAL(request->Host, "test"); + UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n"); + } + + Y_UNIT_TEST(BasicRenderBodyWithHeadersAndCookies) { + NHttp::THttpOutgoingRequestPtr request = NHttp::THttpOutgoingRequest::CreateRequestGet("http://www.yandex.ru/data/url"); + NHttp::THeadersBuilder headers; + NHttp::TCookiesBuilder cookies; + cookies.Set("cookie1", "123456"); + cookies.Set("cookie2", "45678"); + headers.Set("Cookie", cookies.Render()); + request->Set(headers); + TString requestData; + request->AsString(requestData); + UNIT_ASSERT_VALUES_EQUAL(requestData, "GET /data/url HTTP/1.1\r\nHost: www.yandex.ru\r\nAccept: */*\r\nCookie: cookie1=123456; cookie2=45678;\r\n"); + } + Y_UNIT_TEST(BasicRunning) { NActors::TTestActorRuntimeBase actorSystem; - TPortManager portManager; - TIpPort port = portManager.GetTcpPort(); - TAutoPtr<NActors::IEventHandle> handle; + TPortManager portManager; + TIpPort port = portManager.GetTcpPort(); + TAutoPtr<NActors::IEventHandle> handle; actorSystem.Initialize(); NMonitoring::TMetricRegistry sensors; - - NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); + + NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); NActors::TActorId proxyId = actorSystem.Register(proxy); actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); - actorSystem.DispatchEvents(); - + actorSystem.DispatchEvents(); + NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); - actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); - + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); + NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); - NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test"); - actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); - - NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); - - UNIT_ASSERT_EQUAL(request->Request->URL, "/test"); - - NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n"); - actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); - - NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); - - UNIT_ASSERT_EQUAL(response->Response->Status, "200"); - UNIT_ASSERT_EQUAL(response->Response->Body, "passed"); - } - - Y_UNIT_TEST(TlsRunning) { - NActors::TTestActorRuntimeBase actorSystem; - TPortManager portManager; - TIpPort port = portManager.GetTcpPort(); - TAutoPtr<NActors::IEventHandle> handle; - actorSystem.Initialize(); + NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test"); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); + + UNIT_ASSERT_EQUAL(request->Request->URL, "/test"); + + NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n"); + actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); + + UNIT_ASSERT_EQUAL(response->Response->Status, "200"); + UNIT_ASSERT_EQUAL(response->Response->Body, "passed"); + } + + Y_UNIT_TEST(TlsRunning) { + NActors::TTestActorRuntimeBase actorSystem; + TPortManager portManager; + TIpPort port = portManager.GetTcpPort(); + TAutoPtr<NActors::IEventHandle> handle; + actorSystem.Initialize(); NMonitoring::TMetricRegistry sensors; - - TString certificateContent = R"___(-----BEGIN PRIVATE KEY----- -MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCzRZjodO7Aqe1w -RyOj6kG6g2nn8ZGAxfao4mLT0jDTbVksrhV/h2s3uldLkFo5WrNQ8WZe+iIbXeFL -s8tO6hslzreo9sih2IHoRcH5KnS/6YTqVhRTJb1jE2dM8NwYbwTi+T2Pe0FrBPjI -kgVO50gAtYl9C+fc715uZiSKW+rRlP5OoFTwxrOjiU27RPZjFYyWK9wTI1Es9uRr -lbZbLl5cY6dK2J1AViRraaYKCWO26VbOPWLsY4OD3e+ZXIc3OMCz6Yb0wmRPeJ60 -bbbkGfI8O27kDdv69MAWHIm0yYMzKEnom1dce7rNQNDEqJfocsYIsg+EvayT1yQ9 -KTBegw7LAgMBAAECggEBAKaOCrotqYQmXArsjRhFFDwMy+BKdzyEr93INrlFl0dX -WHpCYobRcbOc1G3H94tB0UdqgAnNqtJyLlb+++ydZAuEOu4oGc8EL+10ofq0jzOd -6Xct8kQt0/6wkFDTlii9PHUDy0X65ZRgUiNGRtg/2I2QG+SpowmI+trm2xwQueFs -VaWrjc3cVvXx0b8Lu7hqZUv08kgC38stzuRk/n2T5VWSAr7Z4ZWQbO918Dv35HUw -Wy/0jNUFP9CBCvFJ4l0OoH9nYhWFG+HXWzNdw6/Hca4jciRKo6esCiOZ9uWYv/ec -/NvX9rgFg8G8/SrTisX10+Bbeq+R1RKwq/IG409TH4ECgYEA14L+3QsgNIUMeYAx -jSCyk22R/tOHI1BM+GtKPUhnwHlAssrcPcxXMJovl6WL93VauYjym0wpCz9urSpA -I2CqTsG8GYciA6Dr3mHgD6cK0jj9UPAU6EnZ5S0mjhPqKZqutu9QegzD2uESvuN8 -36xezwQthzAf0nI/P3sJGjVXjikCgYEA1POm5xcV6SmM6HnIdadEebhzZIJ9TXQz -ry3Jj3a7CKyD5C7fAdkHUTCjgT/2ElxPi9ABkZnC+d/cW9GtJFa0II5qO/agm3KQ -ZXYiutu9A7xACHYFXRiJEjVUdGG9dKMVOHUEa8IHEgrrcUVM/suy/GgutywIfaXs -y58IFP24K9MCgYEAk6zjz7wL+XEiNy+sxLQfKf7vB9sSwxQHakK6wHuY/L8Zomp3 -uLEJHfjJm/SIkK0N2g0JkXkCtv5kbKyC/rsCeK0wo52BpVLjzaLr0k34kE0U6B1b -dkEE2pGx1bG3x4KDLj+Wuct9ecK5Aa0IqIyI+vo16GkFpUM8K9e3SQo8UOECgYEA -sCZYAkILYtJ293p9giz5rIISGasDAUXE1vxWBXEeJ3+kneTTnZCrx9Im/ewtnWR0 -fF90XL9HFDDD88POqAd8eo2zfKR2l/89SGBfPBg2EtfuU9FkgGyiPciVcqvC7q9U -B15saMKX3KnhtdGwbfeLt9RqCCTJZT4SUSDcq5hwdvcCgYAxY4Be8mNipj8Cgg22 -mVWSolA0TEzbtUcNk6iGodpi+Z0LKpsPC0YRqPRyh1K+rIltG1BVdmUBHcMlOYxl -lWWvbJH6PkJWy4n2MF7PO45kjN3pPZg4hgH63JjZeAineBwEArUGb9zHnvzcdRvF -wuQ2pZHL/HJ0laUSieHDJ5917w== ------END PRIVATE KEY----- - - ------BEGIN CERTIFICATE----- -MIIDjTCCAnWgAwIBAgIURt5IBx0J3xgEaQvmyrFH2A+NkpMwDQYJKoZIhvcNAQEL -BQAwVjELMAkGA1UEBhMCUlUxDzANBgNVBAgMBk1vc2NvdzEPMA0GA1UEBwwGTW9z -Y293MQ8wDQYDVQQKDAZZYW5kZXgxFDASBgNVBAMMC3Rlc3Qtc2VydmVyMB4XDTE5 -MDkyMDE3MTQ0MVoXDTQ3MDIwNDE3MTQ0MVowVjELMAkGA1UEBhMCUlUxDzANBgNV -BAgMBk1vc2NvdzEPMA0GA1UEBwwGTW9zY293MQ8wDQYDVQQKDAZZYW5kZXgxFDAS -BgNVBAMMC3Rlc3Qtc2VydmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC -AQEAs0WY6HTuwKntcEcjo+pBuoNp5/GRgMX2qOJi09Iw021ZLK4Vf4drN7pXS5Ba -OVqzUPFmXvoiG13hS7PLTuobJc63qPbIodiB6EXB+Sp0v+mE6lYUUyW9YxNnTPDc -GG8E4vk9j3tBawT4yJIFTudIALWJfQvn3O9ebmYkilvq0ZT+TqBU8Mazo4lNu0T2 -YxWMlivcEyNRLPbka5W2Wy5eXGOnStidQFYka2mmCgljtulWzj1i7GODg93vmVyH -NzjAs+mG9MJkT3ietG225BnyPDtu5A3b+vTAFhyJtMmDMyhJ6JtXXHu6zUDQxKiX -6HLGCLIPhL2sk9ckPSkwXoMOywIDAQABo1MwUTAdBgNVHQ4EFgQUDv/xuJ4CvCgG -fPrZP3hRAt2+/LwwHwYDVR0jBBgwFoAUDv/xuJ4CvCgGfPrZP3hRAt2+/LwwDwYD -VR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAinKpMYaA2tjLpAnPVbjy -/ZxSBhhB26RiQp3Re8XOKyhTWqgYE6kldYT0aXgK9x9mPC5obQannDDYxDc7lX+/ -qP/u1X81ZcDRo/f+qQ3iHfT6Ftt/4O3qLnt45MFM6Q7WabRm82x3KjZTqpF3QUdy -tumWiuAP5DMd1IRDtnKjFHO721OsEsf6NLcqdX89bGeqXDvrkwg3/PNwTyW5E7cj -feY8L2eWtg6AJUnIBu11wvfzkLiH3QKzHvO/SIZTGf5ihDsJ3aKEE9UNauTL3bVc -CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V -6g== ------END CERTIFICATE-----)___"; - - TTempFileHandle certificateFile; - - certificateFile.Write(certificateContent.data(), certificateContent.size()); - - NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); + + TString certificateContent = R"___(-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCzRZjodO7Aqe1w +RyOj6kG6g2nn8ZGAxfao4mLT0jDTbVksrhV/h2s3uldLkFo5WrNQ8WZe+iIbXeFL +s8tO6hslzreo9sih2IHoRcH5KnS/6YTqVhRTJb1jE2dM8NwYbwTi+T2Pe0FrBPjI +kgVO50gAtYl9C+fc715uZiSKW+rRlP5OoFTwxrOjiU27RPZjFYyWK9wTI1Es9uRr +lbZbLl5cY6dK2J1AViRraaYKCWO26VbOPWLsY4OD3e+ZXIc3OMCz6Yb0wmRPeJ60 +bbbkGfI8O27kDdv69MAWHIm0yYMzKEnom1dce7rNQNDEqJfocsYIsg+EvayT1yQ9 +KTBegw7LAgMBAAECggEBAKaOCrotqYQmXArsjRhFFDwMy+BKdzyEr93INrlFl0dX +WHpCYobRcbOc1G3H94tB0UdqgAnNqtJyLlb+++ydZAuEOu4oGc8EL+10ofq0jzOd +6Xct8kQt0/6wkFDTlii9PHUDy0X65ZRgUiNGRtg/2I2QG+SpowmI+trm2xwQueFs +VaWrjc3cVvXx0b8Lu7hqZUv08kgC38stzuRk/n2T5VWSAr7Z4ZWQbO918Dv35HUw +Wy/0jNUFP9CBCvFJ4l0OoH9nYhWFG+HXWzNdw6/Hca4jciRKo6esCiOZ9uWYv/ec +/NvX9rgFg8G8/SrTisX10+Bbeq+R1RKwq/IG409TH4ECgYEA14L+3QsgNIUMeYAx +jSCyk22R/tOHI1BM+GtKPUhnwHlAssrcPcxXMJovl6WL93VauYjym0wpCz9urSpA +I2CqTsG8GYciA6Dr3mHgD6cK0jj9UPAU6EnZ5S0mjhPqKZqutu9QegzD2uESvuN8 +36xezwQthzAf0nI/P3sJGjVXjikCgYEA1POm5xcV6SmM6HnIdadEebhzZIJ9TXQz +ry3Jj3a7CKyD5C7fAdkHUTCjgT/2ElxPi9ABkZnC+d/cW9GtJFa0II5qO/agm3KQ +ZXYiutu9A7xACHYFXRiJEjVUdGG9dKMVOHUEa8IHEgrrcUVM/suy/GgutywIfaXs +y58IFP24K9MCgYEAk6zjz7wL+XEiNy+sxLQfKf7vB9sSwxQHakK6wHuY/L8Zomp3 +uLEJHfjJm/SIkK0N2g0JkXkCtv5kbKyC/rsCeK0wo52BpVLjzaLr0k34kE0U6B1b +dkEE2pGx1bG3x4KDLj+Wuct9ecK5Aa0IqIyI+vo16GkFpUM8K9e3SQo8UOECgYEA +sCZYAkILYtJ293p9giz5rIISGasDAUXE1vxWBXEeJ3+kneTTnZCrx9Im/ewtnWR0 +fF90XL9HFDDD88POqAd8eo2zfKR2l/89SGBfPBg2EtfuU9FkgGyiPciVcqvC7q9U +B15saMKX3KnhtdGwbfeLt9RqCCTJZT4SUSDcq5hwdvcCgYAxY4Be8mNipj8Cgg22 +mVWSolA0TEzbtUcNk6iGodpi+Z0LKpsPC0YRqPRyh1K+rIltG1BVdmUBHcMlOYxl +lWWvbJH6PkJWy4n2MF7PO45kjN3pPZg4hgH63JjZeAineBwEArUGb9zHnvzcdRvF +wuQ2pZHL/HJ0laUSieHDJ5917w== +-----END PRIVATE KEY----- + + +-----BEGIN CERTIFICATE----- +MIIDjTCCAnWgAwIBAgIURt5IBx0J3xgEaQvmyrFH2A+NkpMwDQYJKoZIhvcNAQEL +BQAwVjELMAkGA1UEBhMCUlUxDzANBgNVBAgMBk1vc2NvdzEPMA0GA1UEBwwGTW9z +Y293MQ8wDQYDVQQKDAZZYW5kZXgxFDASBgNVBAMMC3Rlc3Qtc2VydmVyMB4XDTE5 +MDkyMDE3MTQ0MVoXDTQ3MDIwNDE3MTQ0MVowVjELMAkGA1UEBhMCUlUxDzANBgNV +BAgMBk1vc2NvdzEPMA0GA1UEBwwGTW9zY293MQ8wDQYDVQQKDAZZYW5kZXgxFDAS +BgNVBAMMC3Rlc3Qtc2VydmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC +AQEAs0WY6HTuwKntcEcjo+pBuoNp5/GRgMX2qOJi09Iw021ZLK4Vf4drN7pXS5Ba +OVqzUPFmXvoiG13hS7PLTuobJc63qPbIodiB6EXB+Sp0v+mE6lYUUyW9YxNnTPDc +GG8E4vk9j3tBawT4yJIFTudIALWJfQvn3O9ebmYkilvq0ZT+TqBU8Mazo4lNu0T2 +YxWMlivcEyNRLPbka5W2Wy5eXGOnStidQFYka2mmCgljtulWzj1i7GODg93vmVyH +NzjAs+mG9MJkT3ietG225BnyPDtu5A3b+vTAFhyJtMmDMyhJ6JtXXHu6zUDQxKiX +6HLGCLIPhL2sk9ckPSkwXoMOywIDAQABo1MwUTAdBgNVHQ4EFgQUDv/xuJ4CvCgG +fPrZP3hRAt2+/LwwHwYDVR0jBBgwFoAUDv/xuJ4CvCgGfPrZP3hRAt2+/LwwDwYD +VR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAinKpMYaA2tjLpAnPVbjy +/ZxSBhhB26RiQp3Re8XOKyhTWqgYE6kldYT0aXgK9x9mPC5obQannDDYxDc7lX+/ +qP/u1X81ZcDRo/f+qQ3iHfT6Ftt/4O3qLnt45MFM6Q7WabRm82x3KjZTqpF3QUdy +tumWiuAP5DMd1IRDtnKjFHO721OsEsf6NLcqdX89bGeqXDvrkwg3/PNwTyW5E7cj +feY8L2eWtg6AJUnIBu11wvfzkLiH3QKzHvO/SIZTGf5ihDsJ3aKEE9UNauTL3bVc +CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V +6g== +-----END CERTIFICATE-----)___"; + + TTempFileHandle certificateFile; + + certificateFile.Write(certificateContent.data(), certificateContent.size()); + + NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); NActors::TActorId proxyId = actorSystem.Register(proxy); - + THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> add = MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(port); - ///////// https configuration - add->Secure = true; - add->CertificateFile = certificateFile.Name(); - add->PrivateKeyFile = certificateFile.Name(); - ///////// + ///////// https configuration + add->Secure = true; + add->CertificateFile = certificateFile.Name(); + add->PrivateKeyFile = certificateFile.Name(); + ///////// actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), add.Release()), 0, true); - actorSystem.DispatchEvents(); - + actorSystem.DispatchEvents(); + NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); - actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); - + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); + NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); - NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("https://[::1]:" + ToString(port) + "/test"); - actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); - - NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); - - UNIT_ASSERT_EQUAL(request->Request->URL, "/test"); - - NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n"); - actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); - - NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); - - UNIT_ASSERT_EQUAL(response->Response->Status, "200"); - UNIT_ASSERT_EQUAL(response->Response->Body, "passed"); - } - + NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("https://[::1]:" + ToString(port) + "/test"); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); + + UNIT_ASSERT_EQUAL(request->Request->URL, "/test"); + + NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n"); + actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); + + UNIT_ASSERT_EQUAL(response->Response->Status, "200"); + UNIT_ASSERT_EQUAL(response->Response->Body, "passed"); + } + /*Y_UNIT_TEST(AdvancedRunning) { THolder<NActors::TActorSystemSetup> setup = MakeHolder<NActors::TActorSystemSetup>(); - setup->NodeId = 1; - setup->ExecutorsCount = 1; - setup->Executors = new TAutoPtr<NActors::IExecutorPool>[1]; - setup->Executors[0] = new NActors::TBasicExecutorPool(0, 2, 10); - setup->Scheduler = new NActors::TBasicSchedulerThread(NActors::TSchedulerConfig(512, 100)); - NActors::TActorSystem actorSystem(setup); - actorSystem.Start(); - NHttp::THttpProxy* incomingProxy = new NHttp::THttpProxy(); + setup->NodeId = 1; + setup->ExecutorsCount = 1; + setup->Executors = new TAutoPtr<NActors::IExecutorPool>[1]; + setup->Executors[0] = new NActors::TBasicExecutorPool(0, 2, 10); + setup->Scheduler = new NActors::TBasicSchedulerThread(NActors::TSchedulerConfig(512, 100)); + NActors::TActorSystem actorSystem(setup); + actorSystem.Start(); + NHttp::THttpProxy* incomingProxy = new NHttp::THttpProxy(); NActors::TActorId incomingProxyId = actorSystem.Register(incomingProxy); - actorSystem.Send(incomingProxyId, new NHttp::TEvHttpProxy::TEvAddListeningPort(13337)); - - NHttp::THttpProxy* outgoingProxy = new NHttp::THttpProxy(); + actorSystem.Send(incomingProxyId, new NHttp::TEvHttpProxy::TEvAddListeningPort(13337)); + + NHttp::THttpProxy* outgoingProxy = new NHttp::THttpProxy(); NActors::TActorId outgoingProxyId = actorSystem.Register(outgoingProxy); - + THolder<NHttp::THttpStaticStringRequest> httpRequest = MakeHolder<NHttp::THttpStaticStringRequest>("GET /test HTTP/1.1\r\n\r\n"); - actorSystem.Send(outgoingProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest("[::]:13337", std::move(httpRequest))); - - Sleep(TDuration::Minutes(60)); - }*/ - - Y_UNIT_TEST(TooLongHeader) { - NActors::TTestActorRuntimeBase actorSystem; - TPortManager portManager; - TIpPort port = portManager.GetTcpPort(); - TAutoPtr<NActors::IEventHandle> handle; - actorSystem.Initialize(); - NMonitoring::TMetricRegistry sensors; - - NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); - NActors::TActorId proxyId = actorSystem.Register(proxy); - actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); - actorSystem.DispatchEvents(); - - NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); - actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); - - NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); - NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test"); - httpRequest->Set("Connection", "close"); - TString longHeader; - longHeader.append(9000, 'X'); - httpRequest->Set(longHeader, "data"); - actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); - - NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); - - UNIT_ASSERT_EQUAL(response->Response->Status, "400"); - UNIT_ASSERT_EQUAL(response->Response->Body, "Invalid http header"); - } -} + actorSystem.Send(outgoingProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest("[::]:13337", std::move(httpRequest))); + + Sleep(TDuration::Minutes(60)); + }*/ + + Y_UNIT_TEST(TooLongHeader) { + NActors::TTestActorRuntimeBase actorSystem; + TPortManager portManager; + TIpPort port = portManager.GetTcpPort(); + TAutoPtr<NActors::IEventHandle> handle; + actorSystem.Initialize(); + NMonitoring::TMetricRegistry sensors; + + NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors); + NActors::TActorId proxyId = actorSystem.Register(proxy); + actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); + actorSystem.DispatchEvents(); + + NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true); + + NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); + NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test"); + httpRequest->Set("Connection", "close"); + TString longHeader; + longHeader.append(9000, 'X'); + httpRequest->Set(longHeader, "data"); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); + + UNIT_ASSERT_EQUAL(response->Response->Status, "400"); + UNIT_ASSERT_EQUAL(response->Response->Body, "Invalid http header"); + } +} diff --git a/library/cpp/actors/http/ut/ya.make b/library/cpp/actors/http/ut/ya.make index 8b4c04c4d3..12d360dabf 100644 --- a/library/cpp/actors/http/ut/ya.make +++ b/library/cpp/actors/http/ut/ya.make @@ -1,18 +1,18 @@ UNITTEST_FOR(library/cpp/actors/http) - -OWNER(xenoxeno) - -SIZE(SMALL) - -PEERDIR( + +OWNER(xenoxeno) + +SIZE(SMALL) + +PEERDIR( library/cpp/actors/testlib -) - +) + IF (NOT OS_WINDOWS) -SRCS( - http_ut.cpp -) +SRCS( + http_ut.cpp +) ELSE() ENDIF() - -END() + +END() diff --git a/library/cpp/actors/http/ya.make b/library/cpp/actors/http/ya.make index 7ce68b7a75..60c9c93a09 100644 --- a/library/cpp/actors/http/ya.make +++ b/library/cpp/actors/http/ya.make @@ -1,33 +1,33 @@ -RECURSE_FOR_TESTS(ut) - -LIBRARY() - -OWNER(xenoxeno g:kikimr) - -SRCS( - http_cache.cpp - http_cache.h - http_config.h - http_proxy_acceptor.cpp - http_proxy_incoming.cpp - http_proxy_outgoing.cpp - http_proxy_sock_impl.h - http_proxy_ssl.h - http_proxy.cpp - http_proxy.h - http_static.cpp - http_static.h - http.cpp - http.h -) - -PEERDIR( - contrib/libs/openssl +RECURSE_FOR_TESTS(ut) + +LIBRARY() + +OWNER(xenoxeno g:kikimr) + +SRCS( + http_cache.cpp + http_cache.h + http_config.h + http_proxy_acceptor.cpp + http_proxy_incoming.cpp + http_proxy_outgoing.cpp + http_proxy_sock_impl.h + http_proxy_ssl.h + http_proxy.cpp + http_proxy.h + http_static.cpp + http_static.h + http.cpp + http.h +) + +PEERDIR( + contrib/libs/openssl library/cpp/actors/core library/cpp/actors/interconnect library/cpp/dns library/cpp/monlib/metrics library/cpp/string_utils/quote -) - -END() +) + +END() |