aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/http
diff options
context:
space:
mode:
authorAlexey Efimov <xeno@prnwatch.com>2022-02-10 16:49:41 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:41 +0300
commit26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946 (patch)
treed34555f21d4d9f94f84d460e55b77d7eb41a953c /library/cpp/actors/http
parentca3252a147a429eac4ba8221857493c58dcd09b5 (diff)
downloadydb-26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946.tar.gz
Restoring authorship annotation for Alexey Efimov <xeno@prnwatch.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/http')
-rw-r--r--library/cpp/actors/http/http.cpp1188
-rw-r--r--library/cpp/actors/http/http.h1394
-rw-r--r--library/cpp/actors/http/http_cache.cpp1178
-rw-r--r--library/cpp/actors/http/http_cache.h50
-rw-r--r--library/cpp/actors/http/http_config.h34
-rw-r--r--library/cpp/actors/http/http_proxy.cpp564
-rw-r--r--library/cpp/actors/http/http_proxy.h424
-rw-r--r--library/cpp/actors/http/http_proxy_acceptor.cpp210
-rw-r--r--library/cpp/actors/http/http_proxy_incoming.cpp376
-rw-r--r--library/cpp/actors/http/http_proxy_outgoing.cpp450
-rw-r--r--library/cpp/actors/http/http_proxy_sock_impl.h472
-rw-r--r--library/cpp/actors/http/http_proxy_ssl.h148
-rw-r--r--library/cpp/actors/http/http_static.cpp176
-rw-r--r--library/cpp/actors/http/http_static.h16
-rw-r--r--library/cpp/actors/http/http_ut.cpp632
-rw-r--r--library/cpp/actors/http/ut/ya.make26
-rw-r--r--library/cpp/actors/http/ya.make56
17 files changed, 3697 insertions, 3697 deletions
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp
index 7125f9d8b0..90fdd161ed 100644
--- a/library/cpp/actors/http/http.cpp
+++ b/library/cpp/actors/http/http.cpp
@@ -1,116 +1,116 @@
-#include "http.h"
+#include "http.h"
#include <library/cpp/string_utils/quote/quote.h>
-
-inline TStringBuf operator +(TStringBuf l, TStringBuf r) {
- if (l.empty()) {
- return r;
- }
- if (r.empty()) {
- return l;
- }
- if (l.end() == r.begin()) {
- return TStringBuf(l.data(), l.size() + r.size());
- }
- if (r.end() == l.begin()) {
- return TStringBuf(r.data(), l.size() + r.size());
- }
- Y_FAIL("oops");
- return TStringBuf();
-}
-
-inline TStringBuf operator +=(TStringBuf& l, TStringBuf r) {
- return l = l + r;
-}
-
-namespace NHttp {
-
-template <> TStringBuf THttpRequest::GetName<&THttpRequest::Host>() { return "Host"; }
-template <> TStringBuf THttpRequest::GetName<&THttpRequest::Accept>() { return "Accept"; }
-template <> TStringBuf THttpRequest::GetName<&THttpRequest::Connection>() { return "Connection"; }
-template <> TStringBuf THttpRequest::GetName<&THttpRequest::ContentType>() { return "Content-Type"; }
-template <> TStringBuf THttpRequest::GetName<&THttpRequest::ContentLength>() { return "Content-Length"; }
-template <> TStringBuf THttpRequest::GetName<&THttpRequest::TransferEncoding>() { return "Transfer-Encoding"; }
-
-const TMap<TStringBuf, TStringBuf THttpRequest::*, TLessNoCase> THttpRequest::HeadersLocation = {
- { THttpRequest::GetName<&THttpRequest::Host>(), &THttpRequest::Host },
- { THttpRequest::GetName<&THttpRequest::Accept>(), &THttpRequest::Accept },
- { THttpRequest::GetName<&THttpRequest::Connection>(), &THttpRequest::Connection },
- { THttpRequest::GetName<&THttpRequest::ContentType>(), &THttpRequest::ContentType },
- { THttpRequest::GetName<&THttpRequest::ContentLength>(), &THttpRequest::ContentLength },
- { THttpRequest::GetName<&THttpRequest::TransferEncoding>(), &THttpRequest::TransferEncoding },
-};
-
-template <> TStringBuf THttpResponse::GetName<&THttpResponse::Connection>() { return "Connection"; }
-template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentType>() { return "Content-Type"; }
-template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentLength>() { return "Content-Length"; }
-template <> TStringBuf THttpResponse::GetName<&THttpResponse::TransferEncoding>() { return "Transfer-Encoding"; }
-template <> TStringBuf THttpResponse::GetName<&THttpResponse::LastModified>() { return "Last-Modified"; }
+
+inline TStringBuf operator +(TStringBuf l, TStringBuf r) {
+ if (l.empty()) {
+ return r;
+ }
+ if (r.empty()) {
+ return l;
+ }
+ if (l.end() == r.begin()) {
+ return TStringBuf(l.data(), l.size() + r.size());
+ }
+ if (r.end() == l.begin()) {
+ return TStringBuf(r.data(), l.size() + r.size());
+ }
+ Y_FAIL("oops");
+ return TStringBuf();
+}
+
+inline TStringBuf operator +=(TStringBuf& l, TStringBuf r) {
+ return l = l + r;
+}
+
+namespace NHttp {
+
+template <> TStringBuf THttpRequest::GetName<&THttpRequest::Host>() { return "Host"; }
+template <> TStringBuf THttpRequest::GetName<&THttpRequest::Accept>() { return "Accept"; }
+template <> TStringBuf THttpRequest::GetName<&THttpRequest::Connection>() { return "Connection"; }
+template <> TStringBuf THttpRequest::GetName<&THttpRequest::ContentType>() { return "Content-Type"; }
+template <> TStringBuf THttpRequest::GetName<&THttpRequest::ContentLength>() { return "Content-Length"; }
+template <> TStringBuf THttpRequest::GetName<&THttpRequest::TransferEncoding>() { return "Transfer-Encoding"; }
+
+const TMap<TStringBuf, TStringBuf THttpRequest::*, TLessNoCase> THttpRequest::HeadersLocation = {
+ { THttpRequest::GetName<&THttpRequest::Host>(), &THttpRequest::Host },
+ { THttpRequest::GetName<&THttpRequest::Accept>(), &THttpRequest::Accept },
+ { THttpRequest::GetName<&THttpRequest::Connection>(), &THttpRequest::Connection },
+ { THttpRequest::GetName<&THttpRequest::ContentType>(), &THttpRequest::ContentType },
+ { THttpRequest::GetName<&THttpRequest::ContentLength>(), &THttpRequest::ContentLength },
+ { THttpRequest::GetName<&THttpRequest::TransferEncoding>(), &THttpRequest::TransferEncoding },
+};
+
+template <> TStringBuf THttpResponse::GetName<&THttpResponse::Connection>() { return "Connection"; }
+template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentType>() { return "Content-Type"; }
+template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentLength>() { return "Content-Length"; }
+template <> TStringBuf THttpResponse::GetName<&THttpResponse::TransferEncoding>() { return "Transfer-Encoding"; }
+template <> TStringBuf THttpResponse::GetName<&THttpResponse::LastModified>() { return "Last-Modified"; }
template <> TStringBuf THttpResponse::GetName<&THttpResponse::ContentEncoding>() { return "Content-Encoding"; }
-
-const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> THttpResponse::HeadersLocation = {
- { THttpResponse::GetName<&THttpResponse::Connection>(), &THttpResponse::Connection },
- { THttpResponse::GetName<&THttpResponse::ContentType>(), &THttpResponse::ContentType },
- { THttpResponse::GetName<&THttpResponse::ContentLength>(), &THttpResponse::ContentLength },
- { THttpResponse::GetName<&THttpResponse::TransferEncoding>(), &THttpResponse::TransferEncoding },
- { THttpResponse::GetName<&THttpResponse::LastModified>(), &THttpResponse::LastModified },
+
+const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> THttpResponse::HeadersLocation = {
+ { THttpResponse::GetName<&THttpResponse::Connection>(), &THttpResponse::Connection },
+ { THttpResponse::GetName<&THttpResponse::ContentType>(), &THttpResponse::ContentType },
+ { THttpResponse::GetName<&THttpResponse::ContentLength>(), &THttpResponse::ContentLength },
+ { THttpResponse::GetName<&THttpResponse::TransferEncoding>(), &THttpResponse::TransferEncoding },
+ { THttpResponse::GetName<&THttpResponse::LastModified>(), &THttpResponse::LastModified },
{ THttpResponse::GetName<&THttpResponse::ContentEncoding>(), &THttpResponse::ContentEncoding }
-};
-
-void THttpRequest::Clear() {
- // a dirty little trick
- this->~THttpRequest(); // basically, do nothing
- new (this) THttpRequest(); // reset all fields
-}
-
-template <>
-void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
- TStringBuf data(Pos(), len);
- while (!data.empty()) {
- if (Stage != EParseStage::Error) {
- LastSuccessStage = Stage;
- }
- switch (Stage) {
- case EParseStage::Method: {
- if (ProcessData(Method, data, ' ', MaxMethodSize)) {
- Stage = EParseStage::URL;
- }
- break;
- }
- case EParseStage::URL: {
- if (ProcessData(URL, data, ' ', MaxURLSize)) {
- Stage = EParseStage::Protocol;
- }
- break;
- }
- case EParseStage::Protocol: {
- if (ProcessData(Protocol, data, '/', MaxProtocolSize)) {
- Stage = EParseStage::Version;
- }
- break;
- }
- case EParseStage::Version: {
- if (ProcessData(Version, data, "\r\n", MaxVersionSize)) {
- Stage = EParseStage::Header;
- Headers = data;
- }
- break;
- }
- case EParseStage::Header: {
- if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) {
- if (Header.empty()) {
- Headers = TStringBuf(Headers.data(), data.begin() - Headers.begin());
- if (HaveBody()) {
- Stage = EParseStage::Body;
- } else {
- Stage = EParseStage::Done;
- }
- } else {
- ProcessHeader(Header);
- }
- }
- break;
- }
- case EParseStage::Body: {
+};
+
+void THttpRequest::Clear() {
+ // a dirty little trick
+ this->~THttpRequest(); // basically, do nothing
+ new (this) THttpRequest(); // reset all fields
+}
+
+template <>
+void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
+ TStringBuf data(Pos(), len);
+ while (!data.empty()) {
+ if (Stage != EParseStage::Error) {
+ LastSuccessStage = Stage;
+ }
+ switch (Stage) {
+ case EParseStage::Method: {
+ if (ProcessData(Method, data, ' ', MaxMethodSize)) {
+ Stage = EParseStage::URL;
+ }
+ break;
+ }
+ case EParseStage::URL: {
+ if (ProcessData(URL, data, ' ', MaxURLSize)) {
+ Stage = EParseStage::Protocol;
+ }
+ break;
+ }
+ case EParseStage::Protocol: {
+ if (ProcessData(Protocol, data, '/', MaxProtocolSize)) {
+ Stage = EParseStage::Version;
+ }
+ break;
+ }
+ case EParseStage::Version: {
+ if (ProcessData(Version, data, "\r\n", MaxVersionSize)) {
+ Stage = EParseStage::Header;
+ Headers = data;
+ }
+ break;
+ }
+ case EParseStage::Header: {
+ if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) {
+ if (Header.empty()) {
+ Headers = TStringBuf(Headers.data(), data.begin() - Headers.begin());
+ if (HaveBody()) {
+ Stage = EParseStage::Body;
+ } else {
+ Stage = EParseStage::Done;
+ }
+ } else {
+ ProcessHeader(Header);
+ }
+ }
+ break;
+ }
+ case EParseStage::Body: {
if (!ContentLength.empty()) {
if (ProcessData(Content, data, FromString(ContentLength))) {
Body = Content;
@@ -121,9 +121,9 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
} else {
// Invalid body encoding
Stage = EParseStage::Error;
- }
- break;
- }
+ }
+ break;
+ }
case EParseStage::ChunkLength: {
if (ProcessData(Line, data, "\r\n", MaxChunkLengthSize)) {
if (!Line.empty()) {
@@ -170,484 +170,484 @@ void THttpParser<THttpRequest, TSocketBuffer>::Advance(size_t len) {
break;
}
- case EParseStage::Done:
- case EParseStage::Error: {
- data.Clear();
- break;
- }
- default:
- Y_FAIL("Invalid processing sequence");
- break;
- }
- }
- TSocketBuffer::Advance(len);
-}
-
-template <>
-THttpParser<THttpRequest, TSocketBuffer>::EParseStage THttpParser<THttpRequest, TSocketBuffer>::GetInitialStage() {
- return EParseStage::Method;
-}
-
-template <>
-THttpParser<THttpResponse, TSocketBuffer>::EParseStage THttpParser<THttpResponse, TSocketBuffer>::GetInitialStage() {
- return EParseStage::Protocol;
-}
-
-void THttpResponse::Clear() {
- // a dirty little trick
- this->~THttpResponse(); // basically, do nothing
- new (this) THttpResponse(); // reset all fields
-}
-
-template <>
-void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
- TStringBuf data(Pos(), len);
- while (!data.empty()) {
- if (Stage != EParseStage::Error) {
- LastSuccessStage = Stage;
- }
- switch (Stage) {
- case EParseStage::Protocol: {
- if (ProcessData(Protocol, data, '/', MaxProtocolSize)) {
- Stage = EParseStage::Version;
- }
- break;
- }
- case EParseStage::Version: {
- if (ProcessData(Version, data, ' ', MaxVersionSize)) {
- Stage = EParseStage::Status;
- }
- break;
- }
- case EParseStage::Status: {
- if (ProcessData(Status, data, ' ', MaxStatusSize)) {
- Stage = EParseStage::Message;
- }
- break;
- }
- case EParseStage::Message: {
- if (ProcessData(Message, data, "\r\n", MaxMessageSize)) {
- Stage = EParseStage::Header;
- Headers = TStringBuf(data.data(), size_t(0));
- }
- break;
- }
- case EParseStage::Header: {
- if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) {
- if (Header.empty()) {
- if (HaveBody() && (ContentLength.empty() || ContentLength != "0")) {
- Stage = EParseStage::Body;
- } else {
- Stage = EParseStage::Done;
- }
- } else {
- ProcessHeader(Header);
- }
- Headers = TStringBuf(Headers.data(), data.data() - Headers.data());
- }
- break;
- }
- case EParseStage::Body: {
- if (!ContentLength.empty()) {
- if (ProcessData(Body, data, FromString(ContentLength))) {
- Stage = EParseStage::Done;
- }
- } else if (TransferEncoding == "chunked") {
- Stage = EParseStage::ChunkLength;
- } else {
- // Invalid body encoding
- Stage = EParseStage::Error;
- }
- break;
- }
- case EParseStage::ChunkLength: {
- if (ProcessData(Line, data, "\r\n", MaxChunkLengthSize)) {
- if (!Line.empty()) {
- ChunkLength = ParseHex(Line);
- if (ChunkLength <= MaxChunkSize) {
- ContentSize = Content.size() + ChunkLength;
- if (ContentSize <= MaxChunkContentSize) {
- Stage = EParseStage::ChunkData;
- Line.Clear();
- } else {
- // Invalid chunk content length
- Stage = EParseStage::Error;
- }
- } else {
- // Invalid chunk length
- Stage = EParseStage::Error;
- }
- } else {
- // Invalid body encoding
- Stage = EParseStage::Error;
- }
- }
- break;
- }
- case EParseStage::ChunkData: {
- if (!IsError()) {
- if (ProcessData(Content, data, ContentSize)) {
- if (ProcessData(Line, data, 2)) {
- if (Line == "\r\n") {
- if (ChunkLength == 0) {
- Body = Content;
- Stage = EParseStage::Done;
- } else {
- Stage = EParseStage::ChunkLength;
- }
- Line.Clear();
- } else {
- // Invalid body encoding
- Stage = EParseStage::Error;
- }
- }
- }
- }
- break;
- }
- case EParseStage::Done:
- case EParseStage::Error:
- data.Clear();
- break;
- default:
- // Invalid processing sequence
- Stage = EParseStage::Error;
- break;
- }
- }
- TSocketBuffer::Advance(len);
-}
-
-template <>
-void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() {
- if (Stage == EParseStage::Done) {
- return;
- }
- if (Stage == EParseStage::Body) {
- // ?
- Stage = EParseStage::Done;
- } else {
- LastSuccessStage = Stage;
- Stage = EParseStage::Error;
- }
-}
-
-THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) {
- THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this);
- response->Append(data);
- response->Reparse();
- return response;
-}
-
-THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseOK(TStringBuf body, TStringBuf contentType, TInstant lastModified) {
- return CreateResponse("200", "OK", contentType, body, lastModified);
-}
-
-THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseBadRequest(TStringBuf html, TStringBuf contentType) {
- if (html.empty() && IsError()) {
- contentType = "text/plain";
- html = GetErrorText();
- }
- return CreateResponse("400", "Bad Request", contentType, html);
-}
-
-THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseNotFound(TStringBuf html, TStringBuf contentType) {
- return CreateResponse("404", "Not Found", contentType, html);
-}
-
-THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseServiceUnavailable(TStringBuf html, TStringBuf contentType) {
- return CreateResponse("503", "Service Unavailable", contentType, html);
-}
-
-THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseGatewayTimeout(TStringBuf html, TStringBuf contentType) {
- return CreateResponse("504", "Gateway Timeout", contentType, html);
-}
-
-THttpIncomingResponse::THttpIncomingResponse(THttpOutgoingRequestPtr request)
- : Request(request)
-{}
-
-THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponse(TStringBuf status, TStringBuf message, TStringBuf contentType, TStringBuf body, TInstant lastModified) {
- TStringBuf version = Version;
- if (version != "1.0" && version != "1.1") {
- version = "1.1";
- }
- THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this, "HTTP", version, status, message);
- response->Set<&THttpResponse::Connection>(GetConnection());
- if (!WorkerName.empty()) {
- response->Set("X-Worker-Name", WorkerName);
- }
- if (!contentType.empty() && !body.empty()) {
- response->Set<&THttpResponse::ContentType>(contentType);
- }
- if (lastModified) {
- response->Set<&THttpResponse::LastModified>(lastModified.FormatGmTime("%a, %d %b %Y %H:%M:%S GMT"));
- }
- if (response->IsNeedBody() || !body.empty()) {
- if (Method == "HEAD") {
- response->Set<&THttpResponse::ContentLength>(ToString(body.size()));
- } else {
- response->Set<&THttpResponse::Body>(body);
- }
- }
- return response;
-}
-
-THttpIncomingRequestPtr THttpIncomingRequest::Duplicate() {
- THttpIncomingRequestPtr request = new THttpIncomingRequest(*this);
- request->Reparse();
- request->Timer.Reset();
- return request;
-}
-
-THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPtr request) {
- THttpIncomingResponsePtr response = new THttpIncomingResponse(*this);
- response->Reparse();
- response->Request = request;
- return response;
-}
-
-THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) {
- THttpOutgoingResponsePtr response = new THttpOutgoingResponse(*this);
- response->Reparse();
- response->Request = request;
- return response;
-}
-
-
-THttpOutgoingResponsePtr THttpIncomingResponse::Reverse(THttpIncomingRequestPtr request) {
- THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request);
- response->Assign(Data(), Size());
- response->Reparse();
- return response;
-}
-
-THttpOutgoingRequest::THttpOutgoingRequest(TStringBuf method, TStringBuf scheme, TStringBuf host, TStringBuf uri, TStringBuf protocol, TStringBuf version) {
- Secure = (scheme == "https");
- TString urie = UrlEscapeRet(uri);
- InitRequest(method, urie, protocol, version);
- if (host) {
- Set<&THttpRequest::Host>(host);
- }
-}
-
-THttpOutgoingRequest::THttpOutgoingRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) {
- TStringBuf scheme, host, uri;
- if (!CrackURL(url, scheme, host, uri)) {
- Y_FAIL("Invalid URL specified");
- }
- if (!scheme.empty() && scheme != "http" && scheme != "https") {
- Y_FAIL("Invalid URL specified");
- }
- Secure = (scheme == "https");
- TString urie = UrlEscapeRet(uri);
- InitRequest(method, urie, protocol, version);
- if (host) {
- Set<&THttpRequest::Host>(host);
- }
-}
-
-THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestString(const TString& data) {
- THttpOutgoingRequestPtr request = new THttpOutgoingRequest();
- request->Assign(data.data(), data.size());
- request->Reparse();
- return request;
-}
-
-THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestGet(TStringBuf url) {
- return CreateRequest("GET", url);
-}
-
-THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestGet(TStringBuf host, TStringBuf uri) {
- return CreateHttpRequest("GET", host, uri);
-}
-
-THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestPost(TStringBuf url, TStringBuf contentType, TStringBuf body) {
- return CreateRequest("POST", url, contentType, body);
-}
-
-THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestPost(TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body) {
- return CreateHttpRequest("POST", host, uri, contentType, body);
-}
-
-THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType, TStringBuf body) {
- THttpOutgoingRequestPtr request = new THttpOutgoingRequest(method, url, "HTTP", "1.1");
- request->Set<&THttpRequest::Accept>("*/*");
- if (!contentType.empty()) {
- request->Set<&THttpRequest::ContentType>(contentType);
- request->Set<&THttpRequest::Body>(body);
- }
- return request;
-}
-
-THttpOutgoingRequestPtr THttpOutgoingRequest::CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body) {
- THttpOutgoingRequestPtr request = new THttpOutgoingRequest(method, "http", host, uri, "HTTP", "1.1");
- request->Set<&THttpRequest::Accept>("*/*");
- if (!contentType.empty()) {
- request->Set<&THttpRequest::ContentType>(contentType);
- request->Set<&THttpRequest::Body>(body);
- }
- return request;
-}
-
-THttpOutgoingRequestPtr THttpOutgoingRequest::Duplicate() {
- THttpOutgoingRequestPtr request = new THttpOutgoingRequest(*this);
- request->Reparse();
- return request;
-}
-
-THttpOutgoingResponse::THttpOutgoingResponse(THttpIncomingRequestPtr request)
- : Request(request)
-{}
-
-THttpOutgoingResponse::THttpOutgoingResponse(THttpIncomingRequestPtr request, TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message)
- : Request(request)
-{
- InitResponse(protocol, version, status, message);
-}
-
-const size_t THttpConfig::BUFFER_MIN_STEP;
-const TDuration THttpConfig::CONNECTION_TIMEOUT;
-
-TUrlParameters::TUrlParameters(TStringBuf url) {
- TStringBuf base;
- TStringBuf params;
- if (url.TrySplit('?', base, params)) {
- for (TStringBuf param = params.NextTok('&'); !param.empty(); param = params.NextTok('&')) {
- TStringBuf name = param.NextTok('=');
- Parameters[name] = param;
- }
- }
-}
-
-TString TUrlParameters::operator [](TStringBuf name) const {
- TString value(Get(name));
- CGIUnescape(value);
- return value;
-}
-
-bool TUrlParameters::Has(TStringBuf name) const {
- return Parameters.count(name) != 0;
-}
-
-TStringBuf TUrlParameters::Get(TStringBuf name) const {
- auto it = Parameters.find(name);
- if (it != Parameters.end()) {
- return it->second;
- }
- return TStringBuf();
-}
-
-TString TUrlParameters::Render() const {
- TStringBuilder parameters;
- for (const std::pair<TStringBuf, TStringBuf> parameter : Parameters) {
- if (parameters.empty()) {
- parameters << '?';
- } else {
- parameters << '&';
- }
- parameters << parameter.first;
- parameters << '=';
- parameters << parameter.second;
- }
- return parameters;
-}
-
-TCookies::TCookies(TStringBuf cookie) {
- for (TStringBuf param = cookie.NextTok(';'); !param.empty(); param = cookie.NextTok(';')) {
- param.SkipPrefix(" ");
- TStringBuf name = param.NextTok('=');
- Cookies[name] = param;
- }
-}
-
-TStringBuf TCookies::operator [](TStringBuf name) const {
- return Get(name);
-}
-
-bool TCookies::Has(TStringBuf name) const {
- return Cookies.count(name) != 0;
-}
-
-TStringBuf TCookies::Get(TStringBuf name) const {
- auto it = Cookies.find(name);
- if (it != Cookies.end()) {
- return it->second;
- }
- return TStringBuf();
-}
-
-TString TCookies::Render() const {
- TStringBuilder cookies;
- for (const std::pair<TStringBuf, TStringBuf> cookie : Cookies) {
- if (!cookies.empty()) {
- cookies << ' ';
- }
- cookies << cookie.first;
- cookies << '=';
- cookies << cookie.second;
- cookies << ';';
- }
- return cookies;
-}
-
-TCookiesBuilder::TCookiesBuilder()
- :TCookies(TStringBuf())
-{}
-
-void TCookiesBuilder::Set(TStringBuf name, TStringBuf data) {
- Data.emplace_back(name, data);
- Cookies[Data.back().first] = Data.back().second;
-}
-
-THeaders::THeaders(TStringBuf headers) {
- for (TStringBuf param = headers.NextTok("\r\n"); !param.empty(); param = headers.NextTok("\r\n")) {
- TStringBuf name = param.NextTok(":");
- param.SkipPrefix(" ");
- Headers[name] = param;
- }
-}
-
-TStringBuf THeaders::operator [](TStringBuf name) const {
- return Get(name);
-}
-
-bool THeaders::Has(TStringBuf name) const {
- return Headers.count(name) != 0;
-}
-
-TStringBuf THeaders::Get(TStringBuf name) const {
- auto it = Headers.find(name);
- if (it != Headers.end()) {
- return it->second;
- }
- return TStringBuf();
-}
-
-TString THeaders::Render() const {
- TStringBuilder headers;
- for (const std::pair<TStringBuf, TStringBuf> header : Headers) {
- headers << header.first;
- headers << ": ";
- headers << header.second;
- headers << "\r\n";
- }
- return headers;
-}
-
-THeadersBuilder::THeadersBuilder()
- :THeaders(TStringBuf())
-{}
-
-THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) {
- for (const auto& pr : builder.Headers) {
- Set(pr.first, pr.second);
- }
-}
-
-void THeadersBuilder::Set(TStringBuf name, TStringBuf data) {
- Data.emplace_back(name, data);
- Headers[Data.back().first] = Data.back().second;
-}
-
-}
+ case EParseStage::Done:
+ case EParseStage::Error: {
+ data.Clear();
+ break;
+ }
+ default:
+ Y_FAIL("Invalid processing sequence");
+ break;
+ }
+ }
+ TSocketBuffer::Advance(len);
+}
+
+template <>
+THttpParser<THttpRequest, TSocketBuffer>::EParseStage THttpParser<THttpRequest, TSocketBuffer>::GetInitialStage() {
+ return EParseStage::Method;
+}
+
+template <>
+THttpParser<THttpResponse, TSocketBuffer>::EParseStage THttpParser<THttpResponse, TSocketBuffer>::GetInitialStage() {
+ return EParseStage::Protocol;
+}
+
+void THttpResponse::Clear() {
+ // a dirty little trick
+ this->~THttpResponse(); // basically, do nothing
+ new (this) THttpResponse(); // reset all fields
+}
+
+template <>
+void THttpParser<THttpResponse, TSocketBuffer>::Advance(size_t len) {
+ TStringBuf data(Pos(), len);
+ while (!data.empty()) {
+ if (Stage != EParseStage::Error) {
+ LastSuccessStage = Stage;
+ }
+ switch (Stage) {
+ case EParseStage::Protocol: {
+ if (ProcessData(Protocol, data, '/', MaxProtocolSize)) {
+ Stage = EParseStage::Version;
+ }
+ break;
+ }
+ case EParseStage::Version: {
+ if (ProcessData(Version, data, ' ', MaxVersionSize)) {
+ Stage = EParseStage::Status;
+ }
+ break;
+ }
+ case EParseStage::Status: {
+ if (ProcessData(Status, data, ' ', MaxStatusSize)) {
+ Stage = EParseStage::Message;
+ }
+ break;
+ }
+ case EParseStage::Message: {
+ if (ProcessData(Message, data, "\r\n", MaxMessageSize)) {
+ Stage = EParseStage::Header;
+ Headers = TStringBuf(data.data(), size_t(0));
+ }
+ break;
+ }
+ case EParseStage::Header: {
+ if (ProcessData(Header, data, "\r\n", MaxHeaderSize)) {
+ if (Header.empty()) {
+ if (HaveBody() && (ContentLength.empty() || ContentLength != "0")) {
+ Stage = EParseStage::Body;
+ } else {
+ Stage = EParseStage::Done;
+ }
+ } else {
+ ProcessHeader(Header);
+ }
+ Headers = TStringBuf(Headers.data(), data.data() - Headers.data());
+ }
+ break;
+ }
+ case EParseStage::Body: {
+ if (!ContentLength.empty()) {
+ if (ProcessData(Body, data, FromString(ContentLength))) {
+ Stage = EParseStage::Done;
+ }
+ } else if (TransferEncoding == "chunked") {
+ Stage = EParseStage::ChunkLength;
+ } else {
+ // Invalid body encoding
+ Stage = EParseStage::Error;
+ }
+ break;
+ }
+ case EParseStage::ChunkLength: {
+ if (ProcessData(Line, data, "\r\n", MaxChunkLengthSize)) {
+ if (!Line.empty()) {
+ ChunkLength = ParseHex(Line);
+ if (ChunkLength <= MaxChunkSize) {
+ ContentSize = Content.size() + ChunkLength;
+ if (ContentSize <= MaxChunkContentSize) {
+ Stage = EParseStage::ChunkData;
+ Line.Clear();
+ } else {
+ // Invalid chunk content length
+ Stage = EParseStage::Error;
+ }
+ } else {
+ // Invalid chunk length
+ Stage = EParseStage::Error;
+ }
+ } else {
+ // Invalid body encoding
+ Stage = EParseStage::Error;
+ }
+ }
+ break;
+ }
+ case EParseStage::ChunkData: {
+ if (!IsError()) {
+ if (ProcessData(Content, data, ContentSize)) {
+ if (ProcessData(Line, data, 2)) {
+ if (Line == "\r\n") {
+ if (ChunkLength == 0) {
+ Body = Content;
+ Stage = EParseStage::Done;
+ } else {
+ Stage = EParseStage::ChunkLength;
+ }
+ Line.Clear();
+ } else {
+ // Invalid body encoding
+ Stage = EParseStage::Error;
+ }
+ }
+ }
+ }
+ break;
+ }
+ case EParseStage::Done:
+ case EParseStage::Error:
+ data.Clear();
+ break;
+ default:
+ // Invalid processing sequence
+ Stage = EParseStage::Error;
+ break;
+ }
+ }
+ TSocketBuffer::Advance(len);
+}
+
+template <>
+void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() {
+ if (Stage == EParseStage::Done) {
+ return;
+ }
+ if (Stage == EParseStage::Body) {
+ // ?
+ Stage = EParseStage::Done;
+ } else {
+ LastSuccessStage = Stage;
+ Stage = EParseStage::Error;
+ }
+}
+
+THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) {
+ THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this);
+ response->Append(data);
+ response->Reparse();
+ return response;
+}
+
+THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseOK(TStringBuf body, TStringBuf contentType, TInstant lastModified) {
+ return CreateResponse("200", "OK", contentType, body, lastModified);
+}
+
+THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseBadRequest(TStringBuf html, TStringBuf contentType) {
+ if (html.empty() && IsError()) {
+ contentType = "text/plain";
+ html = GetErrorText();
+ }
+ return CreateResponse("400", "Bad Request", contentType, html);
+}
+
+THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseNotFound(TStringBuf html, TStringBuf contentType) {
+ return CreateResponse("404", "Not Found", contentType, html);
+}
+
+THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseServiceUnavailable(TStringBuf html, TStringBuf contentType) {
+ return CreateResponse("503", "Service Unavailable", contentType, html);
+}
+
+THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseGatewayTimeout(TStringBuf html, TStringBuf contentType) {
+ return CreateResponse("504", "Gateway Timeout", contentType, html);
+}
+
+THttpIncomingResponse::THttpIncomingResponse(THttpOutgoingRequestPtr request)
+ : Request(request)
+{}
+
+THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponse(TStringBuf status, TStringBuf message, TStringBuf contentType, TStringBuf body, TInstant lastModified) {
+ TStringBuf version = Version;
+ if (version != "1.0" && version != "1.1") {
+ version = "1.1";
+ }
+ THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this, "HTTP", version, status, message);
+ response->Set<&THttpResponse::Connection>(GetConnection());
+ if (!WorkerName.empty()) {
+ response->Set("X-Worker-Name", WorkerName);
+ }
+ if (!contentType.empty() && !body.empty()) {
+ response->Set<&THttpResponse::ContentType>(contentType);
+ }
+ if (lastModified) {
+ response->Set<&THttpResponse::LastModified>(lastModified.FormatGmTime("%a, %d %b %Y %H:%M:%S GMT"));
+ }
+ if (response->IsNeedBody() || !body.empty()) {
+ if (Method == "HEAD") {
+ response->Set<&THttpResponse::ContentLength>(ToString(body.size()));
+ } else {
+ response->Set<&THttpResponse::Body>(body);
+ }
+ }
+ return response;
+}
+
+THttpIncomingRequestPtr THttpIncomingRequest::Duplicate() {
+ THttpIncomingRequestPtr request = new THttpIncomingRequest(*this);
+ request->Reparse();
+ request->Timer.Reset();
+ return request;
+}
+
+THttpIncomingResponsePtr THttpIncomingResponse::Duplicate(THttpOutgoingRequestPtr request) {
+ THttpIncomingResponsePtr response = new THttpIncomingResponse(*this);
+ response->Reparse();
+ response->Request = request;
+ return response;
+}
+
+THttpOutgoingResponsePtr THttpOutgoingResponse::Duplicate(THttpIncomingRequestPtr request) {
+ THttpOutgoingResponsePtr response = new THttpOutgoingResponse(*this);
+ response->Reparse();
+ response->Request = request;
+ return response;
+}
+
+
+THttpOutgoingResponsePtr THttpIncomingResponse::Reverse(THttpIncomingRequestPtr request) {
+ THttpOutgoingResponsePtr response = new THttpOutgoingResponse(request);
+ response->Assign(Data(), Size());
+ response->Reparse();
+ return response;
+}
+
+THttpOutgoingRequest::THttpOutgoingRequest(TStringBuf method, TStringBuf scheme, TStringBuf host, TStringBuf uri, TStringBuf protocol, TStringBuf version) {
+ Secure = (scheme == "https");
+ TString urie = UrlEscapeRet(uri);
+ InitRequest(method, urie, protocol, version);
+ if (host) {
+ Set<&THttpRequest::Host>(host);
+ }
+}
+
+THttpOutgoingRequest::THttpOutgoingRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) {
+ TStringBuf scheme, host, uri;
+ if (!CrackURL(url, scheme, host, uri)) {
+ Y_FAIL("Invalid URL specified");
+ }
+ if (!scheme.empty() && scheme != "http" && scheme != "https") {
+ Y_FAIL("Invalid URL specified");
+ }
+ Secure = (scheme == "https");
+ TString urie = UrlEscapeRet(uri);
+ InitRequest(method, urie, protocol, version);
+ if (host) {
+ Set<&THttpRequest::Host>(host);
+ }
+}
+
+THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestString(const TString& data) {
+ THttpOutgoingRequestPtr request = new THttpOutgoingRequest();
+ request->Assign(data.data(), data.size());
+ request->Reparse();
+ return request;
+}
+
+THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestGet(TStringBuf url) {
+ return CreateRequest("GET", url);
+}
+
+THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestGet(TStringBuf host, TStringBuf uri) {
+ return CreateHttpRequest("GET", host, uri);
+}
+
+THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestPost(TStringBuf url, TStringBuf contentType, TStringBuf body) {
+ return CreateRequest("POST", url, contentType, body);
+}
+
+THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequestPost(TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body) {
+ return CreateHttpRequest("POST", host, uri, contentType, body);
+}
+
+THttpOutgoingRequestPtr THttpOutgoingRequest::CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType, TStringBuf body) {
+ THttpOutgoingRequestPtr request = new THttpOutgoingRequest(method, url, "HTTP", "1.1");
+ request->Set<&THttpRequest::Accept>("*/*");
+ if (!contentType.empty()) {
+ request->Set<&THttpRequest::ContentType>(contentType);
+ request->Set<&THttpRequest::Body>(body);
+ }
+ return request;
+}
+
+THttpOutgoingRequestPtr THttpOutgoingRequest::CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body) {
+ THttpOutgoingRequestPtr request = new THttpOutgoingRequest(method, "http", host, uri, "HTTP", "1.1");
+ request->Set<&THttpRequest::Accept>("*/*");
+ if (!contentType.empty()) {
+ request->Set<&THttpRequest::ContentType>(contentType);
+ request->Set<&THttpRequest::Body>(body);
+ }
+ return request;
+}
+
+THttpOutgoingRequestPtr THttpOutgoingRequest::Duplicate() {
+ THttpOutgoingRequestPtr request = new THttpOutgoingRequest(*this);
+ request->Reparse();
+ return request;
+}
+
+THttpOutgoingResponse::THttpOutgoingResponse(THttpIncomingRequestPtr request)
+ : Request(request)
+{}
+
+THttpOutgoingResponse::THttpOutgoingResponse(THttpIncomingRequestPtr request, TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message)
+ : Request(request)
+{
+ InitResponse(protocol, version, status, message);
+}
+
+const size_t THttpConfig::BUFFER_MIN_STEP;
+const TDuration THttpConfig::CONNECTION_TIMEOUT;
+
+TUrlParameters::TUrlParameters(TStringBuf url) {
+ TStringBuf base;
+ TStringBuf params;
+ if (url.TrySplit('?', base, params)) {
+ for (TStringBuf param = params.NextTok('&'); !param.empty(); param = params.NextTok('&')) {
+ TStringBuf name = param.NextTok('=');
+ Parameters[name] = param;
+ }
+ }
+}
+
+TString TUrlParameters::operator [](TStringBuf name) const {
+ TString value(Get(name));
+ CGIUnescape(value);
+ return value;
+}
+
+bool TUrlParameters::Has(TStringBuf name) const {
+ return Parameters.count(name) != 0;
+}
+
+TStringBuf TUrlParameters::Get(TStringBuf name) const {
+ auto it = Parameters.find(name);
+ if (it != Parameters.end()) {
+ return it->second;
+ }
+ return TStringBuf();
+}
+
+TString TUrlParameters::Render() const {
+ TStringBuilder parameters;
+ for (const std::pair<TStringBuf, TStringBuf> parameter : Parameters) {
+ if (parameters.empty()) {
+ parameters << '?';
+ } else {
+ parameters << '&';
+ }
+ parameters << parameter.first;
+ parameters << '=';
+ parameters << parameter.second;
+ }
+ return parameters;
+}
+
+TCookies::TCookies(TStringBuf cookie) {
+ for (TStringBuf param = cookie.NextTok(';'); !param.empty(); param = cookie.NextTok(';')) {
+ param.SkipPrefix(" ");
+ TStringBuf name = param.NextTok('=');
+ Cookies[name] = param;
+ }
+}
+
+TStringBuf TCookies::operator [](TStringBuf name) const {
+ return Get(name);
+}
+
+bool TCookies::Has(TStringBuf name) const {
+ return Cookies.count(name) != 0;
+}
+
+TStringBuf TCookies::Get(TStringBuf name) const {
+ auto it = Cookies.find(name);
+ if (it != Cookies.end()) {
+ return it->second;
+ }
+ return TStringBuf();
+}
+
+TString TCookies::Render() const {
+ TStringBuilder cookies;
+ for (const std::pair<TStringBuf, TStringBuf> cookie : Cookies) {
+ if (!cookies.empty()) {
+ cookies << ' ';
+ }
+ cookies << cookie.first;
+ cookies << '=';
+ cookies << cookie.second;
+ cookies << ';';
+ }
+ return cookies;
+}
+
+TCookiesBuilder::TCookiesBuilder()
+ :TCookies(TStringBuf())
+{}
+
+void TCookiesBuilder::Set(TStringBuf name, TStringBuf data) {
+ Data.emplace_back(name, data);
+ Cookies[Data.back().first] = Data.back().second;
+}
+
+THeaders::THeaders(TStringBuf headers) {
+ for (TStringBuf param = headers.NextTok("\r\n"); !param.empty(); param = headers.NextTok("\r\n")) {
+ TStringBuf name = param.NextTok(":");
+ param.SkipPrefix(" ");
+ Headers[name] = param;
+ }
+}
+
+TStringBuf THeaders::operator [](TStringBuf name) const {
+ return Get(name);
+}
+
+bool THeaders::Has(TStringBuf name) const {
+ return Headers.count(name) != 0;
+}
+
+TStringBuf THeaders::Get(TStringBuf name) const {
+ auto it = Headers.find(name);
+ if (it != Headers.end()) {
+ return it->second;
+ }
+ return TStringBuf();
+}
+
+TString THeaders::Render() const {
+ TStringBuilder headers;
+ for (const std::pair<TStringBuf, TStringBuf> header : Headers) {
+ headers << header.first;
+ headers << ": ";
+ headers << header.second;
+ headers << "\r\n";
+ }
+ return headers;
+}
+
+THeadersBuilder::THeadersBuilder()
+ :THeaders(TStringBuf())
+{}
+
+THeadersBuilder::THeadersBuilder(const THeadersBuilder& builder) {
+ for (const auto& pr : builder.Headers) {
+ Set(pr.first, pr.second);
+ }
+}
+
+void THeadersBuilder::Set(TStringBuf name, TStringBuf data) {
+ Data.emplace_back(name, data);
+ Headers[Data.back().first] = Data.back().second;
+}
+
+}
diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h
index 96c5c1ec48..a11d6158ee 100644
--- a/library/cpp/actors/http/http.h
+++ b/library/cpp/actors/http/http.h
@@ -1,703 +1,703 @@
-#pragma once
-#include <util/datetime/base.h>
-#include <util/string/builder.h>
-#include <util/system/thread.h>
-#include <util/system/hp_timer.h>
-#include <util/generic/hash_set.h>
-#include <util/generic/buffer.h>
-#include <util/generic/intrlist.h>
-#include "http_config.h"
-
-// TODO(xenoxeno): hide in implementation
-template <typename Type>
-struct THash<TIntrusivePtr<Type>> {
- size_t operator ()(const TIntrusivePtr<Type>& ptr) const { return reinterpret_cast<size_t>(ptr.Get()); }
-};
-
-template<>
-inline void Out<TSockAddrInet6>(IOutputStream& o, const TSockAddrInet6& x) {
- o << x.ToString();
-}
-
-namespace NHttp {
-
-bool IsIPv6(const TString& host);
-bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri);
-void CrackAddress(const TString& address, TString& hostname, TIpPort& port);
-void TrimBegin(TStringBuf& target, char delim);
-void TrimEnd(TStringBuf& target, char delim);
-void Trim(TStringBuf& target, char delim);
-void TrimEnd(TString& target, char delim);
-
-struct TLessNoCase {
- bool operator()(TStringBuf l, TStringBuf r) const {
- auto ll = l.length();
- auto rl = r.length();
- if (ll != rl) {
- return ll < rl;
- }
- return strnicmp(l.data(), r.data(), ll) < 0;
- }
-};
-
-struct TUrlParameters {
- THashMap<TStringBuf, TStringBuf> Parameters;
-
- TUrlParameters(TStringBuf url);
- TString operator [](TStringBuf name) const;
- bool Has(TStringBuf name) const;
- TStringBuf Get(TStringBuf name) const; // raw
- TString Render() const;
-};
-
-struct TCookies {
- THashMap<TStringBuf, TStringBuf> Cookies;
-
- TCookies(TStringBuf cookie);
- TCookies(const TCookies&) = delete;
- TStringBuf operator [](TStringBuf name) const;
- bool Has(TStringBuf name) const;
- TStringBuf Get(TStringBuf name) const; // raw
- TString Render() const;
-};
-
-struct TCookiesBuilder : TCookies {
+#pragma once
+#include <util/datetime/base.h>
+#include <util/string/builder.h>
+#include <util/system/thread.h>
+#include <util/system/hp_timer.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/buffer.h>
+#include <util/generic/intrlist.h>
+#include "http_config.h"
+
+// TODO(xenoxeno): hide in implementation
+template <typename Type>
+struct THash<TIntrusivePtr<Type>> {
+ size_t operator ()(const TIntrusivePtr<Type>& ptr) const { return reinterpret_cast<size_t>(ptr.Get()); }
+};
+
+template<>
+inline void Out<TSockAddrInet6>(IOutputStream& o, const TSockAddrInet6& x) {
+ o << x.ToString();
+}
+
+namespace NHttp {
+
+bool IsIPv6(const TString& host);
+bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri);
+void CrackAddress(const TString& address, TString& hostname, TIpPort& port);
+void TrimBegin(TStringBuf& target, char delim);
+void TrimEnd(TStringBuf& target, char delim);
+void Trim(TStringBuf& target, char delim);
+void TrimEnd(TString& target, char delim);
+
+struct TLessNoCase {
+ bool operator()(TStringBuf l, TStringBuf r) const {
+ auto ll = l.length();
+ auto rl = r.length();
+ if (ll != rl) {
+ return ll < rl;
+ }
+ return strnicmp(l.data(), r.data(), ll) < 0;
+ }
+};
+
+struct TUrlParameters {
+ THashMap<TStringBuf, TStringBuf> Parameters;
+
+ TUrlParameters(TStringBuf url);
+ TString operator [](TStringBuf name) const;
+ bool Has(TStringBuf name) const;
+ TStringBuf Get(TStringBuf name) const; // raw
+ TString Render() const;
+};
+
+struct TCookies {
+ THashMap<TStringBuf, TStringBuf> Cookies;
+
+ TCookies(TStringBuf cookie);
+ TCookies(const TCookies&) = delete;
+ TStringBuf operator [](TStringBuf name) const;
+ bool Has(TStringBuf name) const;
+ TStringBuf Get(TStringBuf name) const; // raw
+ TString Render() const;
+};
+
+struct TCookiesBuilder : TCookies {
TDeque<std::pair<TString, TString>> Data;
-
- TCookiesBuilder();
- void Set(TStringBuf name, TStringBuf data);
-};
-
-struct THeaders {
- TMap<TStringBuf, TStringBuf, TLessNoCase> Headers;
-
- THeaders() = default;
- THeaders(TStringBuf headers);
- THeaders(const THeaders&) = delete;
- TStringBuf operator [](TStringBuf name) const;
- bool Has(TStringBuf name) const;
- TStringBuf Get(TStringBuf name) const; // raw
- TString Render() const;
-};
-
-struct THeadersBuilder : THeaders {
- TDeque<std::pair<TString, TString>> Data;
-
- THeadersBuilder();
- THeadersBuilder(const THeadersBuilder& builder);
- void Set(TStringBuf name, TStringBuf data);
-};
-
-class TSocketBuffer : public TBuffer, public THttpConfig {
-public:
- TSocketBuffer()
- : TBuffer(BUFFER_SIZE)
- {}
-
- bool EnsureEnoughSpaceAvailable(size_t need) {
- size_t avail = Avail();
- if (avail < need) {
- Reserve(Capacity() + std::max(need, BUFFER_MIN_STEP));
- return false;
- }
- return true;
- }
-};
-
-class THttpRequest {
-public:
- TStringBuf Method;
- TStringBuf URL;
- TStringBuf Protocol;
- TStringBuf Version;
- TStringBuf Headers;
-
- TStringBuf Host;
- TStringBuf Accept;
- TStringBuf Connection;
- TStringBuf ContentType;
- TStringBuf ContentLength;
- TStringBuf TransferEncoding;
-
- TStringBuf Body;
-
- static const TMap<TStringBuf, TStringBuf THttpRequest::*, TLessNoCase> HeadersLocation;
-
- template <TStringBuf THttpRequest::* Header>
- static TStringBuf GetName();
- void Clear();
-};
-
-class THttpResponse {
-public:
- TStringBuf Protocol;
- TStringBuf Version;
- TStringBuf Status;
- TStringBuf Message;
- TStringBuf Headers;
-
- TStringBuf Connection;
- TStringBuf ContentType;
- TStringBuf ContentLength;
- TStringBuf TransferEncoding;
- TStringBuf LastModified;
+
+ TCookiesBuilder();
+ void Set(TStringBuf name, TStringBuf data);
+};
+
+struct THeaders {
+ TMap<TStringBuf, TStringBuf, TLessNoCase> Headers;
+
+ THeaders() = default;
+ THeaders(TStringBuf headers);
+ THeaders(const THeaders&) = delete;
+ TStringBuf operator [](TStringBuf name) const;
+ bool Has(TStringBuf name) const;
+ TStringBuf Get(TStringBuf name) const; // raw
+ TString Render() const;
+};
+
+struct THeadersBuilder : THeaders {
+ TDeque<std::pair<TString, TString>> Data;
+
+ THeadersBuilder();
+ THeadersBuilder(const THeadersBuilder& builder);
+ void Set(TStringBuf name, TStringBuf data);
+};
+
+class TSocketBuffer : public TBuffer, public THttpConfig {
+public:
+ TSocketBuffer()
+ : TBuffer(BUFFER_SIZE)
+ {}
+
+ bool EnsureEnoughSpaceAvailable(size_t need) {
+ size_t avail = Avail();
+ if (avail < need) {
+ Reserve(Capacity() + std::max(need, BUFFER_MIN_STEP));
+ return false;
+ }
+ return true;
+ }
+};
+
+class THttpRequest {
+public:
+ TStringBuf Method;
+ TStringBuf URL;
+ TStringBuf Protocol;
+ TStringBuf Version;
+ TStringBuf Headers;
+
+ TStringBuf Host;
+ TStringBuf Accept;
+ TStringBuf Connection;
+ TStringBuf ContentType;
+ TStringBuf ContentLength;
+ TStringBuf TransferEncoding;
+
+ TStringBuf Body;
+
+ static const TMap<TStringBuf, TStringBuf THttpRequest::*, TLessNoCase> HeadersLocation;
+
+ template <TStringBuf THttpRequest::* Header>
+ static TStringBuf GetName();
+ void Clear();
+};
+
+class THttpResponse {
+public:
+ TStringBuf Protocol;
+ TStringBuf Version;
+ TStringBuf Status;
+ TStringBuf Message;
+ TStringBuf Headers;
+
+ TStringBuf Connection;
+ TStringBuf ContentType;
+ TStringBuf ContentLength;
+ TStringBuf TransferEncoding;
+ TStringBuf LastModified;
TStringBuf ContentEncoding;
-
- TStringBuf Body;
-
- static const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> HeadersLocation;
-
- template <TStringBuf THttpResponse::* Header>
- static TStringBuf GetName();
- void Clear();
-};
-
-template <typename HeaderType, typename BufferType>
-class THttpParser : public HeaderType, public BufferType {
-public:
- enum class EParseStage : ui8 {
- Method,
- URL,
- Protocol,
- Version,
- Status,
- Message,
- Header,
- Body,
- ChunkLength,
- ChunkData,
- Done,
- Error,
- };
-
- static constexpr size_t MaxMethodSize = 6;
- static constexpr size_t MaxURLSize = 1024;
- static constexpr size_t MaxProtocolSize = 4;
- static constexpr size_t MaxVersionSize = 4;
- static constexpr size_t MaxStatusSize = 3;
- static constexpr size_t MaxMessageSize = 1024;
+
+ TStringBuf Body;
+
+ static const TMap<TStringBuf, TStringBuf THttpResponse::*, TLessNoCase> HeadersLocation;
+
+ template <TStringBuf THttpResponse::* Header>
+ static TStringBuf GetName();
+ void Clear();
+};
+
+template <typename HeaderType, typename BufferType>
+class THttpParser : public HeaderType, public BufferType {
+public:
+ enum class EParseStage : ui8 {
+ Method,
+ URL,
+ Protocol,
+ Version,
+ Status,
+ Message,
+ Header,
+ Body,
+ ChunkLength,
+ ChunkData,
+ Done,
+ Error,
+ };
+
+ static constexpr size_t MaxMethodSize = 6;
+ static constexpr size_t MaxURLSize = 1024;
+ static constexpr size_t MaxProtocolSize = 4;
+ static constexpr size_t MaxVersionSize = 4;
+ static constexpr size_t MaxStatusSize = 3;
+ static constexpr size_t MaxMessageSize = 1024;
static constexpr size_t MaxHeaderSize = 8192;
- static constexpr size_t MaxChunkLengthSize = 8;
- static constexpr size_t MaxChunkSize = 256 * 1024 * 1024;
- static constexpr size_t MaxChunkContentSize = 1 * 1024 * 1024 * 1024;
-
- EParseStage Stage;
- EParseStage LastSuccessStage;
- TStringBuf Line;
- TStringBuf& Header = Line;
- size_t ChunkLength = 0;
- size_t ContentSize = 0;
- TString Content;
-
- THttpParser(const THttpParser& src)
- : HeaderType(src)
- , BufferType(src)
- , Stage(src.Stage)
- , LastSuccessStage(src.LastSuccessStage)
- , Line()
- , Header(Line)
- , ChunkLength(src.ChunkLength)
- , ContentSize(src.ContentSize)
- , Content(src.Content)
- {}
-
- template <typename StringType>
- bool ProcessData(StringType& target, TStringBuf& source, char delim, size_t maxLen) {
- TStringBuf maxSource(source.substr(0, maxLen + 1 - target.size()));
- size_t pos = maxSource.find(delim);
- target += maxSource.substr(0, pos);
- source.Skip(pos);
- if (target.size() > maxLen) {
- Stage = EParseStage::Error;
- return false;
- }
- if (!source.empty() && *source.begin() == delim) {
- source.Skip(1);
- }
- return pos != TStringBuf::npos;
- }
-
- template <typename StringType>
- bool ProcessData(StringType& target, TStringBuf& source, TStringBuf delim, size_t maxLen) {
- if (delim.empty()) {
- return false;
- }
- if (delim.size() == 1) {
- return ProcessData(target, source, delim[0], maxLen);
- }
+ static constexpr size_t MaxChunkLengthSize = 8;
+ static constexpr size_t MaxChunkSize = 256 * 1024 * 1024;
+ static constexpr size_t MaxChunkContentSize = 1 * 1024 * 1024 * 1024;
+
+ EParseStage Stage;
+ EParseStage LastSuccessStage;
+ TStringBuf Line;
+ TStringBuf& Header = Line;
+ size_t ChunkLength = 0;
+ size_t ContentSize = 0;
+ TString Content;
+
+ THttpParser(const THttpParser& src)
+ : HeaderType(src)
+ , BufferType(src)
+ , Stage(src.Stage)
+ , LastSuccessStage(src.LastSuccessStage)
+ , Line()
+ , Header(Line)
+ , ChunkLength(src.ChunkLength)
+ , ContentSize(src.ContentSize)
+ , Content(src.Content)
+ {}
+
+ template <typename StringType>
+ bool ProcessData(StringType& target, TStringBuf& source, char delim, size_t maxLen) {
+ TStringBuf maxSource(source.substr(0, maxLen + 1 - target.size()));
+ size_t pos = maxSource.find(delim);
+ target += maxSource.substr(0, pos);
+ source.Skip(pos);
+ if (target.size() > maxLen) {
+ Stage = EParseStage::Error;
+ return false;
+ }
+ if (!source.empty() && *source.begin() == delim) {
+ source.Skip(1);
+ }
+ return pos != TStringBuf::npos;
+ }
+
+ template <typename StringType>
+ bool ProcessData(StringType& target, TStringBuf& source, TStringBuf delim, size_t maxLen) {
+ if (delim.empty()) {
+ return false;
+ }
+ if (delim.size() == 1) {
+ return ProcessData(target, source, delim[0], maxLen);
+ }
if (ProcessData(target, source, delim.back(), maxLen + 1)) {
- for (signed i = delim.size() - 2; i >= 0; --i) {
- TrimEnd(target, delim[i]);
- }
- return true;
- }
- return false;
- }
-
- template <typename StringType>
- bool ProcessData(StringType& target, TStringBuf& source, size_t size) {
- TStringBuf maxSource(source.substr(0, size - target.size()));
- target += maxSource;
- source.Skip(maxSource.size());
- if (target.size() > size && !source.empty()) {
- Stage = EParseStage::Error;
- return false;
- }
- return target.size() == size;
- }
-
- void ProcessHeader(TStringBuf& header) {
- TStringBuf name = header.NextTok(':');
- TrimBegin(name, ' ');
- TStringBuf value = header;
- Trim(value, ' ');
- auto cit = HeaderType::HeadersLocation.find(name);
- if (cit != HeaderType::HeadersLocation.end()) {
- this->*cit->second = value;
- }
- header.Clear();
- }
-
- size_t ParseHex(TStringBuf value) {
- size_t result = 0;
- for (char ch : value) {
- if (ch >= '0' && ch <= '9') {
- result *= 16;
- result += ch - '0';
- } else if (ch >= 'a' && ch <= 'f') {
- result *= 16;
- result += 10 + ch - 'a';
- } else if (ch >= 'A' && ch <= 'F') {
- result *= 16;
- result += 10 + ch - 'A';
- } else if (ch == ';') {
- break;
- } else if (isspace(ch)) {
- continue;
- } else {
- Stage = EParseStage::Error;
- return 0;
- }
- }
- return result;
- }
-
- void Advance(size_t len);
- void ConnectionClosed();
-
- void Clear() {
- BufferType::Clear();
- HeaderType::Clear();
- Stage = GetInitialStage();
- Line.Clear();
- Content.clear();
- }
-
- bool IsReady() const {
- return Stage == EParseStage::Done;
- }
-
- bool IsError() const {
- return Stage == EParseStage::Error;
- }
-
- TStringBuf GetErrorText() const {
- switch (LastSuccessStage) {
- case EParseStage::Method:
- return "Invalid http method";
- case EParseStage::URL:
- return "Invalid url";
- case EParseStage::Protocol:
- return "Invalid http protocol";
- case EParseStage::Version:
- return "Invalid http version";
- case EParseStage::Status:
- return "Invalid http status";
- case EParseStage::Message:
- return "Invalid http message";
- case EParseStage::Header:
- return "Invalid http header";
- case EParseStage::Body:
- return "Invalid content body";
- case EParseStage::ChunkLength:
- case EParseStage::ChunkData:
- return "Broken chunked data";
- case EParseStage::Done:
- return "Everything is fine";
- case EParseStage::Error:
- return "Error on error"; // wat? ...because we don't want to include default label here
- }
- }
-
- bool IsDone() const {
- return IsReady() || IsError();
- }
-
- bool HaveBody() const {
- return !HeaderType::ContentType.empty() || !HeaderType::ContentLength.empty() || !HeaderType::TransferEncoding.empty();
- }
-
- bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) {
- bool result = BufferType::EnsureEnoughSpaceAvailable(need);
- if (!result && !BufferType::Empty()) {
- Reparse();
- }
- return true;
- }
-
- void Reparse() {
- size_t size = BufferType::Size();
- Clear();
- Advance(size);
- }
-
- TStringBuf GetRawData() const {
- return TStringBuf(BufferType::Data(), BufferType::Size());
- }
-
- TString GetObfuscatedData() const {
- THeaders headers(HeaderType::Headers);
- TStringBuf authorization(headers["Authorization"]);
- TStringBuf cookie(headers["Cookie"]);
- TStringBuf x_ydb_auth_ticket(headers["x-ydb-auth-ticket"]);
- TStringBuf x_yacloud_subjecttoken(headers["x-yacloud-subjecttoken"]);
- TString data(GetRawData());
- if (!authorization.empty()) {
- auto pos = data.find(authorization);
- if (pos != TString::npos) {
- data.replace(pos, authorization.size(), TString("<obfuscated>"));
- }
- }
- if (!cookie.empty()) {
- auto pos = data.find(cookie);
- if (pos != TString::npos) {
- data.replace(pos, cookie.size(), TString("<obfuscated>"));
- }
- }
- if (!x_ydb_auth_ticket.empty()) {
- auto pos = data.find(x_ydb_auth_ticket);
- if (pos != TString::npos) {
- data.replace(pos, x_ydb_auth_ticket.size(), TString("<obfuscated>"));
- }
- }
- if (!x_yacloud_subjecttoken.empty()) {
- auto pos = data.find(x_yacloud_subjecttoken);
- if (pos != TString::npos) {
- data.replace(pos, x_yacloud_subjecttoken.size(), TString("<obfuscated>"));
- }
- }
- return data;
- }
-
- static EParseStage GetInitialStage();
-
- THttpParser()
- : Stage(GetInitialStage())
- , LastSuccessStage(Stage)
- {}
-};
-
-template <typename HeaderType, typename BufferType>
-class THttpRenderer : public HeaderType, public BufferType {
-public:
- enum class ERenderStage {
- Init,
- Header,
- Body,
- Done,
- Error,
- };
-
- ERenderStage Stage = ERenderStage::Init;
-
- void Append(TStringBuf text) {
- EnsureEnoughSpaceAvailable(text.size());
- BufferType::Append(text.data(), text.size());
- }
-
- void Append(char c) {
- EnsureEnoughSpaceAvailable(sizeof(c));
- BufferType::Append(c);
- }
-
- template <TStringBuf HeaderType::* string>
- void AppendParsedValue(TStringBuf value) {
- Append(value);
- static_cast<HeaderType*>(this)->*string = TStringBuf(BufferType::Pos() - value.size(), value.size());
- }
-
- template <TStringBuf HeaderType::* name>
- void Set(TStringBuf value) {
- Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
- Append(HeaderType::template GetName<name>());
- Append(": ");
- AppendParsedValue<name>(value);
- Append("\r\n");
- HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
- }
-
- void Set(TStringBuf name, TStringBuf value) {
- Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
- Append(name);
- Append(": ");
- Append(value);
- Append("\r\n");
- HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
- }
-
- void Set(const THeaders& headers) {
- Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
- Append(headers.Render());
- HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
- }
-
- //THttpRenderer(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version); // request
- void InitRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) {
- Y_VERIFY_DEBUG(Stage == ERenderStage::Init);
- AppendParsedValue<&THttpRequest::Method>(method);
- Append(' ');
- AppendParsedValue<&THttpRequest::URL>(url);
- Append(' ');
- AppendParsedValue<&THttpRequest::Protocol>(protocol);
- Append('/');
- AppendParsedValue<&THttpRequest::Version>(version);
- Append("\r\n");
- Stage = ERenderStage::Header;
- HeaderType::Headers = TStringBuf(BufferType::Pos(), size_t(0));
- }
-
- //THttpRenderer(TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message); // response
- void InitResponse(TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message) {
- Y_VERIFY_DEBUG(Stage == ERenderStage::Init);
- AppendParsedValue<&THttpResponse::Protocol>(protocol);
- Append('/');
- AppendParsedValue<&THttpResponse::Version>(version);
- Append(' ');
- AppendParsedValue<&THttpResponse::Status>(status);
- Append(' ');
- AppendParsedValue<&THttpResponse::Message>(message);
- Append("\r\n");
- Stage = ERenderStage::Header;
- HeaderType::Headers = TStringBuf(BufferType::Pos(), size_t(0));
- }
-
- void FinishHeader() {
- Append("\r\n");
- HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
- Stage = ERenderStage::Body;
- }
-
- void SetBody(TStringBuf body) {
- Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
- if (HeaderType::ContentLength.empty()) {
- Set<&HeaderType::ContentLength>(ToString(body.size()));
- }
- FinishHeader();
- AppendParsedValue<&HeaderType::Body>(body);
- Stage = ERenderStage::Done;
- }
-
- bool IsDone() const {
- return Stage == ERenderStage::Done;
- }
-
- void Finish() {
- switch (Stage) {
- case ERenderStage::Header:
- FinishHeader();
- break;
- default:
- break;
- }
- }
-
- bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) {
- bool result = BufferType::EnsureEnoughSpaceAvailable(need);
- if (!result && !BufferType::Empty()) {
- Reparse();
- }
- return true;
- }
-
- void Clear() {
- BufferType::Clear();
- HeaderType::Clear();
- }
-
- void Reparse() {
- // move-magic
- size_t size = BufferType::Size();
- THttpParser<HeaderType, BufferType> parser;
- // move the buffer to parser
- static_cast<BufferType&>(parser) = std::move(static_cast<BufferType&>(*this));
- // reparse
- parser.Clear();
- parser.Advance(size);
- // move buffer and result back
- static_cast<HeaderType&>(*this) = std::move(static_cast<HeaderType&>(parser));
- static_cast<BufferType&>(*this) = std::move(static_cast<BufferType&>(parser));
- switch (parser.Stage) {
- case THttpParser<HeaderType, BufferType>::EParseStage::Method:
- case THttpParser<HeaderType, BufferType>::EParseStage::URL:
- case THttpParser<HeaderType, BufferType>::EParseStage::Protocol:
- case THttpParser<HeaderType, BufferType>::EParseStage::Version:
- case THttpParser<HeaderType, BufferType>::EParseStage::Status:
- case THttpParser<HeaderType, BufferType>::EParseStage::Message:
- Stage = ERenderStage::Init;
- break;
- case THttpParser<HeaderType, BufferType>::EParseStage::Header:
- Stage = ERenderStage::Header;
+ for (signed i = delim.size() - 2; i >= 0; --i) {
+ TrimEnd(target, delim[i]);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ template <typename StringType>
+ bool ProcessData(StringType& target, TStringBuf& source, size_t size) {
+ TStringBuf maxSource(source.substr(0, size - target.size()));
+ target += maxSource;
+ source.Skip(maxSource.size());
+ if (target.size() > size && !source.empty()) {
+ Stage = EParseStage::Error;
+ return false;
+ }
+ return target.size() == size;
+ }
+
+ void ProcessHeader(TStringBuf& header) {
+ TStringBuf name = header.NextTok(':');
+ TrimBegin(name, ' ');
+ TStringBuf value = header;
+ Trim(value, ' ');
+ auto cit = HeaderType::HeadersLocation.find(name);
+ if (cit != HeaderType::HeadersLocation.end()) {
+ this->*cit->second = value;
+ }
+ header.Clear();
+ }
+
+ size_t ParseHex(TStringBuf value) {
+ size_t result = 0;
+ for (char ch : value) {
+ if (ch >= '0' && ch <= '9') {
+ result *= 16;
+ result += ch - '0';
+ } else if (ch >= 'a' && ch <= 'f') {
+ result *= 16;
+ result += 10 + ch - 'a';
+ } else if (ch >= 'A' && ch <= 'F') {
+ result *= 16;
+ result += 10 + ch - 'A';
+ } else if (ch == ';') {
+ break;
+ } else if (isspace(ch)) {
+ continue;
+ } else {
+ Stage = EParseStage::Error;
+ return 0;
+ }
+ }
+ return result;
+ }
+
+ void Advance(size_t len);
+ void ConnectionClosed();
+
+ void Clear() {
+ BufferType::Clear();
+ HeaderType::Clear();
+ Stage = GetInitialStage();
+ Line.Clear();
+ Content.clear();
+ }
+
+ bool IsReady() const {
+ return Stage == EParseStage::Done;
+ }
+
+ bool IsError() const {
+ return Stage == EParseStage::Error;
+ }
+
+ TStringBuf GetErrorText() const {
+ switch (LastSuccessStage) {
+ case EParseStage::Method:
+ return "Invalid http method";
+ case EParseStage::URL:
+ return "Invalid url";
+ case EParseStage::Protocol:
+ return "Invalid http protocol";
+ case EParseStage::Version:
+ return "Invalid http version";
+ case EParseStage::Status:
+ return "Invalid http status";
+ case EParseStage::Message:
+ return "Invalid http message";
+ case EParseStage::Header:
+ return "Invalid http header";
+ case EParseStage::Body:
+ return "Invalid content body";
+ case EParseStage::ChunkLength:
+ case EParseStage::ChunkData:
+ return "Broken chunked data";
+ case EParseStage::Done:
+ return "Everything is fine";
+ case EParseStage::Error:
+ return "Error on error"; // wat? ...because we don't want to include default label here
+ }
+ }
+
+ bool IsDone() const {
+ return IsReady() || IsError();
+ }
+
+ bool HaveBody() const {
+ return !HeaderType::ContentType.empty() || !HeaderType::ContentLength.empty() || !HeaderType::TransferEncoding.empty();
+ }
+
+ bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) {
+ bool result = BufferType::EnsureEnoughSpaceAvailable(need);
+ if (!result && !BufferType::Empty()) {
+ Reparse();
+ }
+ return true;
+ }
+
+ void Reparse() {
+ size_t size = BufferType::Size();
+ Clear();
+ Advance(size);
+ }
+
+ TStringBuf GetRawData() const {
+ return TStringBuf(BufferType::Data(), BufferType::Size());
+ }
+
+ TString GetObfuscatedData() const {
+ THeaders headers(HeaderType::Headers);
+ TStringBuf authorization(headers["Authorization"]);
+ TStringBuf cookie(headers["Cookie"]);
+ TStringBuf x_ydb_auth_ticket(headers["x-ydb-auth-ticket"]);
+ TStringBuf x_yacloud_subjecttoken(headers["x-yacloud-subjecttoken"]);
+ TString data(GetRawData());
+ if (!authorization.empty()) {
+ auto pos = data.find(authorization);
+ if (pos != TString::npos) {
+ data.replace(pos, authorization.size(), TString("<obfuscated>"));
+ }
+ }
+ if (!cookie.empty()) {
+ auto pos = data.find(cookie);
+ if (pos != TString::npos) {
+ data.replace(pos, cookie.size(), TString("<obfuscated>"));
+ }
+ }
+ if (!x_ydb_auth_ticket.empty()) {
+ auto pos = data.find(x_ydb_auth_ticket);
+ if (pos != TString::npos) {
+ data.replace(pos, x_ydb_auth_ticket.size(), TString("<obfuscated>"));
+ }
+ }
+ if (!x_yacloud_subjecttoken.empty()) {
+ auto pos = data.find(x_yacloud_subjecttoken);
+ if (pos != TString::npos) {
+ data.replace(pos, x_yacloud_subjecttoken.size(), TString("<obfuscated>"));
+ }
+ }
+ return data;
+ }
+
+ static EParseStage GetInitialStage();
+
+ THttpParser()
+ : Stage(GetInitialStage())
+ , LastSuccessStage(Stage)
+ {}
+};
+
+template <typename HeaderType, typename BufferType>
+class THttpRenderer : public HeaderType, public BufferType {
+public:
+ enum class ERenderStage {
+ Init,
+ Header,
+ Body,
+ Done,
+ Error,
+ };
+
+ ERenderStage Stage = ERenderStage::Init;
+
+ void Append(TStringBuf text) {
+ EnsureEnoughSpaceAvailable(text.size());
+ BufferType::Append(text.data(), text.size());
+ }
+
+ void Append(char c) {
+ EnsureEnoughSpaceAvailable(sizeof(c));
+ BufferType::Append(c);
+ }
+
+ template <TStringBuf HeaderType::* string>
+ void AppendParsedValue(TStringBuf value) {
+ Append(value);
+ static_cast<HeaderType*>(this)->*string = TStringBuf(BufferType::Pos() - value.size(), value.size());
+ }
+
+ template <TStringBuf HeaderType::* name>
+ void Set(TStringBuf value) {
+ Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
+ Append(HeaderType::template GetName<name>());
+ Append(": ");
+ AppendParsedValue<name>(value);
+ Append("\r\n");
+ HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
+ }
+
+ void Set(TStringBuf name, TStringBuf value) {
+ Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
+ Append(name);
+ Append(": ");
+ Append(value);
+ Append("\r\n");
+ HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
+ }
+
+ void Set(const THeaders& headers) {
+ Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
+ Append(headers.Render());
+ HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
+ }
+
+ //THttpRenderer(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version); // request
+ void InitRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version) {
+ Y_VERIFY_DEBUG(Stage == ERenderStage::Init);
+ AppendParsedValue<&THttpRequest::Method>(method);
+ Append(' ');
+ AppendParsedValue<&THttpRequest::URL>(url);
+ Append(' ');
+ AppendParsedValue<&THttpRequest::Protocol>(protocol);
+ Append('/');
+ AppendParsedValue<&THttpRequest::Version>(version);
+ Append("\r\n");
+ Stage = ERenderStage::Header;
+ HeaderType::Headers = TStringBuf(BufferType::Pos(), size_t(0));
+ }
+
+ //THttpRenderer(TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message); // response
+ void InitResponse(TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message) {
+ Y_VERIFY_DEBUG(Stage == ERenderStage::Init);
+ AppendParsedValue<&THttpResponse::Protocol>(protocol);
+ Append('/');
+ AppendParsedValue<&THttpResponse::Version>(version);
+ Append(' ');
+ AppendParsedValue<&THttpResponse::Status>(status);
+ Append(' ');
+ AppendParsedValue<&THttpResponse::Message>(message);
+ Append("\r\n");
+ Stage = ERenderStage::Header;
+ HeaderType::Headers = TStringBuf(BufferType::Pos(), size_t(0));
+ }
+
+ void FinishHeader() {
+ Append("\r\n");
+ HeaderType::Headers = TStringBuf(HeaderType::Headers.Data(), BufferType::Pos() - HeaderType::Headers.Data());
+ Stage = ERenderStage::Body;
+ }
+
+ void SetBody(TStringBuf body) {
+ Y_VERIFY_DEBUG(Stage == ERenderStage::Header);
+ if (HeaderType::ContentLength.empty()) {
+ Set<&HeaderType::ContentLength>(ToString(body.size()));
+ }
+ FinishHeader();
+ AppendParsedValue<&HeaderType::Body>(body);
+ Stage = ERenderStage::Done;
+ }
+
+ bool IsDone() const {
+ return Stage == ERenderStage::Done;
+ }
+
+ void Finish() {
+ switch (Stage) {
+ case ERenderStage::Header:
+ FinishHeader();
break;
- case THttpParser<HeaderType, BufferType>::EParseStage::Body:
- case THttpParser<HeaderType, BufferType>::EParseStage::ChunkLength:
- case THttpParser<HeaderType, BufferType>::EParseStage::ChunkData:
- Stage = ERenderStage::Body;
- break;
- case THttpParser<HeaderType, BufferType>::EParseStage::Done:
- Stage = ERenderStage::Done;
- break;
- case THttpParser<HeaderType, BufferType>::EParseStage::Error:
- Stage = ERenderStage::Error;
- break;
- }
- Y_VERIFY(size == BufferType::Size());
- }
-
- TStringBuf GetRawData() const {
- return TStringBuf(BufferType::Data(), BufferType::Size());
- }
-};
-
-template <>
-template <>
-inline void THttpRenderer<THttpResponse, TSocketBuffer>::Set<&THttpResponse::Body>(TStringBuf value) {
- SetBody(value);
-}
-
-template <>
-template <>
-inline void THttpRenderer<THttpRequest, TSocketBuffer>::Set<&THttpRequest::Body>(TStringBuf value) {
- SetBody(value);
-}
-
-class THttpIncomingRequest;
-using THttpIncomingRequestPtr = TIntrusivePtr<THttpIncomingRequest>;
-
-class THttpOutgoingResponse;
-using THttpOutgoingResponsePtr = TIntrusivePtr<THttpOutgoingResponse>;
-
-class THttpIncomingRequest :
- public THttpParser<THttpRequest, TSocketBuffer>,
- public TRefCounted<THttpIncomingRequest, TAtomicCounter> {
-public:
- THttpConfig::SocketAddressType Address;
- TString WorkerName;
- THPTimer Timer;
- bool Secure = false;
-
- bool IsConnectionClose() const {
- if (Connection.empty()) {
- return Version == "1.0";
- } else {
- return Connection == "close";
- }
- }
-
- TStringBuf GetConnection() const {
- if (!Connection.empty()) {
- return Connection;
- }
- return Version == "1.0" ? "close" : "keep-alive";
- }
-
- THttpOutgoingResponsePtr CreateResponseOK(TStringBuf body, TStringBuf contentType = "text/html", TInstant lastModified = TInstant());
- THttpOutgoingResponsePtr CreateResponseString(TStringBuf data);
- THttpOutgoingResponsePtr CreateResponseBadRequest(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 400
- THttpOutgoingResponsePtr CreateResponseNotFound(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 404
- THttpOutgoingResponsePtr CreateResponseServiceUnavailable(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 503
- THttpOutgoingResponsePtr CreateResponseGatewayTimeout(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 504
- THttpOutgoingResponsePtr CreateResponse(
- TStringBuf status,
- TStringBuf message,
- TStringBuf contentType = TStringBuf(),
- TStringBuf body = TStringBuf(),
- TInstant lastModified = TInstant());
-
- THttpIncomingRequestPtr Duplicate();
-};
-
-class THttpIncomingResponse;
-using THttpIncomingResponsePtr = TIntrusivePtr<THttpIncomingResponse>;
-
-class THttpOutgoingRequest;
-using THttpOutgoingRequestPtr = TIntrusivePtr<THttpOutgoingRequest>;
-
-class THttpIncomingResponse :
- public THttpParser<THttpResponse, TSocketBuffer>,
- public TRefCounted<THttpIncomingResponse, TAtomicCounter> {
-public:
- THttpIncomingResponse(THttpOutgoingRequestPtr request);
-
- THttpOutgoingRequestPtr GetRequest() const {
- return Request;
- }
-
- THttpIncomingResponsePtr Duplicate(THttpOutgoingRequestPtr request);
- THttpOutgoingResponsePtr Reverse(THttpIncomingRequestPtr request);
-
-protected:
- THttpOutgoingRequestPtr Request;
-};
-
-class THttpOutgoingRequest :
- public THttpRenderer<THttpRequest, TSocketBuffer>,
- public TRefCounted<THttpOutgoingRequest, TAtomicCounter> {
-public:
- THPTimer Timer;
+ default:
+ break;
+ }
+ }
+
+ bool EnsureEnoughSpaceAvailable(size_t need = BufferType::BUFFER_MIN_STEP) {
+ bool result = BufferType::EnsureEnoughSpaceAvailable(need);
+ if (!result && !BufferType::Empty()) {
+ Reparse();
+ }
+ return true;
+ }
+
+ void Clear() {
+ BufferType::Clear();
+ HeaderType::Clear();
+ }
+
+ void Reparse() {
+ // move-magic
+ size_t size = BufferType::Size();
+ THttpParser<HeaderType, BufferType> parser;
+ // move the buffer to parser
+ static_cast<BufferType&>(parser) = std::move(static_cast<BufferType&>(*this));
+ // reparse
+ parser.Clear();
+ parser.Advance(size);
+ // move buffer and result back
+ static_cast<HeaderType&>(*this) = std::move(static_cast<HeaderType&>(parser));
+ static_cast<BufferType&>(*this) = std::move(static_cast<BufferType&>(parser));
+ switch (parser.Stage) {
+ case THttpParser<HeaderType, BufferType>::EParseStage::Method:
+ case THttpParser<HeaderType, BufferType>::EParseStage::URL:
+ case THttpParser<HeaderType, BufferType>::EParseStage::Protocol:
+ case THttpParser<HeaderType, BufferType>::EParseStage::Version:
+ case THttpParser<HeaderType, BufferType>::EParseStage::Status:
+ case THttpParser<HeaderType, BufferType>::EParseStage::Message:
+ Stage = ERenderStage::Init;
+ break;
+ case THttpParser<HeaderType, BufferType>::EParseStage::Header:
+ Stage = ERenderStage::Header;
+ break;
+ case THttpParser<HeaderType, BufferType>::EParseStage::Body:
+ case THttpParser<HeaderType, BufferType>::EParseStage::ChunkLength:
+ case THttpParser<HeaderType, BufferType>::EParseStage::ChunkData:
+ Stage = ERenderStage::Body;
+ break;
+ case THttpParser<HeaderType, BufferType>::EParseStage::Done:
+ Stage = ERenderStage::Done;
+ break;
+ case THttpParser<HeaderType, BufferType>::EParseStage::Error:
+ Stage = ERenderStage::Error;
+ break;
+ }
+ Y_VERIFY(size == BufferType::Size());
+ }
+
+ TStringBuf GetRawData() const {
+ return TStringBuf(BufferType::Data(), BufferType::Size());
+ }
+};
+
+template <>
+template <>
+inline void THttpRenderer<THttpResponse, TSocketBuffer>::Set<&THttpResponse::Body>(TStringBuf value) {
+ SetBody(value);
+}
+
+template <>
+template <>
+inline void THttpRenderer<THttpRequest, TSocketBuffer>::Set<&THttpRequest::Body>(TStringBuf value) {
+ SetBody(value);
+}
+
+class THttpIncomingRequest;
+using THttpIncomingRequestPtr = TIntrusivePtr<THttpIncomingRequest>;
+
+class THttpOutgoingResponse;
+using THttpOutgoingResponsePtr = TIntrusivePtr<THttpOutgoingResponse>;
+
+class THttpIncomingRequest :
+ public THttpParser<THttpRequest, TSocketBuffer>,
+ public TRefCounted<THttpIncomingRequest, TAtomicCounter> {
+public:
+ THttpConfig::SocketAddressType Address;
+ TString WorkerName;
+ THPTimer Timer;
bool Secure = false;
-
- THttpOutgoingRequest() = default;
- THttpOutgoingRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version);
- THttpOutgoingRequest(TStringBuf method, TStringBuf scheme, TStringBuf host, TStringBuf uri, TStringBuf protocol, TStringBuf version);
- static THttpOutgoingRequestPtr CreateRequestString(TStringBuf data);
- static THttpOutgoingRequestPtr CreateRequestString(const TString& data);
- static THttpOutgoingRequestPtr CreateRequestGet(TStringBuf url);
- static THttpOutgoingRequestPtr CreateRequestGet(TStringBuf host, TStringBuf uri); // http only
- static THttpOutgoingRequestPtr CreateRequestPost(TStringBuf url, TStringBuf contentType = {}, TStringBuf body = {});
- static THttpOutgoingRequestPtr CreateRequestPost(TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body); // http only
- static THttpOutgoingRequestPtr CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf());
- static THttpOutgoingRequestPtr CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf());
- THttpOutgoingRequestPtr Duplicate();
-};
-
-class THttpOutgoingResponse :
- public THttpRenderer<THttpResponse, TSocketBuffer>,
- public TRefCounted<THttpOutgoingResponse, TAtomicCounter> {
-public:
- THttpOutgoingResponse(THttpIncomingRequestPtr request);
- THttpOutgoingResponse(THttpIncomingRequestPtr request, TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message);
-
- bool IsConnectionClose() const {
- if (!Connection.empty()) {
- return Connection == "close";
- } else {
- return Request->IsConnectionClose();
- }
- }
-
- bool IsNeedBody() const {
- return Status != "204";
- }
-
- THttpIncomingRequestPtr GetRequest() const {
- return Request;
- }
-
- THttpOutgoingResponsePtr Duplicate(THttpIncomingRequestPtr request);
-
-// it's temporary accessible for cleanup
-//protected:
- THttpIncomingRequestPtr Request;
-};
-
-}
+
+ bool IsConnectionClose() const {
+ if (Connection.empty()) {
+ return Version == "1.0";
+ } else {
+ return Connection == "close";
+ }
+ }
+
+ TStringBuf GetConnection() const {
+ if (!Connection.empty()) {
+ return Connection;
+ }
+ return Version == "1.0" ? "close" : "keep-alive";
+ }
+
+ THttpOutgoingResponsePtr CreateResponseOK(TStringBuf body, TStringBuf contentType = "text/html", TInstant lastModified = TInstant());
+ THttpOutgoingResponsePtr CreateResponseString(TStringBuf data);
+ THttpOutgoingResponsePtr CreateResponseBadRequest(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 400
+ THttpOutgoingResponsePtr CreateResponseNotFound(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 404
+ THttpOutgoingResponsePtr CreateResponseServiceUnavailable(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 503
+ THttpOutgoingResponsePtr CreateResponseGatewayTimeout(TStringBuf html = TStringBuf(), TStringBuf contentType = "text/html"); // 504
+ THttpOutgoingResponsePtr CreateResponse(
+ TStringBuf status,
+ TStringBuf message,
+ TStringBuf contentType = TStringBuf(),
+ TStringBuf body = TStringBuf(),
+ TInstant lastModified = TInstant());
+
+ THttpIncomingRequestPtr Duplicate();
+};
+
+class THttpIncomingResponse;
+using THttpIncomingResponsePtr = TIntrusivePtr<THttpIncomingResponse>;
+
+class THttpOutgoingRequest;
+using THttpOutgoingRequestPtr = TIntrusivePtr<THttpOutgoingRequest>;
+
+class THttpIncomingResponse :
+ public THttpParser<THttpResponse, TSocketBuffer>,
+ public TRefCounted<THttpIncomingResponse, TAtomicCounter> {
+public:
+ THttpIncomingResponse(THttpOutgoingRequestPtr request);
+
+ THttpOutgoingRequestPtr GetRequest() const {
+ return Request;
+ }
+
+ THttpIncomingResponsePtr Duplicate(THttpOutgoingRequestPtr request);
+ THttpOutgoingResponsePtr Reverse(THttpIncomingRequestPtr request);
+
+protected:
+ THttpOutgoingRequestPtr Request;
+};
+
+class THttpOutgoingRequest :
+ public THttpRenderer<THttpRequest, TSocketBuffer>,
+ public TRefCounted<THttpOutgoingRequest, TAtomicCounter> {
+public:
+ THPTimer Timer;
+ bool Secure = false;
+
+ THttpOutgoingRequest() = default;
+ THttpOutgoingRequest(TStringBuf method, TStringBuf url, TStringBuf protocol, TStringBuf version);
+ THttpOutgoingRequest(TStringBuf method, TStringBuf scheme, TStringBuf host, TStringBuf uri, TStringBuf protocol, TStringBuf version);
+ static THttpOutgoingRequestPtr CreateRequestString(TStringBuf data);
+ static THttpOutgoingRequestPtr CreateRequestString(const TString& data);
+ static THttpOutgoingRequestPtr CreateRequestGet(TStringBuf url);
+ static THttpOutgoingRequestPtr CreateRequestGet(TStringBuf host, TStringBuf uri); // http only
+ static THttpOutgoingRequestPtr CreateRequestPost(TStringBuf url, TStringBuf contentType = {}, TStringBuf body = {});
+ static THttpOutgoingRequestPtr CreateRequestPost(TStringBuf host, TStringBuf uri, TStringBuf contentType, TStringBuf body); // http only
+ static THttpOutgoingRequestPtr CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf());
+ static THttpOutgoingRequestPtr CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf());
+ THttpOutgoingRequestPtr Duplicate();
+};
+
+class THttpOutgoingResponse :
+ public THttpRenderer<THttpResponse, TSocketBuffer>,
+ public TRefCounted<THttpOutgoingResponse, TAtomicCounter> {
+public:
+ THttpOutgoingResponse(THttpIncomingRequestPtr request);
+ THttpOutgoingResponse(THttpIncomingRequestPtr request, TStringBuf protocol, TStringBuf version, TStringBuf status, TStringBuf message);
+
+ bool IsConnectionClose() const {
+ if (!Connection.empty()) {
+ return Connection == "close";
+ } else {
+ return Request->IsConnectionClose();
+ }
+ }
+
+ bool IsNeedBody() const {
+ return Status != "204";
+ }
+
+ THttpIncomingRequestPtr GetRequest() const {
+ return Request;
+ }
+
+ THttpOutgoingResponsePtr Duplicate(THttpIncomingRequestPtr request);
+
+// it's temporary accessible for cleanup
+//protected:
+ THttpIncomingRequestPtr Request;
+};
+
+}
diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp
index 27c4eeb6f3..8fe6783260 100644
--- a/library/cpp/actors/http/http_cache.cpp
+++ b/library/cpp/actors/http/http_cache.cpp
@@ -1,4 +1,4 @@
-#include "http.h"
+#include "http.h"
#include "http_proxy.h"
#include "http_cache.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -6,594 +6,594 @@
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/http/http.h>
-#include <library/cpp/digest/md5/md5.h>
-#include <util/digest/multi.h>
-#include <util/generic/queue.h>
+#include <library/cpp/digest/md5/md5.h>
+#include <util/digest/multi.h>
+#include <util/generic/queue.h>
#include <util/string/cast.h>
-
-namespace NHttp {
-
-class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig {
-public:
- using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>;
+
+namespace NHttp {
+
+class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig {
+public:
+ using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>;
NActors::TActorId HttpProxyId;
- TGetCachePolicy GetCachePolicy;
- static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
-
- struct TCacheKey {
- TString Host;
- TString URL;
- TString Headers;
-
- operator size_t() const {
- return MultiHash(Host, URL, Headers);
- }
-
- TString GetId() const {
- return MD5::Calc(Host + ':' + URL + ':' + Headers);
- }
- };
-
- struct TCacheRecord {
- TInstant RefreshTime;
- TInstant DeathTime;
- TCachePolicy CachePolicy;
- NHttp::THttpOutgoingRequestPtr Request;
- NHttp::THttpOutgoingRequestPtr OutgoingRequest;
- TDuration Timeout;
- NHttp::THttpIncomingResponsePtr Response;
- TString Error;
- TVector<NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr> Waiters;
-
- TCacheRecord(const TCachePolicy cachePolicy)
- : CachePolicy(cachePolicy)
- {}
-
- bool IsValid() const {
- return Response != nullptr || !Error.empty();
- }
-
- void UpdateResponse(NHttp::THttpIncomingResponsePtr response, const TString& error, TInstant now) {
- if (error.empty() || Response == nullptr || !CachePolicy.KeepOnError) {
- Response = response;
- Error = error;
- }
- RefreshTime = now + CachePolicy.TimeToRefresh;
- if (CachePolicy.PaceToRefresh) {
- RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
- }
- }
-
- TString GetName() const {
- return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL;
- }
- };
-
- struct TRefreshRecord {
- TCacheKey Key;
- TInstant RefreshTime;
-
- bool operator <(const TRefreshRecord& b) const {
- return RefreshTime > b.RefreshTime;
- }
- };
-
- THashMap<TCacheKey, TCacheRecord> Cache;
- TPriorityQueue<TRefreshRecord> RefreshQueue;
- THashMap<THttpOutgoingRequest*, TCacheKey> OutgoingRequests;
-
- THttpOutgoingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
- : HttpProxyId(httpProxyId)
- , GetCachePolicy(std::move(getCachePolicy))
- {}
-
- void Bootstrap(const NActors::TActorContext&) {
- //
- Become(&THttpOutgoingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
- }
-
- static TString GetCacheHeadersKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
- TStringBuilder key;
- if (!policy.HeadersToCacheKey.empty()) {
- NHttp::THeaders headers(request->Headers);
- for (const TString& header : policy.HeadersToCacheKey) {
- key << headers[header];
- }
- }
- return key;
- }
-
- static TCacheKey GetCacheKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
- return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
- ctx.Send(event->Forward(HttpProxyId));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
- ctx.Send(event->Forward(HttpProxyId));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
- ctx.Send(event->Forward(HttpProxyId));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
- ctx.Send(event->Forward(HttpProxyId));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
- NHttp::THttpOutgoingRequestPtr request(event->Get()->Request);
- NHttp::THttpIncomingResponsePtr response(event->Get()->Response);
- auto itRequests = OutgoingRequests.find(request.Get());
- if (itRequests == OutgoingRequests.end()) {
- LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
- return;
- }
- auto key = itRequests->second;
- OutgoingRequests.erase(itRequests);
- auto it = Cache.find(key);
- if (it == Cache.end()) {
- LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
- return;
- }
- TCacheRecord& cacheRecord = it->second;
- cacheRecord.OutgoingRequest.Reset();
- for (auto& waiter : cacheRecord.Waiters) {
- NHttp::THttpIncomingResponsePtr response2;
- TString error2;
- if (response != nullptr) {
- response2 = response->Duplicate(waiter->Get()->Request);
- }
- if (!event->Get()->Error.empty()) {
- error2 = event->Get()->Error;
- }
- ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(waiter->Get()->Request, response2, error2));
- }
- cacheRecord.Waiters.clear();
- TString error;
- if (event->Get()->Error.empty()) {
- if (event->Get()->Response != nullptr && event->Get()->Response->Status != "200") {
- error = event->Get()->Response->Message;
- }
- } else {
- error = event->Get()->Error;
- }
- if (!error.empty()) {
- LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
- }
- LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName());
- cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now());
- RefreshQueue.push({it->first, it->second.RefreshTime});
- LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
- const NHttp::THttpOutgoingRequest* request = event->Get()->Request.Get();
- auto policy = GetCachePolicy(request);
- if (policy.TimeToExpire == TDuration()) {
- ctx.Send(event->Forward(HttpProxyId));
- return;
- }
- auto key = GetCacheKey(request, policy);
- auto it = Cache.find(key);
- if (it != Cache.end()) {
- if (it->second.IsValid()) {
- LOG_DEBUG_S(ctx, HttpLog, "OutgoingRespond "
- << it->second.GetName()
- << " ("
- << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
- << ")");
- NHttp::THttpIncomingResponsePtr response = it->second.Response;
- if (response != nullptr) {
- response = response->Duplicate(event->Get()->Request);
- }
- ctx.Send(event->Sender,
- new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(event->Get()->Request,
- response,
- it->second.Error));
- it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; // prolong active cache items
- return;
- }
- } else {
- it = Cache.emplace(key, policy).first;
- it->second.Request = event->Get()->Request;
- it->second.Timeout = event->Get()->Timeout;
- it->second.OutgoingRequest = it->second.Request->Duplicate();
- OutgoingRequests[it->second.OutgoingRequest.Get()] = key;
- LOG_DEBUG_S(ctx, HttpLog, "OutgoingInitiate " << it->second.GetName());
- ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
- }
- it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire;
- it->second.Waiters.emplace_back(std::move(event));
- }
-
- void HandleRefresh(const NActors::TActorContext& ctx) {
- while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
- TRefreshRecord rrec = RefreshQueue.top();
- RefreshQueue.pop();
- auto it = Cache.find(rrec.Key);
- if (it != Cache.end()) {
- if (it->second.DeathTime > ctx.Now()) {
- LOG_DEBUG_S(ctx, HttpLog, "OutgoingRefresh " << it->second.GetName());
- it->second.OutgoingRequest = it->second.Request->Duplicate();
- OutgoingRequests[it->second.OutgoingRequest.Get()] = it->first;
- ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
- } else {
- LOG_DEBUG_S(ctx, HttpLog, "OutgoingForget " << it->second.GetName());
- if (it->second.OutgoingRequest) {
- OutgoingRequests.erase(it->second.OutgoingRequest.Get());
- }
- Cache.erase(it);
- }
- }
- }
- ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
- }
-
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
- CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
- }
- }
-};
-
-const TDuration THttpOutgoingCacheActor::RefreshTimeout;
-
-class THttpIncomingCacheActor : public NActors::TActorBootstrapped<THttpIncomingCacheActor>, THttpConfig {
-public:
- using TBase = NActors::TActorBootstrapped<THttpIncomingCacheActor>;
- NActors::TActorId HttpProxyId;
- TGetCachePolicy GetCachePolicy;
- static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
- THashMap<TString, TActorId> Handlers;
-
- struct TCacheKey {
- TString Host;
- TString URL;
- TString Headers;
-
- operator size_t() const {
- return MultiHash(Host, URL, Headers);
- }
-
- TString GetId() const {
- return MD5::Calc(Host + ':' + URL + ':' + Headers);
- }
- };
-
- struct TCacheRecord {
- TInstant RefreshTime;
- TInstant DeathTime;
- TCachePolicy CachePolicy;
- TString CacheId;
- NHttp::THttpIncomingRequestPtr Request;
- TDuration Timeout;
- NHttp::THttpOutgoingResponsePtr Response;
- TVector<NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr> Waiters;
- ui32 Retries = 0;
- bool Enqueued = false;
-
- TCacheRecord(const TCachePolicy cachePolicy)
- : CachePolicy(cachePolicy)
- {}
-
- bool IsValid() const {
- return Response != nullptr;
- }
-
- void InitRequest(NHttp::THttpIncomingRequestPtr request) {
- Request = request;
- if (CachePolicy.TimeToExpire) {
- DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
- }
- }
-
- void UpdateResponse(NHttp::THttpOutgoingResponsePtr response, const TString& error, TInstant now) {
- if (error.empty() || !CachePolicy.KeepOnError) {
- Response = response;
- }
- Retries = 0;
- if (CachePolicy.TimeToRefresh) {
- RefreshTime = now + CachePolicy.TimeToRefresh;
- if (CachePolicy.PaceToRefresh) {
- RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
- }
- }
- }
-
- void UpdateExpireTime() {
- if (CachePolicy.TimeToExpire) {
- DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
- }
- }
-
- TString GetName() const {
- return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL
- << " (" << CacheId << ")";
- }
- };
-
- struct TRefreshRecord {
- TCacheKey Key;
- TInstant RefreshTime;
-
- bool operator <(const TRefreshRecord& b) const {
- return RefreshTime > b.RefreshTime;
- }
- };
-
- THashMap<TCacheKey, TCacheRecord> Cache;
- TPriorityQueue<TRefreshRecord> RefreshQueue;
- THashMap<THttpIncomingRequest*, TCacheKey> IncomingRequests;
-
- THttpIncomingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
- : HttpProxyId(httpProxyId)
- , GetCachePolicy(std::move(getCachePolicy))
- {}
-
- void Bootstrap(const NActors::TActorContext&) {
- //
- Become(&THttpIncomingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
- }
-
- static TString GetCacheHeadersKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
- TStringBuilder key;
- if (!policy.HeadersToCacheKey.empty()) {
- NHttp::THeaders headers(request->Headers);
- for (const TString& header : policy.HeadersToCacheKey) {
- key << headers[header];
- }
- }
- return key;
- }
-
- static TCacheKey GetCacheKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
- return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
- }
-
- TActorId GetRequestHandler(NHttp::THttpIncomingRequestPtr request) {
- TStringBuf url = request->URL.Before('?');
- THashMap<TString, TActorId>::iterator it;
- while (!url.empty()) {
- it = Handlers.find(url);
- if (it != Handlers.end()) {
- return it->second;
- } else {
- if (url.EndsWith('/')) {
- url.Trunc(url.size() - 1);
- }
- size_t pos = url.rfind('/');
- if (pos == TStringBuf::npos) {
- break;
- } else {
- url = url.substr(0, pos + 1);
- }
- }
- }
- return {};
- }
-
- void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) {
- cacheRecord.Request = cacheRecord.Request->Duplicate();
- IncomingRequests[cacheRecord.Request.Get()] = cacheKey;
- TActorId handler = GetRequestHandler(cacheRecord.Request);
- if (handler) {
- Send(handler, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(cacheRecord.Request));
- } else {
- LOG_ERROR_S(ctx, HttpLog, "Can't find cache handler for " << cacheRecord.GetName());
- }
- }
-
- void DropCacheRecord(THashMap<TCacheKey, TCacheRecord>::iterator it) {
- if (it->second.Request) {
- IncomingRequests.erase(it->second.Request.Get());
- }
- for (auto& waiter : it->second.Waiters) {
- NHttp::THttpOutgoingResponsePtr response;
- response = waiter->Get()->Request->CreateResponseGatewayTimeout("Timeout", "text/plain");
- Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
- }
- Cache.erase(it);
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
- ctx.Send(event->Forward(HttpProxyId));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
- ctx.Send(event->Forward(HttpProxyId));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
- ctx.Send(event->Forward(HttpProxyId));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
- Handlers[event->Get()->Path] = event->Get()->Handler;
- ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvRegisterHandler(event->Get()->Path, ctx.SelfID));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
- NHttp::THttpIncomingRequestPtr request(event->Get()->Response->GetRequest());
- NHttp::THttpOutgoingResponsePtr response(event->Get()->Response);
- auto itRequests = IncomingRequests.find(request.Get());
- if (itRequests == IncomingRequests.end()) {
- LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
- return;
- }
-
- TCacheKey key = itRequests->second;
- auto it = Cache.find(key);
- if (it == Cache.end()) {
- LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
- return;
- }
-
- IncomingRequests.erase(itRequests);
- TCacheRecord& cacheRecord = it->second;
- TStringBuf status;
- TString error;
-
- if (event->Get()->Response != nullptr) {
- status = event->Get()->Response->Status;
- if (!status.StartsWith("2")) {
- error = event->Get()->Response->Message;
- }
- }
- if (cacheRecord.CachePolicy.RetriesCount > 0) {
- auto itStatusToRetry = std::find(cacheRecord.CachePolicy.StatusesToRetry.begin(), cacheRecord.CachePolicy.StatusesToRetry.end(), status);
- if (itStatusToRetry != cacheRecord.CachePolicy.StatusesToRetry.end()) {
- if (cacheRecord.Retries < cacheRecord.CachePolicy.RetriesCount) {
- ++cacheRecord.Retries;
- LOG_WARN_S(ctx, HttpLog, "IncomingRetry " << cacheRecord.GetName() << ": " << status << " " << error);
- SendCacheRequest(key, cacheRecord, ctx);
- return;
- }
- }
- }
- for (auto& waiter : cacheRecord.Waiters) {
- NHttp::THttpOutgoingResponsePtr response2;
- response2 = response->Duplicate(waiter->Get()->Request);
- ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response2));
- }
- cacheRecord.Waiters.clear();
- if (!error.empty()) {
- LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
- if (!cacheRecord.Response) {
- LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscard " << cacheRecord.GetName());
- DropCacheRecord(it);
- return;
- }
- }
- if (cacheRecord.CachePolicy.TimeToRefresh) {
- LOG_DEBUG_S(ctx, HttpLog, "IncomingUpdate " << cacheRecord.GetName());
- cacheRecord.UpdateResponse(response, error, ctx.Now());
- if (!cacheRecord.Enqueued) {
- RefreshQueue.push({it->first, it->second.RefreshTime});
- cacheRecord.Enqueued = true;
- }
- LOG_DEBUG_S(ctx, HttpLog, "IncomingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
- } else {
- LOG_DEBUG_S(ctx, HttpLog, "IncomingDrop " << cacheRecord.GetName());
- DropCacheRecord(it);
- }
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
- const NHttp::THttpIncomingRequest* request = event->Get()->Request.Get();
- TCachePolicy policy = GetCachePolicy(request);
- if (policy.TimeToExpire == TDuration() && policy.RetriesCount == 0) {
- TActorId handler = GetRequestHandler(event->Get()->Request);
- if (handler) {
- ctx.Send(event->Forward(handler));
- }
- return;
- }
- auto key = GetCacheKey(request, policy);
- auto it = Cache.find(key);
- if (it != Cache.end() && !policy.DiscardCache) {
- it->second.UpdateExpireTime();
- if (it->second.IsValid()) {
- LOG_DEBUG_S(ctx, HttpLog, "IncomingRespond "
- << it->second.GetName()
- << " ("
- << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
- << ")");
- NHttp::THttpOutgoingResponsePtr response = it->second.Response;
- if (response != nullptr) {
- response = response->Duplicate(event->Get()->Request);
- }
- ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
- return;
- }
- } else {
- it = Cache.emplace(key, policy).first;
- it->second.CacheId = key.GetId(); // for debugging
- it->second.InitRequest(event->Get()->Request);
- if (policy.DiscardCache) {
- LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscardCache " << it->second.GetName());
- }
- LOG_DEBUG_S(ctx, HttpLog, "IncomingInitiate " << it->second.GetName());
- SendCacheRequest(key, it->second, ctx);
- }
- it->second.Waiters.emplace_back(std::move(event));
- }
-
- void HandleRefresh(const NActors::TActorContext& ctx) {
- while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
- TRefreshRecord rrec = RefreshQueue.top();
- RefreshQueue.pop();
- auto it = Cache.find(rrec.Key);
- if (it != Cache.end()) {
- it->second.Enqueued = false;
- if (it->second.DeathTime > ctx.Now()) {
- LOG_DEBUG_S(ctx, HttpLog, "IncomingRefresh " << it->second.GetName());
- SendCacheRequest(it->first, it->second, ctx);
- } else {
- LOG_DEBUG_S(ctx, HttpLog, "IncomingForget " << it->second.GetName());
- DropCacheRecord(it);
- }
- }
- }
- ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
- }
-
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
- CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
- }
- }
-};
-
-TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& defaultPolicy) {
- TCachePolicy policy = defaultPolicy;
- THeaders headers(request->Headers);
- TStringBuf cacheControl(headers["Cache-Control"]);
- while (TStringBuf cacheItem = cacheControl.NextTok(',')) {
- Trim(cacheItem, ' ');
- if (cacheItem == "no-store" || cacheItem == "no-cache") {
- policy.DiscardCache = true;
- }
- TStringBuf itemName = cacheItem.NextTok('=');
- TrimEnd(itemName, ' ');
- TrimBegin(cacheItem, ' ');
- if (itemName == "max-age") {
- policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
- }
- if (itemName == "min-fresh") {
- policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
- }
- if (itemName == "stale-if-error") {
- policy.KeepOnError = true;
- }
- }
- return policy;
-}
-
+ TGetCachePolicy GetCachePolicy;
+ static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
+
+ struct TCacheKey {
+ TString Host;
+ TString URL;
+ TString Headers;
+
+ operator size_t() const {
+ return MultiHash(Host, URL, Headers);
+ }
+
+ TString GetId() const {
+ return MD5::Calc(Host + ':' + URL + ':' + Headers);
+ }
+ };
+
+ struct TCacheRecord {
+ TInstant RefreshTime;
+ TInstant DeathTime;
+ TCachePolicy CachePolicy;
+ NHttp::THttpOutgoingRequestPtr Request;
+ NHttp::THttpOutgoingRequestPtr OutgoingRequest;
+ TDuration Timeout;
+ NHttp::THttpIncomingResponsePtr Response;
+ TString Error;
+ TVector<NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr> Waiters;
+
+ TCacheRecord(const TCachePolicy cachePolicy)
+ : CachePolicy(cachePolicy)
+ {}
+
+ bool IsValid() const {
+ return Response != nullptr || !Error.empty();
+ }
+
+ void UpdateResponse(NHttp::THttpIncomingResponsePtr response, const TString& error, TInstant now) {
+ if (error.empty() || Response == nullptr || !CachePolicy.KeepOnError) {
+ Response = response;
+ Error = error;
+ }
+ RefreshTime = now + CachePolicy.TimeToRefresh;
+ if (CachePolicy.PaceToRefresh) {
+ RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
+ }
+ }
+
+ TString GetName() const {
+ return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL;
+ }
+ };
+
+ struct TRefreshRecord {
+ TCacheKey Key;
+ TInstant RefreshTime;
+
+ bool operator <(const TRefreshRecord& b) const {
+ return RefreshTime > b.RefreshTime;
+ }
+ };
+
+ THashMap<TCacheKey, TCacheRecord> Cache;
+ TPriorityQueue<TRefreshRecord> RefreshQueue;
+ THashMap<THttpOutgoingRequest*, TCacheKey> OutgoingRequests;
+
+ THttpOutgoingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
+ : HttpProxyId(httpProxyId)
+ , GetCachePolicy(std::move(getCachePolicy))
+ {}
+
+ void Bootstrap(const NActors::TActorContext&) {
+ //
+ Become(&THttpOutgoingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
+ }
+
+ static TString GetCacheHeadersKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
+ TStringBuilder key;
+ if (!policy.HeadersToCacheKey.empty()) {
+ NHttp::THeaders headers(request->Headers);
+ for (const TString& header : policy.HeadersToCacheKey) {
+ key << headers[header];
+ }
+ }
+ return key;
+ }
+
+ static TCacheKey GetCacheKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
+ return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ NHttp::THttpOutgoingRequestPtr request(event->Get()->Request);
+ NHttp::THttpIncomingResponsePtr response(event->Get()->Response);
+ auto itRequests = OutgoingRequests.find(request.Get());
+ if (itRequests == OutgoingRequests.end()) {
+ LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
+ return;
+ }
+ auto key = itRequests->second;
+ OutgoingRequests.erase(itRequests);
+ auto it = Cache.find(key);
+ if (it == Cache.end()) {
+ LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
+ return;
+ }
+ TCacheRecord& cacheRecord = it->second;
+ cacheRecord.OutgoingRequest.Reset();
+ for (auto& waiter : cacheRecord.Waiters) {
+ NHttp::THttpIncomingResponsePtr response2;
+ TString error2;
+ if (response != nullptr) {
+ response2 = response->Duplicate(waiter->Get()->Request);
+ }
+ if (!event->Get()->Error.empty()) {
+ error2 = event->Get()->Error;
+ }
+ ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(waiter->Get()->Request, response2, error2));
+ }
+ cacheRecord.Waiters.clear();
+ TString error;
+ if (event->Get()->Error.empty()) {
+ if (event->Get()->Response != nullptr && event->Get()->Response->Status != "200") {
+ error = event->Get()->Response->Message;
+ }
+ } else {
+ error = event->Get()->Error;
+ }
+ if (!error.empty()) {
+ LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
+ }
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName());
+ cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now());
+ RefreshQueue.push({it->first, it->second.RefreshTime});
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ const NHttp::THttpOutgoingRequest* request = event->Get()->Request.Get();
+ auto policy = GetCachePolicy(request);
+ if (policy.TimeToExpire == TDuration()) {
+ ctx.Send(event->Forward(HttpProxyId));
+ return;
+ }
+ auto key = GetCacheKey(request, policy);
+ auto it = Cache.find(key);
+ if (it != Cache.end()) {
+ if (it->second.IsValid()) {
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingRespond "
+ << it->second.GetName()
+ << " ("
+ << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
+ << ")");
+ NHttp::THttpIncomingResponsePtr response = it->second.Response;
+ if (response != nullptr) {
+ response = response->Duplicate(event->Get()->Request);
+ }
+ ctx.Send(event->Sender,
+ new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(event->Get()->Request,
+ response,
+ it->second.Error));
+ it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; // prolong active cache items
+ return;
+ }
+ } else {
+ it = Cache.emplace(key, policy).first;
+ it->second.Request = event->Get()->Request;
+ it->second.Timeout = event->Get()->Timeout;
+ it->second.OutgoingRequest = it->second.Request->Duplicate();
+ OutgoingRequests[it->second.OutgoingRequest.Get()] = key;
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingInitiate " << it->second.GetName());
+ ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
+ }
+ it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire;
+ it->second.Waiters.emplace_back(std::move(event));
+ }
+
+ void HandleRefresh(const NActors::TActorContext& ctx) {
+ while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
+ TRefreshRecord rrec = RefreshQueue.top();
+ RefreshQueue.pop();
+ auto it = Cache.find(rrec.Key);
+ if (it != Cache.end()) {
+ if (it->second.DeathTime > ctx.Now()) {
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingRefresh " << it->second.GetName());
+ it->second.OutgoingRequest = it->second.Request->Duplicate();
+ OutgoingRequests[it->second.OutgoingRequest.Get()] = it->first;
+ ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
+ } else {
+ LOG_DEBUG_S(ctx, HttpLog, "OutgoingForget " << it->second.GetName());
+ if (it->second.OutgoingRequest) {
+ OutgoingRequests.erase(it->second.OutgoingRequest.Get());
+ }
+ Cache.erase(it);
+ }
+ }
+ }
+ ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
+ CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
+ }
+ }
+};
+
+const TDuration THttpOutgoingCacheActor::RefreshTimeout;
+
+class THttpIncomingCacheActor : public NActors::TActorBootstrapped<THttpIncomingCacheActor>, THttpConfig {
+public:
+ using TBase = NActors::TActorBootstrapped<THttpIncomingCacheActor>;
+ NActors::TActorId HttpProxyId;
+ TGetCachePolicy GetCachePolicy;
+ static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
+ THashMap<TString, TActorId> Handlers;
+
+ struct TCacheKey {
+ TString Host;
+ TString URL;
+ TString Headers;
+
+ operator size_t() const {
+ return MultiHash(Host, URL, Headers);
+ }
+
+ TString GetId() const {
+ return MD5::Calc(Host + ':' + URL + ':' + Headers);
+ }
+ };
+
+ struct TCacheRecord {
+ TInstant RefreshTime;
+ TInstant DeathTime;
+ TCachePolicy CachePolicy;
+ TString CacheId;
+ NHttp::THttpIncomingRequestPtr Request;
+ TDuration Timeout;
+ NHttp::THttpOutgoingResponsePtr Response;
+ TVector<NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr> Waiters;
+ ui32 Retries = 0;
+ bool Enqueued = false;
+
+ TCacheRecord(const TCachePolicy cachePolicy)
+ : CachePolicy(cachePolicy)
+ {}
+
+ bool IsValid() const {
+ return Response != nullptr;
+ }
+
+ void InitRequest(NHttp::THttpIncomingRequestPtr request) {
+ Request = request;
+ if (CachePolicy.TimeToExpire) {
+ DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
+ }
+ }
+
+ void UpdateResponse(NHttp::THttpOutgoingResponsePtr response, const TString& error, TInstant now) {
+ if (error.empty() || !CachePolicy.KeepOnError) {
+ Response = response;
+ }
+ Retries = 0;
+ if (CachePolicy.TimeToRefresh) {
+ RefreshTime = now + CachePolicy.TimeToRefresh;
+ if (CachePolicy.PaceToRefresh) {
+ RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
+ }
+ }
+ }
+
+ void UpdateExpireTime() {
+ if (CachePolicy.TimeToExpire) {
+ DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
+ }
+ }
+
+ TString GetName() const {
+ return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL
+ << " (" << CacheId << ")";
+ }
+ };
+
+ struct TRefreshRecord {
+ TCacheKey Key;
+ TInstant RefreshTime;
+
+ bool operator <(const TRefreshRecord& b) const {
+ return RefreshTime > b.RefreshTime;
+ }
+ };
+
+ THashMap<TCacheKey, TCacheRecord> Cache;
+ TPriorityQueue<TRefreshRecord> RefreshQueue;
+ THashMap<THttpIncomingRequest*, TCacheKey> IncomingRequests;
+
+ THttpIncomingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
+ : HttpProxyId(httpProxyId)
+ , GetCachePolicy(std::move(getCachePolicy))
+ {}
+
+ void Bootstrap(const NActors::TActorContext&) {
+ //
+ Become(&THttpIncomingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
+ }
+
+ static TString GetCacheHeadersKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
+ TStringBuilder key;
+ if (!policy.HeadersToCacheKey.empty()) {
+ NHttp::THeaders headers(request->Headers);
+ for (const TString& header : policy.HeadersToCacheKey) {
+ key << headers[header];
+ }
+ }
+ return key;
+ }
+
+ static TCacheKey GetCacheKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
+ return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
+ }
+
+ TActorId GetRequestHandler(NHttp::THttpIncomingRequestPtr request) {
+ TStringBuf url = request->URL.Before('?');
+ THashMap<TString, TActorId>::iterator it;
+ while (!url.empty()) {
+ it = Handlers.find(url);
+ if (it != Handlers.end()) {
+ return it->second;
+ } else {
+ if (url.EndsWith('/')) {
+ url.Trunc(url.size() - 1);
+ }
+ size_t pos = url.rfind('/');
+ if (pos == TStringBuf::npos) {
+ break;
+ } else {
+ url = url.substr(0, pos + 1);
+ }
+ }
+ }
+ return {};
+ }
+
+ void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) {
+ cacheRecord.Request = cacheRecord.Request->Duplicate();
+ IncomingRequests[cacheRecord.Request.Get()] = cacheKey;
+ TActorId handler = GetRequestHandler(cacheRecord.Request);
+ if (handler) {
+ Send(handler, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(cacheRecord.Request));
+ } else {
+ LOG_ERROR_S(ctx, HttpLog, "Can't find cache handler for " << cacheRecord.GetName());
+ }
+ }
+
+ void DropCacheRecord(THashMap<TCacheKey, TCacheRecord>::iterator it) {
+ if (it->second.Request) {
+ IncomingRequests.erase(it->second.Request.Get());
+ }
+ for (auto& waiter : it->second.Waiters) {
+ NHttp::THttpOutgoingResponsePtr response;
+ response = waiter->Get()->Request->CreateResponseGatewayTimeout("Timeout", "text/plain");
+ Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ }
+ Cache.erase(it);
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(HttpProxyId));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
+ Handlers[event->Get()->Path] = event->Get()->Handler;
+ ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvRegisterHandler(event->Get()->Path, ctx.SelfID));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ NHttp::THttpIncomingRequestPtr request(event->Get()->Response->GetRequest());
+ NHttp::THttpOutgoingResponsePtr response(event->Get()->Response);
+ auto itRequests = IncomingRequests.find(request.Get());
+ if (itRequests == IncomingRequests.end()) {
+ LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
+ return;
+ }
+
+ TCacheKey key = itRequests->second;
+ auto it = Cache.find(key);
+ if (it == Cache.end()) {
+ LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
+ return;
+ }
+
+ IncomingRequests.erase(itRequests);
+ TCacheRecord& cacheRecord = it->second;
+ TStringBuf status;
+ TString error;
+
+ if (event->Get()->Response != nullptr) {
+ status = event->Get()->Response->Status;
+ if (!status.StartsWith("2")) {
+ error = event->Get()->Response->Message;
+ }
+ }
+ if (cacheRecord.CachePolicy.RetriesCount > 0) {
+ auto itStatusToRetry = std::find(cacheRecord.CachePolicy.StatusesToRetry.begin(), cacheRecord.CachePolicy.StatusesToRetry.end(), status);
+ if (itStatusToRetry != cacheRecord.CachePolicy.StatusesToRetry.end()) {
+ if (cacheRecord.Retries < cacheRecord.CachePolicy.RetriesCount) {
+ ++cacheRecord.Retries;
+ LOG_WARN_S(ctx, HttpLog, "IncomingRetry " << cacheRecord.GetName() << ": " << status << " " << error);
+ SendCacheRequest(key, cacheRecord, ctx);
+ return;
+ }
+ }
+ }
+ for (auto& waiter : cacheRecord.Waiters) {
+ NHttp::THttpOutgoingResponsePtr response2;
+ response2 = response->Duplicate(waiter->Get()->Request);
+ ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response2));
+ }
+ cacheRecord.Waiters.clear();
+ if (!error.empty()) {
+ LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
+ if (!cacheRecord.Response) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscard " << cacheRecord.GetName());
+ DropCacheRecord(it);
+ return;
+ }
+ }
+ if (cacheRecord.CachePolicy.TimeToRefresh) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingUpdate " << cacheRecord.GetName());
+ cacheRecord.UpdateResponse(response, error, ctx.Now());
+ if (!cacheRecord.Enqueued) {
+ RefreshQueue.push({it->first, it->second.RefreshTime});
+ cacheRecord.Enqueued = true;
+ }
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
+ } else {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingDrop " << cacheRecord.GetName());
+ DropCacheRecord(it);
+ }
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ const NHttp::THttpIncomingRequest* request = event->Get()->Request.Get();
+ TCachePolicy policy = GetCachePolicy(request);
+ if (policy.TimeToExpire == TDuration() && policy.RetriesCount == 0) {
+ TActorId handler = GetRequestHandler(event->Get()->Request);
+ if (handler) {
+ ctx.Send(event->Forward(handler));
+ }
+ return;
+ }
+ auto key = GetCacheKey(request, policy);
+ auto it = Cache.find(key);
+ if (it != Cache.end() && !policy.DiscardCache) {
+ it->second.UpdateExpireTime();
+ if (it->second.IsValid()) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingRespond "
+ << it->second.GetName()
+ << " ("
+ << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
+ << ")");
+ NHttp::THttpOutgoingResponsePtr response = it->second.Response;
+ if (response != nullptr) {
+ response = response->Duplicate(event->Get()->Request);
+ }
+ ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ return;
+ }
+ } else {
+ it = Cache.emplace(key, policy).first;
+ it->second.CacheId = key.GetId(); // for debugging
+ it->second.InitRequest(event->Get()->Request);
+ if (policy.DiscardCache) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscardCache " << it->second.GetName());
+ }
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingInitiate " << it->second.GetName());
+ SendCacheRequest(key, it->second, ctx);
+ }
+ it->second.Waiters.emplace_back(std::move(event));
+ }
+
+ void HandleRefresh(const NActors::TActorContext& ctx) {
+ while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
+ TRefreshRecord rrec = RefreshQueue.top();
+ RefreshQueue.pop();
+ auto it = Cache.find(rrec.Key);
+ if (it != Cache.end()) {
+ it->second.Enqueued = false;
+ if (it->second.DeathTime > ctx.Now()) {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingRefresh " << it->second.GetName());
+ SendCacheRequest(it->first, it->second, ctx);
+ } else {
+ LOG_DEBUG_S(ctx, HttpLog, "IncomingForget " << it->second.GetName());
+ DropCacheRecord(it);
+ }
+ }
+ }
+ ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
+ CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
+ }
+ }
+};
+
+TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& defaultPolicy) {
+ TCachePolicy policy = defaultPolicy;
+ THeaders headers(request->Headers);
+ TStringBuf cacheControl(headers["Cache-Control"]);
+ while (TStringBuf cacheItem = cacheControl.NextTok(',')) {
+ Trim(cacheItem, ' ');
+ if (cacheItem == "no-store" || cacheItem == "no-cache") {
+ policy.DiscardCache = true;
+ }
+ TStringBuf itemName = cacheItem.NextTok('=');
+ TrimEnd(itemName, ' ');
+ TrimBegin(cacheItem, ' ');
+ if (itemName == "max-age") {
+ policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
+ }
+ if (itemName == "min-fresh") {
+ policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
+ }
+ if (itemName == "stale-if-error") {
+ policy.KeepOnError = true;
+ }
+ }
+ return policy;
+}
+
NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
- return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
-}
-
-NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
- return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
-}
-
-NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
- return new THttpIncomingCacheActor(httpProxyId, std::move(cachePolicy));
-}
-
-}
+ return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
+}
+
+NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
+ return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
+}
+
+NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
+ return new THttpIncomingCacheActor(httpProxyId, std::move(cachePolicy));
+}
+
+}
diff --git a/library/cpp/actors/http/http_cache.h b/library/cpp/actors/http/http_cache.h
index ac38bdcac8..567a8105f0 100644
--- a/library/cpp/actors/http/http_cache.h
+++ b/library/cpp/actors/http/http_cache.h
@@ -1,27 +1,27 @@
-#pragma once
+#pragma once
#include <library/cpp/actors/core/actor.h>
-#include "http.h"
-
-namespace NHttp {
-
-struct TCachePolicy {
- TDuration TimeToExpire;
- TDuration TimeToRefresh;
- TDuration PaceToRefresh;
- bool KeepOnError = false;
- bool DiscardCache = false;
- TArrayRef<TString> HeadersToCacheKey;
- TArrayRef<TString> StatusesToRetry;
- ui32 RetriesCount = 0;
-
- TCachePolicy() = default;
-};
-
-using TGetCachePolicy = std::function<TCachePolicy(const THttpRequest*)>;
-
+#include "http.h"
+
+namespace NHttp {
+
+struct TCachePolicy {
+ TDuration TimeToExpire;
+ TDuration TimeToRefresh;
+ TDuration PaceToRefresh;
+ bool KeepOnError = false;
+ bool DiscardCache = false;
+ TArrayRef<TString> HeadersToCacheKey;
+ TArrayRef<TString> StatusesToRetry;
+ ui32 RetriesCount = 0;
+
+ TCachePolicy() = default;
+};
+
+using TGetCachePolicy = std::function<TCachePolicy(const THttpRequest*)>;
+
NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
-NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
-NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
-TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& policy = TCachePolicy());
-
-}
+NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
+NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy);
+TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& policy = TCachePolicy());
+
+}
diff --git a/library/cpp/actors/http/http_config.h b/library/cpp/actors/http/http_config.h
index faeff79449..eeafd2a019 100644
--- a/library/cpp/actors/http/http_config.h
+++ b/library/cpp/actors/http/http_config.h
@@ -1,19 +1,19 @@
-#pragma once
-#include <util/network/sock.h>
+#pragma once
+#include <util/network/sock.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-
-namespace NHttp {
-
-struct THttpConfig {
- static constexpr NActors::NLog::EComponent HttpLog = NActorsServices::EServiceCommon::HTTP;
- static constexpr size_t BUFFER_SIZE = 64 * 1024;
- static constexpr size_t BUFFER_MIN_STEP = 10 * 1024;
- static constexpr int LISTEN_QUEUE = 10;
- static constexpr TDuration SOCKET_TIMEOUT = TDuration::MilliSeconds(60000);
- static constexpr TDuration CONNECTION_TIMEOUT = TDuration::MilliSeconds(60000);
- using SocketType = TInet6StreamSocket;
- using SocketAddressType = TSockAddrInet6;
-};
-
-}
+
+namespace NHttp {
+
+struct THttpConfig {
+ static constexpr NActors::NLog::EComponent HttpLog = NActorsServices::EServiceCommon::HTTP;
+ static constexpr size_t BUFFER_SIZE = 64 * 1024;
+ static constexpr size_t BUFFER_MIN_STEP = 10 * 1024;
+ static constexpr int LISTEN_QUEUE = 10;
+ static constexpr TDuration SOCKET_TIMEOUT = TDuration::MilliSeconds(60000);
+ static constexpr TDuration CONNECTION_TIMEOUT = TDuration::MilliSeconds(60000);
+ using SocketType = TInet6StreamSocket;
+ using SocketAddressType = TSockAddrInet6;
+};
+
+}
diff --git a/library/cpp/actors/http/http_proxy.cpp b/library/cpp/actors/http/http_proxy.cpp
index 36c6855d93..2217838624 100644
--- a/library/cpp/actors/http/http_proxy.cpp
+++ b/library/cpp/actors/http/http_proxy.cpp
@@ -1,314 +1,314 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
-#include "http_proxy.h"
-
-namespace NHttp {
-
-class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig {
-public:
- IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
- IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller);
+#include "http_proxy.h"
+
+namespace NHttp {
+
+class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpConfig {
+public:
+ IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
+ IActor* listeningSocket = CreateHttpAcceptorActor(ctx.SelfID, Poller);
TActorId acceptorId = ctx.Register(listeningSocket);
- ctx.Send(event->Forward(acceptorId));
- Acceptors.emplace_back(acceptorId);
- return listeningSocket;
- }
-
- IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) {
- IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller);
+ ctx.Send(event->Forward(acceptorId));
+ Acceptors.emplace_back(acceptorId);
+ return listeningSocket;
+ }
+
+ IActor* AddOutgoingConnection(const TString& address, bool secure, const NActors::TActorContext& ctx) {
+ IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, address, secure, Poller);
TActorId connectionId = ctx.Register(connectionSocket);
- Connections.emplace(connectionId);
- return connectionSocket;
- }
-
- void Bootstrap(const NActors::TActorContext& ctx) {
- Poller = ctx.Register(NActors::CreatePollerActor());
- Become(&THttpProxy::StateWork);
- }
-
+ Connections.emplace(connectionId);
+ return connectionSocket;
+ }
+
+ void Bootstrap(const NActors::TActorContext& ctx) {
+ Poller = ctx.Register(NActors::CreatePollerActor());
+ Become(&THttpProxy::StateWork);
+ }
+
THttpProxy(NMonitoring::TMetricRegistry& sensors)
- : Sensors(sensors)
- {}
-
-protected:
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvHttpProxy::TEvAddListeningPort, Handle);
- HFunc(TEvHttpProxy::TEvRegisterHandler, Handle);
- HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
- HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle);
- HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
- HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle);
- HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
- HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle);
- HFunc(TEvHttpProxy::TEvReportSensors, Handle);
+ : Sensors(sensors)
+ {}
+
+protected:
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvHttpProxy::TEvAddListeningPort, Handle);
+ HFunc(TEvHttpProxy::TEvRegisterHandler, Handle);
+ HFunc(TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
+ HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle);
+ HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
+ HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle);
+ HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
+ HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle);
+ HFunc(TEvHttpProxy::TEvReportSensors, Handle);
HFunc(NActors::TEvents::TEvPoison, Handle);
- }
- }
-
+ }
+ }
+
void PassAway() override {
Send(Poller, new NActors::TEvents::TEvPoisonPill());
for (const NActors::TActorId& connection : Connections) {
Send(connection, new NActors::TEvents::TEvPoisonPill());
- }
+ }
for (const NActors::TActorId& acceptor : Acceptors) {
Send(acceptor, new NActors::TEvents::TEvPoisonPill());
- }
+ }
NActors::TActorBootstrapped<THttpProxy>::PassAway();
- }
-
- void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
- TStringBuf url = event->Get()->Request->URL.Before('?');
+ }
+
+ void Handle(TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ TStringBuf url = event->Get()->Request->URL.Before('?');
THashMap<TString, TActorId>::iterator it;
- while (!url.empty()) {
- it = Handlers.find(url);
- if (it != Handlers.end()) {
- ctx.Send(event->Forward(it->second));
- return;
- } else {
- if (url.EndsWith('/')) {
- url.Trunc(url.size() - 1);
- }
- size_t pos = url.rfind('/');
- if (pos == TStringBuf::npos) {
- break;
- } else {
- url = url.substr(0, pos + 1);
- }
- }
- }
- ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound()));
- }
-
- void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
- Y_UNUSED(event);
- Y_UNUSED(ctx);
- Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly");
- }
-
- void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
- Y_UNUSED(event);
- Y_UNUSED(ctx);
- Y_FAIL("This event shouldn't be there, it should go to the http connection directly");
- }
-
- void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
- TStringBuf host(event->Get()->Request->Host);
- bool secure(event->Get()->Request->Secure);
- NActors::IActor* actor = AddOutgoingConnection(TString(host), secure, ctx);
- ctx.Send(event->Forward(actor->SelfId()));
- }
-
- void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
- AddListeningPort(event, ctx);
- }
-
- void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) {
- for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) {
- if (*it == event->Get()->ConnectionID) {
- Acceptors.erase(it);
- break;
- }
- }
- }
-
- void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
- Connections.erase(event->Get()->ConnectionID);
- }
-
- void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) {
- Handlers[event->Get()->Path] = event->Get()->Handler;
- }
-
- void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) {
- const TString& host(event->Get()->Host);
- auto it = Hosts.find(host);
- if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) {
- TString addressPart;
- TIpPort portPart = 0;
- CrackAddress(host, addressPart, portPart);
- if (IsIPv6(addressPart)) {
- TSockAddrInet6 address(addressPart.c_str(), portPart);
- if (it == Hosts.end()) {
- it = Hosts.emplace(host, THostEntry()).first;
- }
- it->second.Address = address;
- it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
- } else {
- // TODO(xenoxeno): move to another, possible blocking actor
- try {
- const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart));
- if (result != nullptr) {
- auto pAddr = result->Addr.Begin();
- while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET6) {
- ++pAddr;
- }
- if (pAddr == result->Addr.End()) {
- ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved"));
- return;
- }
- TSockAddrInet6 address = {};
- static_cast<sockaddr_in6&>(address) = *reinterpret_cast<sockaddr_in6*>(pAddr->ai_addr);
- LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address.ToString());
- if (it == Hosts.end()) {
- it = Hosts.emplace(host, THostEntry()).first;
- }
- it->second.Address = address;
- it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
- } else {
- ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host"));
- return;
- }
- }
- catch (const yexception& e) {
- ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what()));
- return;
- }
- }
- }
- ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address));
- }
-
- void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) {
- const TEvHttpProxy::TEvReportSensors& sensors(*event->Get());
+ while (!url.empty()) {
+ it = Handlers.find(url);
+ if (it != Handlers.end()) {
+ ctx.Send(event->Forward(it->second));
+ return;
+ } else {
+ if (url.EndsWith('/')) {
+ url.Trunc(url.size() - 1);
+ }
+ size_t pos = url.rfind('/');
+ if (pos == TStringBuf::npos) {
+ break;
+ } else {
+ url = url.substr(0, pos + 1);
+ }
+ }
+ }
+ ctx.Send(event->Sender, new TEvHttpProxy::TEvHttpOutgoingResponse(event->Get()->Request->CreateResponseNotFound()));
+ }
+
+ void Handle(TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ Y_UNUSED(event);
+ Y_UNUSED(ctx);
+ Y_FAIL("This event shouldn't be there, it should go to the http connection owner directly");
+ }
+
+ void Handle(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
+ Y_UNUSED(event);
+ Y_UNUSED(ctx);
+ Y_FAIL("This event shouldn't be there, it should go to the http connection directly");
+ }
+
+ void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ TStringBuf host(event->Get()->Request->Host);
+ bool secure(event->Get()->Request->Secure);
+ NActors::IActor* actor = AddOutgoingConnection(TString(host), secure, ctx);
+ ctx.Send(event->Forward(actor->SelfId()));
+ }
+
+ void Handle(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
+ AddListeningPort(event, ctx);
+ }
+
+ void Handle(TEvHttpProxy::TEvHttpAcceptorClosed::TPtr event, const NActors::TActorContext&) {
+ for (auto it = Acceptors.begin(); it != Acceptors.end(); ++it) {
+ if (*it == event->Get()->ConnectionID) {
+ Acceptors.erase(it);
+ break;
+ }
+ }
+ }
+
+ void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
+ Connections.erase(event->Get()->ConnectionID);
+ }
+
+ void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext&) {
+ Handlers[event->Get()->Path] = event->Get()->Handler;
+ }
+
+ void Handle(TEvHttpProxy::TEvResolveHostRequest::TPtr event, const NActors::TActorContext& ctx) {
+ const TString& host(event->Get()->Host);
+ auto it = Hosts.find(host);
+ if (it == Hosts.end() || it->second.DeadlineTime > ctx.Now()) {
+ TString addressPart;
+ TIpPort portPart = 0;
+ CrackAddress(host, addressPart, portPart);
+ if (IsIPv6(addressPart)) {
+ TSockAddrInet6 address(addressPart.c_str(), portPart);
+ if (it == Hosts.end()) {
+ it = Hosts.emplace(host, THostEntry()).first;
+ }
+ it->second.Address = address;
+ it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
+ } else {
+ // TODO(xenoxeno): move to another, possible blocking actor
+ try {
+ const NDns::TResolvedHost* result = NDns::CachedResolve(NDns::TResolveInfo(addressPart, portPart));
+ if (result != nullptr) {
+ auto pAddr = result->Addr.Begin();
+ while (pAddr != result->Addr.End() && pAddr->ai_family != AF_INET6) {
+ ++pAddr;
+ }
+ if (pAddr == result->Addr.End()) {
+ ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Invalid address family resolved"));
+ return;
+ }
+ TSockAddrInet6 address = {};
+ static_cast<sockaddr_in6&>(address) = *reinterpret_cast<sockaddr_in6*>(pAddr->ai_addr);
+ LOG_DEBUG_S(ctx, HttpLog, "Host " << host << " resolved to " << address.ToString());
+ if (it == Hosts.end()) {
+ it = Hosts.emplace(host, THostEntry()).first;
+ }
+ it->second.Address = address;
+ it->second.DeadlineTime = ctx.Now() + HostsTimeToLive;
+ } else {
+ ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse("Error resolving host"));
+ return;
+ }
+ }
+ catch (const yexception& e) {
+ ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(e.what()));
+ return;
+ }
+ }
+ }
+ ctx.Send(event->Sender, new TEvHttpProxy::TEvResolveHostResponse(it->first, it->second.Address));
+ }
+
+ void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext&) {
+ const TEvHttpProxy::TEvReportSensors& sensors(*event->Get());
const static TString urlNotFound = "not-found";
const TString& url = (sensors.Status == "404" ? urlNotFound : sensors.Url);
- Sensors.Rate({
- {"sensor", "count"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
+ Sensors.Rate({
+ {"sensor", "count"},
+ {"direction", sensors.Direction},
+ {"peer", sensors.Host},
{"url", url},
- {"status", sensors.Status}
- })->Inc();
- Sensors.HistogramRate({
- {"sensor", "time_us"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
+ {"status", sensors.Status}
+ })->Inc();
+ Sensors.HistogramRate({
+ {"sensor", "time_us"},
+ {"direction", sensors.Direction},
+ {"peer", sensors.Host},
{"url", url},
- {"status", sensors.Status}
- },
- NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds());
- Sensors.HistogramRate({
- {"sensor", "time_ms"},
- {"direction", sensors.Direction},
- {"peer", sensors.Host},
+ {"status", sensors.Status}
+ },
+ NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MicroSeconds());
+ Sensors.HistogramRate({
+ {"sensor", "time_ms"},
+ {"direction", sensors.Direction},
+ {"peer", sensors.Host},
{"url", url},
- {"status", sensors.Status}
- },
- NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds());
- }
-
+ {"status", sensors.Status}
+ },
+ NMonitoring::ExplicitHistogram({1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 30000, 60000}))->Record(sensors.Time.MilliSeconds());
+ }
+
void Handle(NActors::TEvents::TEvPoison::TPtr, const NActors::TActorContext&) {
PassAway();
}
NActors::TActorId Poller;
TVector<TActorId> Acceptors;
-
- struct THostEntry {
- TSockAddrInet6 Address;
- TInstant DeadlineTime;
- };
-
- static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60);
-
- THashMap<TString, THostEntry> Hosts;
+
+ struct THostEntry {
+ TSockAddrInet6 Address;
+ TInstant DeadlineTime;
+ };
+
+ static constexpr TDuration HostsTimeToLive = TDuration::Seconds(60);
+
+ THashMap<TString, THostEntry> Hosts;
THashMap<TString, TActorId> Handlers;
THashSet<TActorId> Connections; // outgoing
NMonitoring::TMetricRegistry& Sensors;
-};
-
-TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) {
- return new TEvHttpProxy::TEvReportSensors(
- "out",
- request->Host,
- request->URL.Before('?'),
- response ? response->Status : "504",
+};
+
+TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response) {
+ return new TEvHttpProxy::TEvReportSensors(
+ "out",
+ request->Host,
+ request->URL.Before('?'),
+ response ? response->Status : "504",
TDuration::Seconds(std::abs(request->Timer.Passed()))
- );
-}
-
-TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) {
- return new TEvHttpProxy::TEvReportSensors(
- "in",
- request->Host,
- request->URL.Before('?'),
- response->Status,
+ );
+}
+
+TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response) {
+ return new TEvHttpProxy::TEvReportSensors(
+ "in",
+ request->Host,
+ request->URL.Before('?'),
+ response->Status,
TDuration::Seconds(std::abs(request->Timer.Passed()))
- );
-}
-
+ );
+}
+
NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors) {
- return new THttpProxy(sensors);
-}
-
-bool IsIPv6(const TString& host) {
- return host.find_first_not_of(":0123456789abcdef") == TString::npos;
-}
-
-bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) {
- url.TrySplit("://", scheme, url);
- auto pos = url.find('/');
- if (pos == TStringBuf::npos) {
- host = url;
- } else {
- host = url.substr(0, pos);
- uri = url.substr(pos);
- }
- return true;
-}
-
-void CrackAddress(const TString& address, TString& hostname, TIpPort& port) {
- size_t first_colon_pos = address.find(':');
- if (first_colon_pos != TString::npos) {
- size_t last_colon_pos = address.rfind(':');
- if (last_colon_pos == first_colon_pos) {
- // only one colon, simple case
- port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0);
- hostname = address.substr(0, first_colon_pos);
- } else {
- // ipv6?
- size_t closing_bracket_pos = address.rfind(']');
- if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) {
- // whole address is ipv6 host
- hostname = address;
- } else {
- port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0);
- hostname = address.substr(0, last_colon_pos);
- }
- if (hostname.StartsWith('[') && hostname.EndsWith(']')) {
- hostname = hostname.substr(1, hostname.size() - 2);
- }
- }
- } else {
- hostname = address;
- }
-}
-
-
-void TrimBegin(TStringBuf& target, char delim) {
- while (!target.empty() && *target.begin() == delim) {
- target.Skip(1);
- }
-}
-
-void TrimEnd(TStringBuf& target, char delim) {
- while (!target.empty() && target.back() == delim) {
- target.Trunc(target.size() - 1);
- }
-}
-
-void Trim(TStringBuf& target, char delim) {
- TrimBegin(target, delim);
- TrimEnd(target, delim);
-}
-
-void TrimEnd(TString& target, char delim) {
- while (!target.empty() && target.back() == delim) {
- target.resize(target.size() - 1);
- }
-}
-
-}
+ return new THttpProxy(sensors);
+}
+
+bool IsIPv6(const TString& host) {
+ return host.find_first_not_of(":0123456789abcdef") == TString::npos;
+}
+
+bool CrackURL(TStringBuf url, TStringBuf& scheme, TStringBuf& host, TStringBuf& uri) {
+ url.TrySplit("://", scheme, url);
+ auto pos = url.find('/');
+ if (pos == TStringBuf::npos) {
+ host = url;
+ } else {
+ host = url.substr(0, pos);
+ uri = url.substr(pos);
+ }
+ return true;
+}
+
+void CrackAddress(const TString& address, TString& hostname, TIpPort& port) {
+ size_t first_colon_pos = address.find(':');
+ if (first_colon_pos != TString::npos) {
+ size_t last_colon_pos = address.rfind(':');
+ if (last_colon_pos == first_colon_pos) {
+ // only one colon, simple case
+ port = FromStringWithDefault<TIpPort>(address.substr(first_colon_pos + 1), 0);
+ hostname = address.substr(0, first_colon_pos);
+ } else {
+ // ipv6?
+ size_t closing_bracket_pos = address.rfind(']');
+ if (closing_bracket_pos == TString::npos || closing_bracket_pos > last_colon_pos) {
+ // whole address is ipv6 host
+ hostname = address;
+ } else {
+ port = FromStringWithDefault<TIpPort>(address.substr(last_colon_pos + 1), 0);
+ hostname = address.substr(0, last_colon_pos);
+ }
+ if (hostname.StartsWith('[') && hostname.EndsWith(']')) {
+ hostname = hostname.substr(1, hostname.size() - 2);
+ }
+ }
+ } else {
+ hostname = address;
+ }
+}
+
+
+void TrimBegin(TStringBuf& target, char delim) {
+ while (!target.empty() && *target.begin() == delim) {
+ target.Skip(1);
+ }
+}
+
+void TrimEnd(TStringBuf& target, char delim) {
+ while (!target.empty() && target.back() == delim) {
+ target.Trunc(target.size() - 1);
+ }
+}
+
+void Trim(TStringBuf& target, char delim) {
+ TrimBegin(target, delim);
+ TrimEnd(target, delim);
+}
+
+void TrimEnd(TString& target, char delim) {
+ while (!target.empty() && target.back() == delim) {
+ target.resize(target.size() - 1);
+ }
+}
+
+}
diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h
index afd0170997..ce77bf8d5f 100644
--- a/library/cpp/actors/http/http_proxy.h
+++ b/library/cpp/actors/http/http_proxy.h
@@ -1,4 +1,4 @@
-#pragma once
+#pragma once
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -9,231 +9,231 @@
#include <library/cpp/actors/interconnect/poller_actor.h>
#include <library/cpp/dns/cache.h>
#include <library/cpp/monlib/metrics/metric_registry.h>
-#include <util/generic/variant.h>
-#include "http.h"
-#include "http_proxy_ssl.h"
-
-namespace NHttp {
-
-struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig {
- SocketType Socket;
-
- int GetDescriptor() override {
- return static_cast<SOCKET>(Socket);
- }
-};
-
-struct TEvHttpProxy {
- enum EEv {
+#include <util/generic/variant.h>
+#include "http.h"
+#include "http_proxy_ssl.h"
+
+namespace NHttp {
+
+struct TSocketDescriptor : NActors::TSharedDescriptor, THttpConfig {
+ SocketType Socket;
+
+ int GetDescriptor() override {
+ return static_cast<SOCKET>(Socket);
+ }
+};
+
+struct TEvHttpProxy {
+ enum EEv {
EvAddListeningPort = EventSpaceBegin(NActors::TEvents::ES_HTTP),
- EvConfirmListen,
- EvRegisterHandler,
- EvHttpIncomingRequest,
- EvHttpOutgoingRequest,
- EvHttpIncomingResponse,
- EvHttpOutgoingResponse,
- EvHttpConnectionOpened,
- EvHttpConnectionClosed,
- EvHttpAcceptorClosed,
- EvResolveHostRequest,
- EvResolveHostResponse,
- EvReportSensors,
- EvEnd
- };
-
+ EvConfirmListen,
+ EvRegisterHandler,
+ EvHttpIncomingRequest,
+ EvHttpOutgoingRequest,
+ EvHttpIncomingResponse,
+ EvHttpOutgoingResponse,
+ EvHttpConnectionOpened,
+ EvHttpConnectionClosed,
+ EvHttpAcceptorClosed,
+ EvResolveHostRequest,
+ EvResolveHostResponse,
+ EvReportSensors,
+ EvEnd
+ };
+
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_HTTP), "ES_HTTP event space is too small.");
-
- struct TEvAddListeningPort : NActors::TEventLocal<TEvAddListeningPort, EvAddListeningPort> {
- TIpPort Port;
- TString WorkerName;
- bool Secure = false;
- TString CertificateFile;
- TString PrivateKeyFile;
+
+ struct TEvAddListeningPort : NActors::TEventLocal<TEvAddListeningPort, EvAddListeningPort> {
+ TIpPort Port;
+ TString WorkerName;
+ bool Secure = false;
+ TString CertificateFile;
+ TString PrivateKeyFile;
TString SslCertificatePem;
-
- TEvAddListeningPort(TIpPort port)
- : Port(port)
- {}
-
- TEvAddListeningPort(TIpPort port, const TString& workerName)
- : Port(port)
- , WorkerName(workerName)
- {}
- };
-
- struct TEvConfirmListen : NActors::TEventLocal<TEvConfirmListen, EvConfirmListen> {
- THttpConfig::SocketAddressType Address;
-
- TEvConfirmListen(const THttpConfig::SocketAddressType& address)
- : Address(address)
- {}
- };
-
- struct TEvRegisterHandler : NActors::TEventLocal<TEvRegisterHandler, EvRegisterHandler> {
- TString Path;
+
+ TEvAddListeningPort(TIpPort port)
+ : Port(port)
+ {}
+
+ TEvAddListeningPort(TIpPort port, const TString& workerName)
+ : Port(port)
+ , WorkerName(workerName)
+ {}
+ };
+
+ struct TEvConfirmListen : NActors::TEventLocal<TEvConfirmListen, EvConfirmListen> {
+ THttpConfig::SocketAddressType Address;
+
+ TEvConfirmListen(const THttpConfig::SocketAddressType& address)
+ : Address(address)
+ {}
+ };
+
+ struct TEvRegisterHandler : NActors::TEventLocal<TEvRegisterHandler, EvRegisterHandler> {
+ TString Path;
TActorId Handler;
-
+
TEvRegisterHandler(const TString& path, const TActorId& handler)
- : Path(path)
- , Handler(handler)
- {}
- };
-
- struct TEvHttpIncomingRequest : NActors::TEventLocal<TEvHttpIncomingRequest, EvHttpIncomingRequest> {
- THttpIncomingRequestPtr Request;
-
- TEvHttpIncomingRequest(THttpIncomingRequestPtr request)
- : Request(std::move(request))
- {}
- };
-
- struct TEvHttpOutgoingRequest : NActors::TEventLocal<TEvHttpOutgoingRequest, EvHttpOutgoingRequest> {
- THttpOutgoingRequestPtr Request;
- TDuration Timeout;
-
- TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request)
- : Request(std::move(request))
- {}
-
- TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request, TDuration timeout)
- : Request(std::move(request))
- , Timeout(timeout)
- {}
- };
-
- struct TEvHttpIncomingResponse : NActors::TEventLocal<TEvHttpIncomingResponse, EvHttpIncomingResponse> {
- THttpOutgoingRequestPtr Request;
- THttpIncomingResponsePtr Response;
- TString Error;
-
- TEvHttpIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response, const TString& error)
- : Request(std::move(request))
- , Response(std::move(response))
- , Error(error)
- {}
-
- TEvHttpIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response)
- : Request(std::move(request))
- , Response(std::move(response))
- {}
-
- TString GetError() const {
- TStringBuilder error;
- if (Response != nullptr && !Response->Status.StartsWith('2')) {
- error << Response->Status << ' ' << Response->Message;
- }
- if (!Error.empty()) {
- if (!error.empty()) {
- error << ';';
- }
- error << Error;
- }
- return error;
- }
- };
-
- struct TEvHttpOutgoingResponse : NActors::TEventLocal<TEvHttpOutgoingResponse, EvHttpOutgoingResponse> {
- THttpOutgoingResponsePtr Response;
-
- TEvHttpOutgoingResponse(THttpOutgoingResponsePtr response)
- : Response(std::move(response))
- {}
- };
-
- struct TEvHttpConnectionOpened : NActors::TEventLocal<TEvHttpConnectionOpened, EvHttpConnectionOpened> {
- TString PeerAddress;
+ : Path(path)
+ , Handler(handler)
+ {}
+ };
+
+ struct TEvHttpIncomingRequest : NActors::TEventLocal<TEvHttpIncomingRequest, EvHttpIncomingRequest> {
+ THttpIncomingRequestPtr Request;
+
+ TEvHttpIncomingRequest(THttpIncomingRequestPtr request)
+ : Request(std::move(request))
+ {}
+ };
+
+ struct TEvHttpOutgoingRequest : NActors::TEventLocal<TEvHttpOutgoingRequest, EvHttpOutgoingRequest> {
+ THttpOutgoingRequestPtr Request;
+ TDuration Timeout;
+
+ TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request)
+ : Request(std::move(request))
+ {}
+
+ TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request, TDuration timeout)
+ : Request(std::move(request))
+ , Timeout(timeout)
+ {}
+ };
+
+ struct TEvHttpIncomingResponse : NActors::TEventLocal<TEvHttpIncomingResponse, EvHttpIncomingResponse> {
+ THttpOutgoingRequestPtr Request;
+ THttpIncomingResponsePtr Response;
+ TString Error;
+
+ TEvHttpIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response, const TString& error)
+ : Request(std::move(request))
+ , Response(std::move(response))
+ , Error(error)
+ {}
+
+ TEvHttpIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response)
+ : Request(std::move(request))
+ , Response(std::move(response))
+ {}
+
+ TString GetError() const {
+ TStringBuilder error;
+ if (Response != nullptr && !Response->Status.StartsWith('2')) {
+ error << Response->Status << ' ' << Response->Message;
+ }
+ if (!Error.empty()) {
+ if (!error.empty()) {
+ error << ';';
+ }
+ error << Error;
+ }
+ return error;
+ }
+ };
+
+ struct TEvHttpOutgoingResponse : NActors::TEventLocal<TEvHttpOutgoingResponse, EvHttpOutgoingResponse> {
+ THttpOutgoingResponsePtr Response;
+
+ TEvHttpOutgoingResponse(THttpOutgoingResponsePtr response)
+ : Response(std::move(response))
+ {}
+ };
+
+ struct TEvHttpConnectionOpened : NActors::TEventLocal<TEvHttpConnectionOpened, EvHttpConnectionOpened> {
+ TString PeerAddress;
TActorId ConnectionID;
-
+
TEvHttpConnectionOpened(const TString& peerAddress, const TActorId& connectionID)
- : PeerAddress(peerAddress)
- , ConnectionID(connectionID)
- {}
- };
-
- struct TEvHttpConnectionClosed : NActors::TEventLocal<TEvHttpConnectionClosed, EvHttpConnectionClosed> {
+ : PeerAddress(peerAddress)
+ , ConnectionID(connectionID)
+ {}
+ };
+
+ struct TEvHttpConnectionClosed : NActors::TEventLocal<TEvHttpConnectionClosed, EvHttpConnectionClosed> {
TActorId ConnectionID;
- TDeque<THttpIncomingRequestPtr> RecycledRequests;
-
+ TDeque<THttpIncomingRequestPtr> RecycledRequests;
+
TEvHttpConnectionClosed(const TActorId& connectionID)
- : ConnectionID(connectionID)
- {}
-
+ : ConnectionID(connectionID)
+ {}
+
TEvHttpConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests)
- : ConnectionID(connectionID)
- , RecycledRequests(std::move(recycledRequests))
- {}
- };
-
- struct TEvHttpAcceptorClosed : NActors::TEventLocal<TEvHttpAcceptorClosed, EvHttpAcceptorClosed> {
+ : ConnectionID(connectionID)
+ , RecycledRequests(std::move(recycledRequests))
+ {}
+ };
+
+ struct TEvHttpAcceptorClosed : NActors::TEventLocal<TEvHttpAcceptorClosed, EvHttpAcceptorClosed> {
TActorId ConnectionID;
-
+
TEvHttpAcceptorClosed(const TActorId& connectionID)
- : ConnectionID(connectionID)
- {}
- };
-
- struct TEvResolveHostRequest : NActors::TEventLocal<TEvResolveHostRequest, EvResolveHostRequest> {
- TString Host;
-
- TEvResolveHostRequest(const TString& host)
- : Host(host)
- {}
- };
-
- struct TEvResolveHostResponse : NActors::TEventLocal<TEvResolveHostResponse, EvResolveHostResponse> {
- TString Host;
- TSockAddrInet6 Address;
- TString Error;
-
- TEvResolveHostResponse(const TString& host, const TSockAddrInet6& address)
- : Host(host)
- , Address(address)
- {}
-
- TEvResolveHostResponse(const TString& error)
- : Error(error)
- {}
- };
-
- struct TEvReportSensors : NActors::TEventLocal<TEvReportSensors, EvReportSensors> {
- TString Direction;
- TString Host;
- TString Url;
- TString Status;
- TDuration Time;
-
- TEvReportSensors(
- TStringBuf direction,
- TStringBuf host,
- TStringBuf url,
- TStringBuf status,
- TDuration time)
- : Direction(direction)
- , Host(host)
- , Url(url)
- , Status(status)
- , Time(time)
- {}
- };
-};
-
-struct TEndpointInfo {
+ : ConnectionID(connectionID)
+ {}
+ };
+
+ struct TEvResolveHostRequest : NActors::TEventLocal<TEvResolveHostRequest, EvResolveHostRequest> {
+ TString Host;
+
+ TEvResolveHostRequest(const TString& host)
+ : Host(host)
+ {}
+ };
+
+ struct TEvResolveHostResponse : NActors::TEventLocal<TEvResolveHostResponse, EvResolveHostResponse> {
+ TString Host;
+ TSockAddrInet6 Address;
+ TString Error;
+
+ TEvResolveHostResponse(const TString& host, const TSockAddrInet6& address)
+ : Host(host)
+ , Address(address)
+ {}
+
+ TEvResolveHostResponse(const TString& error)
+ : Error(error)
+ {}
+ };
+
+ struct TEvReportSensors : NActors::TEventLocal<TEvReportSensors, EvReportSensors> {
+ TString Direction;
+ TString Host;
+ TString Url;
+ TString Status;
+ TDuration Time;
+
+ TEvReportSensors(
+ TStringBuf direction,
+ TStringBuf host,
+ TStringBuf url,
+ TStringBuf status,
+ TDuration time)
+ : Direction(direction)
+ , Host(host)
+ , Url(url)
+ , Status(status)
+ , Time(time)
+ {}
+ };
+};
+
+struct TEndpointInfo {
TActorId Proxy;
TActorId Owner;
- TString WorkerName;
- bool Secure;
- TSslHelpers::TSslHolder<SSL_CTX> SecureContext;
-};
-
+ TString WorkerName;
+ bool Secure;
+ TSslHelpers::TSslHolder<SSL_CTX> SecureContext;
+};
+
NActors::IActor* CreateHttpProxy(NMonitoring::TMetricRegistry& sensors);
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller);
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller);
-NActors::IActor* CreateIncomingConnectionActor(
- const TEndpointInfo& endpoint,
- TIntrusivePtr<TSocketDescriptor> socket,
- THttpConfig::SocketAddressType address,
- THttpIncomingRequestPtr recycledRequest = nullptr);
-TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response);
-TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response);
-
-}
+NActors::IActor* CreateIncomingConnectionActor(
+ const TEndpointInfo& endpoint,
+ TIntrusivePtr<TSocketDescriptor> socket,
+ THttpConfig::SocketAddressType address,
+ THttpIncomingRequestPtr recycledRequest = nullptr);
+TEvHttpProxy::TEvReportSensors* BuildOutgoingRequestSensors(const THttpOutgoingRequestPtr& request, const THttpIncomingResponsePtr& response);
+TEvHttpProxy::TEvReportSensors* BuildIncomingRequestSensors(const THttpIncomingRequestPtr& request, const THttpOutgoingResponsePtr& response);
+
+}
diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp
index 9780541b71..3d2557c6fe 100644
--- a/library/cpp/actors/http/http_proxy_acceptor.cpp
+++ b/library/cpp/actors/http/http_proxy_acceptor.cpp
@@ -1,135 +1,135 @@
-#include <util/network/sock.h>
-#include "http_proxy.h"
-#include "http_proxy_ssl.h"
-
-namespace NHttp {
-
-class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig {
-public:
- using TBase = NActors::TActor<TAcceptorActor>;
+#include <util/network/sock.h>
+#include "http_proxy.h"
+#include "http_proxy_ssl.h"
+
+namespace NHttp {
+
+class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfig {
+public:
+ using TBase = NActors::TActor<TAcceptorActor>;
const TActorId Owner;
const TActorId Poller;
- TIntrusivePtr<TSocketDescriptor> Socket;
+ TIntrusivePtr<TSocketDescriptor> Socket;
NActors::TPollerToken::TPtr PollerToken;
THashSet<TActorId> Connections;
- TDeque<THttpIncomingRequestPtr> RecycledRequests;
- TEndpointInfo Endpoint;
-
+ TDeque<THttpIncomingRequestPtr> RecycledRequests;
+ TEndpointInfo Endpoint;
+
TAcceptorActor(const TActorId& owner, const TActorId& poller)
- : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
- , Owner(owner)
- , Poller(poller)
- , Socket(new TSocketDescriptor())
- {
- // for unit tests :(
- CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEADDR, (int)true, "reuse address");
-#ifdef SO_REUSEPORT
- CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEPORT, (int)true, "reuse port");
-#endif
- }
-
-protected:
- STFUNC(StateListening) {
- switch (ev->GetTypeRewrite()) {
+ : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
+ , Owner(owner)
+ , Poller(poller)
+ , Socket(new TSocketDescriptor())
+ {
+ // for unit tests :(
+ CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEADDR, (int)true, "reuse address");
+#ifdef SO_REUSEPORT
+ CheckedSetSockOpt(Socket->Socket, SOL_SOCKET, SO_REUSEPORT, (int)true, "reuse port");
+#endif
+ }
+
+protected:
+ STFUNC(StateListening) {
+ switch (ev->GetTypeRewrite()) {
HFunc(NActors::TEvPollerRegisterResult, Handle);
HFunc(NActors::TEvPollerReady, Handle);
- HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
- HFunc(TEvHttpProxy::TEvReportSensors, Handle);
- }
- }
-
- STFUNC(StateInit) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvHttpProxy::TEvAddListeningPort, HandleInit);
- }
- }
-
- void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
- SocketAddressType bindAddress("::", event->Get()->Port);
- Endpoint.Owner = ctx.SelfID;
- Endpoint.Proxy = Owner;
- Endpoint.WorkerName = event->Get()->WorkerName;
- Endpoint.Secure = event->Get()->Secure;
- int err = 0;
- if (Endpoint.Secure) {
+ HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
+ HFunc(TEvHttpProxy::TEvReportSensors, Handle);
+ }
+ }
+
+ STFUNC(StateInit) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvHttpProxy::TEvAddListeningPort, HandleInit);
+ }
+ }
+
+ void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
+ SocketAddressType bindAddress("::", event->Get()->Port);
+ Endpoint.Owner = ctx.SelfID;
+ Endpoint.Proxy = Owner;
+ Endpoint.WorkerName = event->Get()->WorkerName;
+ Endpoint.Secure = event->Get()->Secure;
+ int err = 0;
+ if (Endpoint.Secure) {
if (!event->Get()->SslCertificatePem.empty()) {
Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem);
} else {
Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile);
}
- if (Endpoint.SecureContext == nullptr) {
- err = -1;
- LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context");
- }
- }
- if (err == 0) {
- err = Socket->Socket.Bind(&bindAddress);
- }
- if (err == 0) {
- err = Socket->Socket.Listen(LISTEN_QUEUE);
- if (err == 0) {
- LOG_INFO_S(ctx, HttpLog, "Listening on " << bindAddress.ToString());
- SetNonBlock(Socket->Socket);
+ if (Endpoint.SecureContext == nullptr) {
+ err = -1;
+ LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context");
+ }
+ }
+ if (err == 0) {
+ err = Socket->Socket.Bind(&bindAddress);
+ }
+ if (err == 0) {
+ err = Socket->Socket.Listen(LISTEN_QUEUE);
+ if (err == 0) {
+ LOG_INFO_S(ctx, HttpLog, "Listening on " << bindAddress.ToString());
+ SetNonBlock(Socket->Socket);
ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
- TBase::Become(&TAcceptorActor::StateListening);
- ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress), 0, event->Cookie);
- return;
- }
- }
- LOG_WARN_S(ctx, HttpLog, "Failed to listen on " << bindAddress.ToString() << " - retrying...");
+ TBase::Become(&TAcceptorActor::StateListening);
+ ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress), 0, event->Cookie);
+ return;
+ }
+ }
+ LOG_WARN_S(ctx, HttpLog, "Failed to listen on " << bindAddress.ToString() << " - retrying...");
ctx.ExecutorThread.Schedule(TDuration::Seconds(1), event.Release());
- }
-
- void Die(const NActors::TActorContext& ctx) override {
- ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID));
+ }
+
+ void Die(const NActors::TActorContext& ctx) override {
+ ctx.Send(Owner, new TEvHttpProxy::TEvHttpAcceptorClosed(ctx.SelfID));
for (const NActors::TActorId& connection : Connections) {
- ctx.Send(connection, new NActors::TEvents::TEvPoisonPill());
- }
- }
-
+ ctx.Send(connection, new NActors::TEvents::TEvPoisonPill());
+ }
+ }
+
void Handle(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& /*ctx*/) {
PollerToken = std::move(ev->Get()->PollerToken);
PollerToken->Request(true, false); // request read polling
}
void Handle(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
- TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor();
- SocketAddressType addr;
+ TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor();
+ SocketAddressType addr;
int err;
while ((err = Socket->Socket.Accept(&socket->Socket, &addr)) == 0) {
- NActors::IActor* connectionSocket = nullptr;
- if (RecycledRequests.empty()) {
- connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr);
- } else {
- connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front()));
- RecycledRequests.pop_front();
- }
+ NActors::IActor* connectionSocket = nullptr;
+ if (RecycledRequests.empty()) {
+ connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr);
+ } else {
+ connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr, std::move(RecycledRequests.front()));
+ RecycledRequests.pop_front();
+ }
NActors::TActorId connectionId = ctx.Register(connectionSocket);
ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
- Connections.emplace(connectionId);
- socket = new TSocketDescriptor();
- }
+ Connections.emplace(connectionId);
+ socket = new TSocketDescriptor();
+ }
if (err == -EAGAIN || err == -EWOULDBLOCK) { // request poller for further connection polling
Y_VERIFY(PollerToken);
PollerToken->Request(true, false);
}
- }
-
- void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
- Connections.erase(event->Get()->ConnectionID);
- for (auto& req : event->Get()->RecycledRequests) {
- req->Clear();
- RecycledRequests.push_back(std::move(req));
- }
- }
-
- void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext& ctx) {
- ctx.Send(event->Forward(Owner));
- }
-};
-
+ }
+
+ void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
+ Connections.erase(event->Get()->ConnectionID);
+ for (auto& req : event->Get()->RecycledRequests) {
+ req->Clear();
+ RecycledRequests.push_back(std::move(req));
+ }
+ }
+
+ void Handle(TEvHttpProxy::TEvReportSensors::TPtr event, const NActors::TActorContext& ctx) {
+ ctx.Send(event->Forward(Owner));
+ }
+};
+
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
- return new TAcceptorActor(owner, poller);
-}
-
-}
+ return new TAcceptorActor(owner, poller);
+}
+
+}
diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp
index 80fe2af53d..0608e0e25b 100644
--- a/library/cpp/actors/http/http_proxy_incoming.cpp
+++ b/library/cpp/actors/http/http_proxy_incoming.cpp
@@ -1,80 +1,80 @@
-#include "http_proxy.h"
-#include "http_proxy_sock_impl.h"
-
-namespace NHttp {
-
+#include "http_proxy.h"
+#include "http_proxy_sock_impl.h"
+
+namespace NHttp {
+
using namespace NActors;
-template <typename TSocketImpl>
-class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
-public:
- using TBase = TActor<TIncomingConnectionActor<TSocketImpl>>;
- static constexpr bool RecycleRequests = true;
-
- const TEndpointInfo& Endpoint;
- SocketAddressType Address;
- TList<THttpIncomingRequestPtr> Requests;
- THashMap<THttpIncomingRequestPtr, THttpOutgoingResponsePtr> Responses;
- THttpIncomingRequestPtr CurrentRequest;
- THttpOutgoingResponsePtr CurrentResponse;
- TDeque<THttpIncomingRequestPtr> RecycledRequests;
-
+template <typename TSocketImpl>
+class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
+public:
+ using TBase = TActor<TIncomingConnectionActor<TSocketImpl>>;
+ static constexpr bool RecycleRequests = true;
+
+ const TEndpointInfo& Endpoint;
+ SocketAddressType Address;
+ TList<THttpIncomingRequestPtr> Requests;
+ THashMap<THttpIncomingRequestPtr, THttpOutgoingResponsePtr> Responses;
+ THttpIncomingRequestPtr CurrentRequest;
+ THttpOutgoingResponsePtr CurrentResponse;
+ TDeque<THttpIncomingRequestPtr> RecycledRequests;
+
THPTimer InactivityTimer;
static constexpr TDuration InactivityTimeout = TDuration::Minutes(2);
TEvPollerReady* InactivityEvent = nullptr;
TPollerToken::TPtr PollerToken;
- TIncomingConnectionActor(
- const TEndpointInfo& endpoint,
- TIntrusivePtr<TSocketDescriptor> socket,
- SocketAddressType address,
- THttpIncomingRequestPtr recycledRequest = nullptr)
- : TBase(&TIncomingConnectionActor::StateAccepting)
- , TSocketImpl(std::move(socket))
- , Endpoint(endpoint)
- , Address(address)
- {
- if (recycledRequest != nullptr) {
- RecycledRequests.emplace_back(std::move(recycledRequest));
- }
- TSocketImpl::SetNonBlock();
- }
-
- void CleanupRequest(THttpIncomingRequestPtr& request) {
- if (RecycleRequests) {
- request->Clear();
- RecycledRequests.push_back(std::move(request));
- } else {
- request = nullptr;
- }
- }
-
- void CleanupResponse(THttpOutgoingResponsePtr& response) {
- CleanupRequest(response->Request);
- // TODO: maybe recycle too?
- response = nullptr;
- }
-
+ TIncomingConnectionActor(
+ const TEndpointInfo& endpoint,
+ TIntrusivePtr<TSocketDescriptor> socket,
+ SocketAddressType address,
+ THttpIncomingRequestPtr recycledRequest = nullptr)
+ : TBase(&TIncomingConnectionActor::StateAccepting)
+ , TSocketImpl(std::move(socket))
+ , Endpoint(endpoint)
+ , Address(address)
+ {
+ if (recycledRequest != nullptr) {
+ RecycledRequests.emplace_back(std::move(recycledRequest));
+ }
+ TSocketImpl::SetNonBlock();
+ }
+
+ void CleanupRequest(THttpIncomingRequestPtr& request) {
+ if (RecycleRequests) {
+ request->Clear();
+ RecycledRequests.push_back(std::move(request));
+ } else {
+ request = nullptr;
+ }
+ }
+
+ void CleanupResponse(THttpOutgoingResponsePtr& response) {
+ CleanupRequest(response->Request);
+ // TODO: maybe recycle too?
+ response = nullptr;
+ }
+
TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override {
return new IEventHandle(self, parent, new TEvents::TEvBootstrap());
}
void Die(const TActorContext& ctx) override {
- ctx.Send(Endpoint.Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests)));
- TSocketImpl::Shutdown();
- TBase::Die(ctx);
- }
-
-protected:
+ ctx.Send(Endpoint.Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests)));
+ TSocketImpl::Shutdown();
+ TBase::Die(ctx);
+ }
+
+protected:
void Bootstrap(const TActorContext& ctx) {
InactivityTimer.Reset();
ctx.Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") incoming connection opened");
- OnAccept(ctx);
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") incoming connection opened");
+ OnAccept(ctx);
}
- void OnAccept(const NActors::TActorContext& ctx) {
+ void OnAccept(const NActors::TActorContext& ctx) {
int res;
bool read = false, write = false;
if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) {
@@ -86,21 +86,21 @@ protected:
} else {
LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Accept: " << strerror(-res));
return Die(ctx);
- }
- }
- TBase::Become(&TIncomingConnectionActor::StateConnected);
+ }
+ }
+ TBase::Become(&TIncomingConnectionActor::StateConnected);
ctx.Send(ctx.SelfID, new TEvPollerReady(nullptr, true, true));
- }
-
+ }
+
void HandleAccepting(TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
PollerToken = std::move(ev->Get()->PollerToken);
- OnAccept(ctx);
- }
-
+ OnAccept(ctx);
+ }
+
void HandleAccepting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
- OnAccept(ctx);
- }
-
+ OnAccept(ctx);
+ }
+
void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) {
if (event->Get()->Read) {
for (;;) {
@@ -114,7 +114,7 @@ protected:
CurrentRequest->Address = Address;
CurrentRequest->WorkerName = Endpoint.WorkerName;
CurrentRequest->Secure = Endpoint.Secure;
- }
+ }
if (!CurrentRequest->EnsureEnoughSpaceAvailable()) {
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - not enough space available");
return Die(ctx);
@@ -134,13 +134,13 @@ protected:
CurrentRequest = nullptr;
} else if (CurrentRequest->IsError()) {
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")");
- bool success = Respond(CurrentRequest->CreateResponseBadRequest(), ctx);
- if (!success) {
- return;
- }
+ bool success = Respond(CurrentRequest->CreateResponseBadRequest(), ctx);
+ if (!success) {
+ return;
+ }
CurrentRequest = nullptr;
}
- }
+ }
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
if (PollerToken) {
if (!read && !write) {
@@ -148,18 +148,18 @@ protected:
}
PollerToken->Request(read, write);
}
- break;
+ break;
} else if (-res == EINTR) {
continue;
} else if (!res) {
// connection closed
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
- return Die(ctx);
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
+ return Die(ctx);
} else {
- LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Receive: " << strerror(-res));
- return Die(ctx);
- }
- }
+ LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Receive: " << strerror(-res));
+ return Die(ctx);
+ }
+ }
if (event->Get() == InactivityEvent) {
const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed()));
if (passed >= InactivityTimeout) {
@@ -173,83 +173,83 @@ protected:
if (event->Get()->Write) {
FlushOutput(ctx);
}
- }
-
+ }
+
void HandleConnected(TEvPollerRegisterResult::TPtr ev, const TActorContext& /*ctx*/) {
PollerToken = std::move(ev->Get()->PollerToken);
PollerToken->Request(true, true);
- }
-
- void HandleConnected(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const TActorContext& ctx) {
- Respond(event->Get()->Response, ctx);
- }
-
- bool Respond(THttpOutgoingResponsePtr response, const TActorContext& ctx) {
- THttpIncomingRequestPtr request = response->GetRequest();
- response->Finish();
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << response->Status << " " << response->Message << ")");
- if (response->Status != "200" && response->Status != "404") {
- static constexpr size_t MAX_LOGGED_SIZE = 1024;
- LOG_DEBUG_S(ctx, HttpLog,
- "(#"
- << TSocketImpl::GetRawSocket()
- << ","
- << Address
- << ") Request: "
- << request->GetObfuscatedData().substr(0, MAX_LOGGED_SIZE));
- LOG_DEBUG_S(ctx, HttpLog,
- "(#"
- << TSocketImpl::GetRawSocket()
- << ","
- << Address
- << ") Response: "
- << TString(response->GetRawData()).substr(0, MAX_LOGGED_SIZE));
- }
+ }
+
+ void HandleConnected(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const TActorContext& ctx) {
+ Respond(event->Get()->Response, ctx);
+ }
+
+ bool Respond(THttpOutgoingResponsePtr response, const TActorContext& ctx) {
+ THttpIncomingRequestPtr request = response->GetRequest();
+ response->Finish();
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << response->Status << " " << response->Message << ")");
+ if (response->Status != "200" && response->Status != "404") {
+ static constexpr size_t MAX_LOGGED_SIZE = 1024;
+ LOG_DEBUG_S(ctx, HttpLog,
+ "(#"
+ << TSocketImpl::GetRawSocket()
+ << ","
+ << Address
+ << ") Request: "
+ << request->GetObfuscatedData().substr(0, MAX_LOGGED_SIZE));
+ LOG_DEBUG_S(ctx, HttpLog,
+ "(#"
+ << TSocketImpl::GetRawSocket()
+ << ","
+ << Address
+ << ") Response: "
+ << TString(response->GetRawData()).substr(0, MAX_LOGGED_SIZE));
+ }
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildIncomingRequestSensors(request, response));
- ctx.Send(Endpoint.Owner, sensors.Release());
- if (request == Requests.front() && CurrentResponse == nullptr) {
- CurrentResponse = response;
- return FlushOutput(ctx);
- } else {
- // we are ahead of our pipeline
- Responses.emplace(request, response);
- return true;
- }
- }
-
- bool FlushOutput(const TActorContext& ctx) {
- while (CurrentResponse != nullptr) {
- size_t size = CurrentResponse->Size();
- if (size == 0) {
- Y_VERIFY(Requests.front() == CurrentResponse->GetRequest());
- bool close = CurrentResponse->IsConnectionClose();
- Requests.pop_front();
- CleanupResponse(CurrentResponse);
- if (!Requests.empty()) {
- auto it = Responses.find(Requests.front());
- if (it != Responses.end()) {
- CurrentResponse = it->second;
- Responses.erase(it);
- continue;
- } else {
- LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - FlushOutput request not found");
- Die(ctx);
- return false;
- }
- } else {
- if (close) {
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
- Die(ctx);
- return false;
- } else {
- continue;
- }
- }
- }
+ ctx.Send(Endpoint.Owner, sensors.Release());
+ if (request == Requests.front() && CurrentResponse == nullptr) {
+ CurrentResponse = response;
+ return FlushOutput(ctx);
+ } else {
+ // we are ahead of our pipeline
+ Responses.emplace(request, response);
+ return true;
+ }
+ }
+
+ bool FlushOutput(const TActorContext& ctx) {
+ while (CurrentResponse != nullptr) {
+ size_t size = CurrentResponse->Size();
+ if (size == 0) {
+ Y_VERIFY(Requests.front() == CurrentResponse->GetRequest());
+ bool close = CurrentResponse->IsConnectionClose();
+ Requests.pop_front();
+ CleanupResponse(CurrentResponse);
+ if (!Requests.empty()) {
+ auto it = Responses.find(Requests.front());
+ if (it != Responses.end()) {
+ CurrentResponse = it->second;
+ Responses.erase(it);
+ continue;
+ } else {
+ LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - FlushOutput request not found");
+ Die(ctx);
+ return false;
+ }
+ } else {
+ if (close) {
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
+ Die(ctx);
+ return false;
+ } else {
+ continue;
+ }
+ }
+ }
bool read = false, write = false;
ssize_t res = TSocketImpl::Send(CurrentResponse->Data(), size, read, write);
- if (res > 0) {
- CurrentResponse->ChopHead(res);
+ if (res > 0) {
+ CurrentResponse->ChopHead(res);
} else if (-res == EINTR) {
continue;
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
@@ -258,45 +258,45 @@ protected:
write = true;
}
PollerToken->Request(read, write);
- }
- break;
+ }
+ break;
} else {
CleanupResponse(CurrentResponse);
LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in FlushOutput: " << strerror(-res));
- Die(ctx);
- return false;
- }
- }
- return true;
- }
-
- STFUNC(StateAccepting) {
- switch (ev->GetTypeRewrite()) {
+ Die(ctx);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ STFUNC(StateAccepting) {
+ switch (ev->GetTypeRewrite()) {
CFunc(TEvents::TEvBootstrap::EventType, Bootstrap);
HFunc(TEvPollerReady, HandleAccepting);
HFunc(TEvPollerRegisterResult, HandleAccepting);
- }
- }
-
- STFUNC(StateConnected) {
- switch (ev->GetTypeRewrite()) {
+ }
+ }
+
+ STFUNC(StateConnected) {
+ switch (ev->GetTypeRewrite()) {
HFunc(TEvPollerReady, HandleConnected);
- HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, HandleConnected);
+ HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, HandleConnected);
HFunc(TEvPollerRegisterResult, HandleConnected);
- }
- }
-};
-
-IActor* CreateIncomingConnectionActor(
- const TEndpointInfo& endpoint,
- TIntrusivePtr<TSocketDescriptor> socket,
- THttpConfig::SocketAddressType address,
- THttpIncomingRequestPtr recycledRequest) {
- if (endpoint.Secure) {
- return new TIncomingConnectionActor<TSecureSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest));
- } else {
- return new TIncomingConnectionActor<TPlainSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest));
- }
-}
-
-}
+ }
+ }
+};
+
+IActor* CreateIncomingConnectionActor(
+ const TEndpointInfo& endpoint,
+ TIntrusivePtr<TSocketDescriptor> socket,
+ THttpConfig::SocketAddressType address,
+ THttpIncomingRequestPtr recycledRequest) {
+ if (endpoint.Secure) {
+ return new TIncomingConnectionActor<TSecureSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest));
+ } else {
+ return new TIncomingConnectionActor<TPlainSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest));
+ }
+}
+
+}
diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp
index d9189dba8a..5b3d07c614 100644
--- a/library/cpp/actors/http/http_proxy_outgoing.cpp
+++ b/library/cpp/actors/http/http_proxy_outgoing.cpp
@@ -1,92 +1,92 @@
-#include "http_proxy.h"
-#include "http_proxy_sock_impl.h"
-
-namespace NHttp {
-
-template <typename TSocketImpl>
-class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
-public:
- using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>;
- using TSelf = TOutgoingConnectionActor<TSocketImpl>;
+#include "http_proxy.h"
+#include "http_proxy_sock_impl.h"
+
+namespace NHttp {
+
+template <typename TSocketImpl>
+class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig {
+public:
+ using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>;
+ using TSelf = TOutgoingConnectionActor<TSocketImpl>;
const TActorId Owner;
const TActorId Poller;
- SocketAddressType Address;
- TString Host;
+ SocketAddressType Address;
+ TString Host;
TActorId RequestOwner;
- THttpOutgoingRequestPtr Request;
- THttpIncomingResponsePtr Response;
- TInstant LastActivity;
- TDuration ConnectionTimeout = CONNECTION_TIMEOUT;
+ THttpOutgoingRequestPtr Request;
+ THttpIncomingResponsePtr Response;
+ TInstant LastActivity;
+ TDuration ConnectionTimeout = CONNECTION_TIMEOUT;
NActors::TPollerToken::TPtr PollerToken;
-
+
TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller)
- : TBase(&TSelf::StateWaiting)
- , Owner(owner)
- , Poller(poller)
- , Host(host)
- {
- TSocketImpl::SetNonBlock();
- TSocketImpl::SetTimeout(SOCKET_TIMEOUT);
- }
-
+ : TBase(&TSelf::StateWaiting)
+ , Owner(owner)
+ , Poller(poller)
+ , Host(host)
+ {
+ TSocketImpl::SetNonBlock();
+ TSocketImpl::SetTimeout(SOCKET_TIMEOUT);
+ }
+
void Die(const NActors::TActorContext& ctx) override {
- ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID));
- TSocketImpl::Shutdown(); // to avoid errors when connection already closed
- TBase::Die(ctx);
- }
-
- void ReplyAndDie(const NActors::TActorContext& ctx) {
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")");
- ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response));
+ ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID));
+ TSocketImpl::Shutdown(); // to avoid errors when connection already closed
+ TBase::Die(ctx);
+ }
+
+ void ReplyAndDie(const NActors::TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")");
+ ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response));
RequestOwner = TActorId();
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
- ctx.Send(Owner, sensors.Release());
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
- Die(ctx);
- }
-
- void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) {
- LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error);
- if (RequestOwner) {
- ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
+ ctx.Send(Owner, sensors.Release());
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed");
+ Die(ctx);
+ }
+
+ void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) {
+ LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error);
+ if (RequestOwner) {
+ ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error));
RequestOwner = TActorId();
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
- ctx.Send(Owner, sensors.Release());
- Die(ctx);
- }
- }
-
-protected:
- void FailConnection(const NActors::TActorContext& ctx, const TString& error) {
- if (Request) {
- return ReplyErrorAndDie(ctx, error);
- }
- return TBase::Become(&TOutgoingConnectionActor::StateFailed);
- }
-
- void Connect(const NActors::TActorContext& ctx) {
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connecting");
- int res = TSocketImpl::Connect(Address);
- RegisterPoller(ctx);
- switch (-res) {
- case 0:
- return OnConnect(ctx);
- case EINPROGRESS:
- case EAGAIN:
- return TBase::Become(&TOutgoingConnectionActor::StateConnecting);
- default:
- return ReplyErrorAndDie(ctx, strerror(-res));
- }
- }
-
- void FlushOutput(const NActors::TActorContext& ctx) {
- if (Request != nullptr) {
- Request->Finish();
+ ctx.Send(Owner, sensors.Release());
+ Die(ctx);
+ }
+ }
+
+protected:
+ void FailConnection(const NActors::TActorContext& ctx, const TString& error) {
+ if (Request) {
+ return ReplyErrorAndDie(ctx, error);
+ }
+ return TBase::Become(&TOutgoingConnectionActor::StateFailed);
+ }
+
+ void Connect(const NActors::TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connecting");
+ int res = TSocketImpl::Connect(Address);
+ RegisterPoller(ctx);
+ switch (-res) {
+ case 0:
+ return OnConnect(ctx);
+ case EINPROGRESS:
+ case EAGAIN:
+ return TBase::Become(&TOutgoingConnectionActor::StateConnecting);
+ default:
+ return ReplyErrorAndDie(ctx, strerror(-res));
+ }
+ }
+
+ void FlushOutput(const NActors::TActorContext& ctx) {
+ if (Request != nullptr) {
+ Request->Finish();
while (auto size = Request->Size()) {
bool read = false, write = false;
ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write);
- if (res > 0) {
- Request->ChopHead(res);
+ if (res > 0) {
+ Request->ChopHead(res);
} else if (-res == EINTR) {
continue;
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
@@ -97,30 +97,30 @@ protected:
PollerToken->Request(read, write);
}
break;
- } else {
+ } else {
if (!res) {
- ReplyAndDie(ctx);
+ ReplyAndDie(ctx);
} else {
- ReplyErrorAndDie(ctx, strerror(-res));
- }
+ ReplyErrorAndDie(ctx, strerror(-res));
+ }
break;
- }
- }
- }
- }
-
- void PullInput(const NActors::TActorContext& ctx) {
+ }
+ }
+ }
+ }
+
+ void PullInput(const NActors::TActorContext& ctx) {
for (;;) {
- if (Response == nullptr) {
- Response = new THttpIncomingResponse(Request);
- }
- if (!Response->EnsureEnoughSpaceAvailable()) {
- return ReplyErrorAndDie(ctx, "Not enough space in socket buffer");
- }
+ if (Response == nullptr) {
+ Response = new THttpIncomingResponse(Request);
+ }
+ if (!Response->EnsureEnoughSpaceAvailable()) {
+ return ReplyErrorAndDie(ctx, "Not enough space in socket buffer");
+ }
bool read = false, write = false;
ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write);
- if (res > 0) {
- Response->Advance(res);
+ if (res > 0) {
+ Response->Advance(res);
if (Response->IsDone() && Response->IsReady()) {
return ReplyAndDie(ctx);
}
@@ -130,169 +130,169 @@ protected:
if (PollerToken) {
if (!read && !write) {
read = true;
- }
+ }
PollerToken->Request(read, write);
- }
+ }
return;
- } else {
+ } else {
if (!res) {
- Response->ConnectionClosed();
- }
+ Response->ConnectionClosed();
+ }
if (Response->IsDone() && Response->IsReady()) {
return ReplyAndDie(ctx);
}
return ReplyErrorAndDie(ctx, strerror(-res));
- }
+ }
}
- }
-
- void RegisterPoller(const NActors::TActorContext& ctx) {
+ }
+
+ void RegisterPoller(const NActors::TActorContext& ctx) {
ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID));
- }
-
- void OnConnect(const NActors::TActorContext& ctx) {
+ }
+
+ void OnConnect(const NActors::TActorContext& ctx) {
bool read = false, write = false;
if (int res = TSocketImpl::OnConnect(read, write); res != 1) {
if (-res == EAGAIN) {
if (PollerToken) {
PollerToken->Request(read, write);
}
- return;
+ return;
} else {
return ReplyErrorAndDie(ctx, strerror(-res));
- }
- }
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened");
- TBase::Become(&TOutgoingConnectionActor::StateConnected);
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")");
+ }
+ }
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened");
+ TBase::Become(&TOutgoingConnectionActor::StateConnected);
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")");
ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true));
- }
-
- void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) {
- LastActivity = ctx.Now();
- if (!event->Get()->Error.empty()) {
- return FailConnection(ctx, event->Get()->Error);
- }
- Address = event->Get()->Address;
- if (Address.GetPort() == 0) {
- Address.SetPort(Request->Secure ? 443 : 80);
- }
- Connect(ctx);
- }
-
+ }
+
+ void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) {
+ LastActivity = ctx.Now();
+ if (!event->Get()->Error.empty()) {
+ return FailConnection(ctx, event->Get()->Error);
+ }
+ Address = event->Get()->Address;
+ if (Address.GetPort() == 0) {
+ Address.SetPort(Request->Secure ? 443 : 80);
+ }
+ Connect(ctx);
+ }
+
void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
- LastActivity = ctx.Now();
- int res = TSocketImpl::GetError();
- if (res == 0) {
- OnConnect(ctx);
- } else {
- FailConnection(ctx, TStringBuilder() << strerror(res));
- }
- }
-
+ LastActivity = ctx.Now();
+ int res = TSocketImpl::GetError();
+ if (res == 0) {
+ OnConnect(ctx);
+ } else {
+ FailConnection(ctx, TStringBuilder() << strerror(res));
+ }
+ }
+
void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
PollerToken = std::move(ev->Get()->PollerToken);
- LastActivity = ctx.Now();
- int res = TSocketImpl::GetError();
- if (res == 0) {
- OnConnect(ctx);
- } else {
- FailConnection(ctx, TStringBuilder() << strerror(res));
- }
- }
-
- void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
- LastActivity = ctx.Now();
- Request = std::move(event->Get()->Request);
- Host = Request->Host;
- LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << ") resolving " << Host);
- Request->Timer.Reset();
- RequestOwner = event->Sender;
- ctx.Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(Host));
- if (event->Get()->Timeout) {
- ConnectionTimeout = event->Get()->Timeout;
- TSocketImpl::SetTimeout(ConnectionTimeout);
- }
- ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
- LastActivity = ctx.Now();
- TBase::Become(&TOutgoingConnectionActor::StateResolving);
- }
-
+ LastActivity = ctx.Now();
+ int res = TSocketImpl::GetError();
+ if (res == 0) {
+ OnConnect(ctx);
+ } else {
+ FailConnection(ctx, TStringBuilder() << strerror(res));
+ }
+ }
+
+ void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ LastActivity = ctx.Now();
+ Request = std::move(event->Get()->Request);
+ Host = Request->Host;
+ LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << ") resolving " << Host);
+ Request->Timer.Reset();
+ RequestOwner = event->Sender;
+ ctx.Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(Host));
+ if (event->Get()->Timeout) {
+ ConnectionTimeout = event->Get()->Timeout;
+ TSocketImpl::SetTimeout(ConnectionTimeout);
+ }
+ ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
+ LastActivity = ctx.Now();
+ TBase::Become(&TOutgoingConnectionActor::StateResolving);
+ }
+
void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) {
- LastActivity = ctx.Now();
+ LastActivity = ctx.Now();
if (event->Get()->Read) {
PullInput(ctx);
- }
+ }
if (event->Get()->Write) {
FlushOutput(ctx);
}
- }
-
+ }
+
void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
PollerToken = std::move(ev->Get()->PollerToken);
- LastActivity = ctx.Now();
+ LastActivity = ctx.Now();
PullInput(ctx);
- FlushOutput(ctx);
- }
-
- void HandleFailed(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
- Request = std::move(event->Get()->Request);
- RequestOwner = event->Sender;
- ReplyErrorAndDie(ctx, "Failed");
- }
-
- void HandleTimeout(const NActors::TActorContext& ctx) {
- TDuration inactivityTime = ctx.Now() - LastActivity;
- if (inactivityTime >= ConnectionTimeout) {
- FailConnection(ctx, "Connection timed out");
- } else {
- ctx.Schedule(Min(ConnectionTimeout - inactivityTime, TDuration::MilliSeconds(100)), new NActors::TEvents::TEvWakeup());
- }
- }
-
- STFUNC(StateWaiting) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting);
- CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
- }
- }
-
- STFUNC(StateResolving) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving);
- CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
- }
- }
-
- STFUNC(StateConnecting) {
- switch (ev->GetTypeRewrite()) {
+ FlushOutput(ctx);
+ }
+
+ void HandleFailed(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ Request = std::move(event->Get()->Request);
+ RequestOwner = event->Sender;
+ ReplyErrorAndDie(ctx, "Failed");
+ }
+
+ void HandleTimeout(const NActors::TActorContext& ctx) {
+ TDuration inactivityTime = ctx.Now() - LastActivity;
+ if (inactivityTime >= ConnectionTimeout) {
+ FailConnection(ctx, "Connection timed out");
+ } else {
+ ctx.Schedule(Min(ConnectionTimeout - inactivityTime, TDuration::MilliSeconds(100)), new NActors::TEvents::TEvWakeup());
+ }
+ }
+
+ STFUNC(StateWaiting) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting);
+ CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
+ }
+ }
+
+ STFUNC(StateResolving) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving);
+ CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
+ }
+ }
+
+ STFUNC(StateConnecting) {
+ switch (ev->GetTypeRewrite()) {
HFunc(NActors::TEvPollerReady, HandleConnecting);
- CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
+ CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
HFunc(NActors::TEvPollerRegisterResult, HandleConnecting);
- }
- }
-
- STFUNC(StateConnected) {
- switch (ev->GetTypeRewrite()) {
+ }
+ }
+
+ STFUNC(StateConnected) {
+ switch (ev->GetTypeRewrite()) {
HFunc(NActors::TEvPollerReady, HandleConnected);
- CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
+ CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
HFunc(NActors::TEvPollerRegisterResult, HandleConnected);
- }
- }
-
- STFUNC(StateFailed) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleFailed);
- }
- }
-};
-
+ }
+ }
+
+ STFUNC(StateFailed) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleFailed);
+ }
+ }
+};
+
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller) {
- if (secure) {
- return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller);
- } else {
- return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, host, poller);
- }
-}
-
-}
+ if (secure) {
+ return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller);
+ } else {
+ return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, host, poller);
+ }
+}
+
+}
diff --git a/library/cpp/actors/http/http_proxy_sock_impl.h b/library/cpp/actors/http/http_proxy_sock_impl.h
index bf8c71d05a..e7812cc5e1 100644
--- a/library/cpp/actors/http/http_proxy_sock_impl.h
+++ b/library/cpp/actors/http/http_proxy_sock_impl.h
@@ -1,262 +1,262 @@
-#pragma once
-
-#include "http.h"
-#include "http_proxy.h"
-
-namespace NHttp {
-
-struct TPlainSocketImpl : virtual public THttpConfig {
- TIntrusivePtr<TSocketDescriptor> Socket;
-
- TPlainSocketImpl()
- : Socket(new TSocketDescriptor())
- {}
-
- TPlainSocketImpl(TIntrusivePtr<TSocketDescriptor> socket)
- : Socket(std::move(socket))
- {}
-
- SOCKET GetRawSocket() const {
- return static_cast<SOCKET>(Socket->Socket);
- }
-
- void SetNonBlock(bool nonBlock = true) noexcept {
- try {
- ::SetNonBlock(Socket->Socket, nonBlock);
- }
- catch (const yexception&) {
- }
- }
-
- void SetTimeout(TDuration timeout) noexcept {
- try {
- ::SetSocketTimeout(Socket->Socket, timeout.Seconds(), timeout.MilliSecondsOfSecond());
- }
- catch (const yexception&) {
- }
- }
-
- void Shutdown() {
- //Socket->Socket.ShutDown(SHUT_RDWR); // KIKIMR-3895
- ::shutdown(Socket->Socket, SHUT_RDWR);
- }
-
- int Connect(const SocketAddressType& address) {
- return Socket->Socket.Connect(&address);
- }
-
+#pragma once
+
+#include "http.h"
+#include "http_proxy.h"
+
+namespace NHttp {
+
+struct TPlainSocketImpl : virtual public THttpConfig {
+ TIntrusivePtr<TSocketDescriptor> Socket;
+
+ TPlainSocketImpl()
+ : Socket(new TSocketDescriptor())
+ {}
+
+ TPlainSocketImpl(TIntrusivePtr<TSocketDescriptor> socket)
+ : Socket(std::move(socket))
+ {}
+
+ SOCKET GetRawSocket() const {
+ return static_cast<SOCKET>(Socket->Socket);
+ }
+
+ void SetNonBlock(bool nonBlock = true) noexcept {
+ try {
+ ::SetNonBlock(Socket->Socket, nonBlock);
+ }
+ catch (const yexception&) {
+ }
+ }
+
+ void SetTimeout(TDuration timeout) noexcept {
+ try {
+ ::SetSocketTimeout(Socket->Socket, timeout.Seconds(), timeout.MilliSecondsOfSecond());
+ }
+ catch (const yexception&) {
+ }
+ }
+
+ void Shutdown() {
+ //Socket->Socket.ShutDown(SHUT_RDWR); // KIKIMR-3895
+ ::shutdown(Socket->Socket, SHUT_RDWR);
+ }
+
+ int Connect(const SocketAddressType& address) {
+ return Socket->Socket.Connect(&address);
+ }
+
static constexpr int OnConnect(bool&, bool&) {
return 1;
- }
-
+ }
+
static constexpr int OnAccept(const TEndpointInfo&, bool&, bool&) {
return 1;
- }
-
- bool IsGood() {
- int res;
- GetSockOpt(Socket->Socket, SOL_SOCKET, SO_ERROR, res);
- return res == 0;
- }
-
- int GetError() {
- int res;
- GetSockOpt(Socket->Socket, SOL_SOCKET, SO_ERROR, res);
- return res;
- }
-
+ }
+
+ bool IsGood() {
+ int res;
+ GetSockOpt(Socket->Socket, SOL_SOCKET, SO_ERROR, res);
+ return res == 0;
+ }
+
+ int GetError() {
+ int res;
+ GetSockOpt(Socket->Socket, SOL_SOCKET, SO_ERROR, res);
+ return res;
+ }
+
ssize_t Send(const void* data, size_t size, bool&, bool&) {
- return Socket->Socket.Send(data, size);
- }
-
+ return Socket->Socket.Send(data, size);
+ }
+
ssize_t Recv(void* data, size_t size, bool&, bool&) {
- return Socket->Socket.Recv(data, size);
- }
-};
-
-struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers {
- static TSecureSocketImpl* IO(BIO* bio) noexcept {
- return static_cast<TSecureSocketImpl*>(BIO_get_data(bio));
- }
-
- static int IoWrite(BIO* bio, const char* data, int dlen) noexcept {
- BIO_clear_retry_flags(bio);
- int res = IO(bio)->Socket->Socket.Send(data, dlen);
- if (-res == EAGAIN) {
- BIO_set_retry_write(bio);
- }
- return res;
- }
-
- static int IoRead(BIO* bio, char* data, int dlen) noexcept {
- BIO_clear_retry_flags(bio);
- int res = IO(bio)->Socket->Socket.Recv(data, dlen);
- if (-res == EAGAIN) {
- BIO_set_retry_read(bio);
- }
- return res;
- }
-
- static int IoPuts(BIO* bio, const char* buf) noexcept {
- Y_UNUSED(bio);
- Y_UNUSED(buf);
- return -2;
- }
-
- static int IoGets(BIO* bio, char* buf, int size) noexcept {
- Y_UNUSED(bio);
- Y_UNUSED(buf);
- Y_UNUSED(size);
- return -2;
- }
-
- static long IoCtrl(BIO* bio, int cmd, long larg, void* parg) noexcept {
- Y_UNUSED(larg);
- Y_UNUSED(parg);
-
- if (cmd == BIO_CTRL_FLUSH) {
- IO(bio)->Flush();
- return 1;
- }
-
- return -2;
- }
-
- static int IoCreate(BIO* bio) noexcept {
- BIO_set_data(bio, nullptr);
- BIO_set_init(bio, 1);
- return 1;
- }
-
- static int IoDestroy(BIO* bio) noexcept {
- BIO_set_data(bio, nullptr);
- BIO_set_init(bio, 0);
- return 1;
- }
-
- static BIO_METHOD* CreateIoMethod() {
- BIO_METHOD* method = BIO_meth_new(BIO_get_new_index() | BIO_TYPE_SOURCE_SINK, "SecureSocketImpl");
- BIO_meth_set_write(method, IoWrite);
- BIO_meth_set_read(method, IoRead);
- BIO_meth_set_puts(method, IoPuts);
- BIO_meth_set_gets(method, IoGets);
- BIO_meth_set_ctrl(method, IoCtrl);
- BIO_meth_set_create(method, IoCreate);
- BIO_meth_set_destroy(method, IoDestroy);
- return method;
- }
-
- static BIO_METHOD* IoMethod() {
- static BIO_METHOD* method = CreateIoMethod();
- return method;
- }
-
- TSslHolder<BIO> Bio;
- TSslHolder<SSL_CTX> Ctx;
- TSslHolder<SSL> Ssl;
-
- TSecureSocketImpl() = default;
-
- TSecureSocketImpl(TIntrusivePtr<TSocketDescriptor> socket)
- : TPlainSocketImpl(std::move(socket))
- {}
-
- void InitClientSsl() {
+ return Socket->Socket.Recv(data, size);
+ }
+};
+
+struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers {
+ static TSecureSocketImpl* IO(BIO* bio) noexcept {
+ return static_cast<TSecureSocketImpl*>(BIO_get_data(bio));
+ }
+
+ static int IoWrite(BIO* bio, const char* data, int dlen) noexcept {
+ BIO_clear_retry_flags(bio);
+ int res = IO(bio)->Socket->Socket.Send(data, dlen);
+ if (-res == EAGAIN) {
+ BIO_set_retry_write(bio);
+ }
+ return res;
+ }
+
+ static int IoRead(BIO* bio, char* data, int dlen) noexcept {
+ BIO_clear_retry_flags(bio);
+ int res = IO(bio)->Socket->Socket.Recv(data, dlen);
+ if (-res == EAGAIN) {
+ BIO_set_retry_read(bio);
+ }
+ return res;
+ }
+
+ static int IoPuts(BIO* bio, const char* buf) noexcept {
+ Y_UNUSED(bio);
+ Y_UNUSED(buf);
+ return -2;
+ }
+
+ static int IoGets(BIO* bio, char* buf, int size) noexcept {
+ Y_UNUSED(bio);
+ Y_UNUSED(buf);
+ Y_UNUSED(size);
+ return -2;
+ }
+
+ static long IoCtrl(BIO* bio, int cmd, long larg, void* parg) noexcept {
+ Y_UNUSED(larg);
+ Y_UNUSED(parg);
+
+ if (cmd == BIO_CTRL_FLUSH) {
+ IO(bio)->Flush();
+ return 1;
+ }
+
+ return -2;
+ }
+
+ static int IoCreate(BIO* bio) noexcept {
+ BIO_set_data(bio, nullptr);
+ BIO_set_init(bio, 1);
+ return 1;
+ }
+
+ static int IoDestroy(BIO* bio) noexcept {
+ BIO_set_data(bio, nullptr);
+ BIO_set_init(bio, 0);
+ return 1;
+ }
+
+ static BIO_METHOD* CreateIoMethod() {
+ BIO_METHOD* method = BIO_meth_new(BIO_get_new_index() | BIO_TYPE_SOURCE_SINK, "SecureSocketImpl");
+ BIO_meth_set_write(method, IoWrite);
+ BIO_meth_set_read(method, IoRead);
+ BIO_meth_set_puts(method, IoPuts);
+ BIO_meth_set_gets(method, IoGets);
+ BIO_meth_set_ctrl(method, IoCtrl);
+ BIO_meth_set_create(method, IoCreate);
+ BIO_meth_set_destroy(method, IoDestroy);
+ return method;
+ }
+
+ static BIO_METHOD* IoMethod() {
+ static BIO_METHOD* method = CreateIoMethod();
+ return method;
+ }
+
+ TSslHolder<BIO> Bio;
+ TSslHolder<SSL_CTX> Ctx;
+ TSslHolder<SSL> Ssl;
+
+ TSecureSocketImpl() = default;
+
+ TSecureSocketImpl(TIntrusivePtr<TSocketDescriptor> socket)
+ : TPlainSocketImpl(std::move(socket))
+ {}
+
+ void InitClientSsl() {
Bio.Reset(BIO_new(IoMethod()));
- BIO_set_data(Bio.Get(), this);
- BIO_set_nbio(Bio.Get(), 1);
- Ctx = CreateClientContext();
- Ssl = ConstructSsl(Ctx.Get(), Bio.Get());
- SSL_set_connect_state(Ssl.Get());
- }
-
- void InitServerSsl(SSL_CTX* ctx) {
+ BIO_set_data(Bio.Get(), this);
+ BIO_set_nbio(Bio.Get(), 1);
+ Ctx = CreateClientContext();
+ Ssl = ConstructSsl(Ctx.Get(), Bio.Get());
+ SSL_set_connect_state(Ssl.Get());
+ }
+
+ void InitServerSsl(SSL_CTX* ctx) {
Bio.Reset(BIO_new(IoMethod()));
- BIO_set_data(Bio.Get(), this);
- BIO_set_nbio(Bio.Get(), 1);
- Ssl = ConstructSsl(ctx, Bio.Get());
- SSL_set_accept_state(Ssl.Get());
- }
-
- void Flush() {}
-
+ BIO_set_data(Bio.Get(), this);
+ BIO_set_nbio(Bio.Get(), 1);
+ Ssl = ConstructSsl(ctx, Bio.Get());
+ SSL_set_accept_state(Ssl.Get());
+ }
+
+ void Flush() {}
+
ssize_t Send(const void* data, size_t size, bool& read, bool& write) {
- ssize_t res = SSL_write(Ssl.Get(), data, size);
- if (res < 0) {
- res = SSL_get_error(Ssl.Get(), res);
- switch(res) {
- case SSL_ERROR_WANT_READ:
+ ssize_t res = SSL_write(Ssl.Get(), data, size);
+ if (res < 0) {
+ res = SSL_get_error(Ssl.Get(), res);
+ switch(res) {
+ case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
- case SSL_ERROR_WANT_WRITE:
+ case SSL_ERROR_WANT_WRITE:
write = true;
- return -EAGAIN;
- default:
- return -EIO;
- }
- }
- return res;
- }
-
+ return -EAGAIN;
+ default:
+ return -EIO;
+ }
+ }
+ return res;
+ }
+
ssize_t Recv(void* data, size_t size, bool& read, bool& write) {
- ssize_t res = SSL_read(Ssl.Get(), data, size);
- if (res < 0) {
- res = SSL_get_error(Ssl.Get(), res);
- switch(res) {
- case SSL_ERROR_WANT_READ:
+ ssize_t res = SSL_read(Ssl.Get(), data, size);
+ if (res < 0) {
+ res = SSL_get_error(Ssl.Get(), res);
+ switch(res) {
+ case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
- case SSL_ERROR_WANT_WRITE:
+ case SSL_ERROR_WANT_WRITE:
write = true;
- return -EAGAIN;
- default:
- return -EIO;
- }
- }
- return res;
- }
-
+ return -EAGAIN;
+ default:
+ return -EIO;
+ }
+ }
+ return res;
+ }
+
int OnConnect(bool& read, bool& write) {
- if (!Ssl) {
- InitClientSsl();
- }
- int res = SSL_connect(Ssl.Get());
+ if (!Ssl) {
+ InitClientSsl();
+ }
+ int res = SSL_connect(Ssl.Get());
if (res <= 0) {
- res = SSL_get_error(Ssl.Get(), res);
- switch(res) {
- case SSL_ERROR_WANT_READ:
+ res = SSL_get_error(Ssl.Get(), res);
+ switch(res) {
+ case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
- case SSL_ERROR_WANT_WRITE:
+ case SSL_ERROR_WANT_WRITE:
write = true;
- return -EAGAIN;
- default:
- return -EIO;
- }
- }
- return res;
- }
-
+ return -EAGAIN;
+ default:
+ return -EIO;
+ }
+ }
+ return res;
+ }
+
int OnAccept(const TEndpointInfo& endpoint, bool& read, bool& write) {
- if (!Ssl) {
- InitServerSsl(endpoint.SecureContext.Get());
- }
- int res = SSL_accept(Ssl.Get());
+ if (!Ssl) {
+ InitServerSsl(endpoint.SecureContext.Get());
+ }
+ int res = SSL_accept(Ssl.Get());
if (res <= 0) {
- res = SSL_get_error(Ssl.Get(), res);
- switch(res) {
- case SSL_ERROR_WANT_READ:
+ res = SSL_get_error(Ssl.Get(), res);
+ switch(res) {
+ case SSL_ERROR_WANT_READ:
read = true;
return -EAGAIN;
- case SSL_ERROR_WANT_WRITE:
+ case SSL_ERROR_WANT_WRITE:
write = true;
- return -EAGAIN;
- default:
- return -EIO;
- }
- }
- return res;
- }
-};
-
-}
+ return -EAGAIN;
+ default:
+ return -EIO;
+ }
+ }
+ return res;
+ }
+};
+
+}
diff --git a/library/cpp/actors/http/http_proxy_ssl.h b/library/cpp/actors/http/http_proxy_ssl.h
index ffce12997f..12fb372b3c 100644
--- a/library/cpp/actors/http/http_proxy_ssl.h
+++ b/library/cpp/actors/http/http_proxy_ssl.h
@@ -1,22 +1,22 @@
-#pragma once
-
-#include <openssl/bio.h>
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-#include <openssl/tls1.h>
-
-namespace NHttp {
-
-struct TSslHelpers {
- struct TSslDestroy {
- static void Destroy(SSL_CTX* ctx) noexcept {
- SSL_CTX_free(ctx);
- }
-
- static void Destroy(SSL* ssl) noexcept {
- SSL_free(ssl);
- }
-
+#pragma once
+
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <openssl/tls1.h>
+
+namespace NHttp {
+
+struct TSslHelpers {
+ struct TSslDestroy {
+ static void Destroy(SSL_CTX* ctx) noexcept {
+ SSL_CTX_free(ctx);
+ }
+
+ static void Destroy(SSL* ssl) noexcept {
+ SSL_free(ssl);
+ }
+
static void Destroy(X509* cert) noexcept {
X509_free(cert);
}
@@ -25,48 +25,48 @@ struct TSslHelpers {
EVP_PKEY_free(pkey);
}
- static void Destroy(BIO* bio) noexcept {
- BIO_free(bio);
- }
- };
-
- template <typename T>
- using TSslHolder = THolder<T, TSslDestroy>;
-
- static TSslHolder<SSL_CTX> CreateSslCtx(const SSL_METHOD* method) {
- TSslHolder<SSL_CTX> ctx(SSL_CTX_new(method));
-
- if (ctx) {
- SSL_CTX_set_options(ctx.Get(), SSL_OP_NO_SSLv2);
- SSL_CTX_set_options(ctx.Get(), SSL_OP_NO_SSLv3);
- SSL_CTX_set_options(ctx.Get(), SSL_OP_MICROSOFT_SESS_ID_BUG);
- SSL_CTX_set_options(ctx.Get(), SSL_OP_NETSCAPE_CHALLENGE_BUG);
- }
-
- return ctx;
- }
-
- static TSslHolder<SSL_CTX> CreateClientContext() {
- return CreateSslCtx(SSLv23_client_method());
- }
-
- static TSslHolder<SSL_CTX> CreateServerContext(const TString& certificate, const TString& key) {
- TSslHolder<SSL_CTX> ctx = CreateSslCtx(SSLv23_server_method());
- SSL_CTX_set_ecdh_auto(ctx.Get(), 1);
- int res;
- res = SSL_CTX_use_certificate_chain_file(ctx.Get(), certificate.c_str());
- if (res < 0) {
- // TODO(xenoxeno): more diagnostics?
- return nullptr;
- }
- res = SSL_CTX_use_PrivateKey_file(ctx.Get(), key.c_str(), SSL_FILETYPE_PEM);
- if (res < 0) {
- // TODO(xenoxeno): more diagnostics?
- return nullptr;
- }
- return ctx;
- }
-
+ static void Destroy(BIO* bio) noexcept {
+ BIO_free(bio);
+ }
+ };
+
+ template <typename T>
+ using TSslHolder = THolder<T, TSslDestroy>;
+
+ static TSslHolder<SSL_CTX> CreateSslCtx(const SSL_METHOD* method) {
+ TSslHolder<SSL_CTX> ctx(SSL_CTX_new(method));
+
+ if (ctx) {
+ SSL_CTX_set_options(ctx.Get(), SSL_OP_NO_SSLv2);
+ SSL_CTX_set_options(ctx.Get(), SSL_OP_NO_SSLv3);
+ SSL_CTX_set_options(ctx.Get(), SSL_OP_MICROSOFT_SESS_ID_BUG);
+ SSL_CTX_set_options(ctx.Get(), SSL_OP_NETSCAPE_CHALLENGE_BUG);
+ }
+
+ return ctx;
+ }
+
+ static TSslHolder<SSL_CTX> CreateClientContext() {
+ return CreateSslCtx(SSLv23_client_method());
+ }
+
+ static TSslHolder<SSL_CTX> CreateServerContext(const TString& certificate, const TString& key) {
+ TSslHolder<SSL_CTX> ctx = CreateSslCtx(SSLv23_server_method());
+ SSL_CTX_set_ecdh_auto(ctx.Get(), 1);
+ int res;
+ res = SSL_CTX_use_certificate_chain_file(ctx.Get(), certificate.c_str());
+ if (res < 0) {
+ // TODO(xenoxeno): more diagnostics?
+ return nullptr;
+ }
+ res = SSL_CTX_use_PrivateKey_file(ctx.Get(), key.c_str(), SSL_FILETYPE_PEM);
+ if (res < 0) {
+ // TODO(xenoxeno): more diagnostics?
+ return nullptr;
+ }
+ return ctx;
+ }
+
static bool LoadX509Chain(TSslHolder<SSL_CTX>& ctx, const TString& pem) {
TSslHolder<BIO> bio(BIO_new_mem_buf(pem.c_str(), pem.size()));
if (bio == nullptr) {
@@ -116,16 +116,16 @@ struct TSslHelpers {
return ctx;
}
- static TSslHolder<SSL> ConstructSsl(SSL_CTX* ctx, BIO* bio) {
- TSslHolder<SSL> ssl(SSL_new(ctx));
-
- if (ssl) {
- BIO_up_ref(bio); // SSL_set_bio consumes only one reference if rbio and wbio are the same
- SSL_set_bio(ssl.Get(), bio, bio);
- }
-
- return ssl;
- }
-};
-
-}
+ static TSslHolder<SSL> ConstructSsl(SSL_CTX* ctx, BIO* bio) {
+ TSslHolder<SSL> ssl(SSL_new(ctx));
+
+ if (ssl) {
+ BIO_up_ref(bio); // SSL_set_bio consumes only one reference if rbio and wbio are the same
+ SSL_set_bio(ssl.Get(), bio, bio);
+ }
+
+ return ssl;
+ }
+};
+
+}
diff --git a/library/cpp/actors/http/http_static.cpp b/library/cpp/actors/http/http_static.cpp
index c075c5f693..452b0a8498 100644
--- a/library/cpp/actors/http/http_static.cpp
+++ b/library/cpp/actors/http/http_static.cpp
@@ -5,91 +5,91 @@
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/http/http.h>
#include <library/cpp/resource/resource.h>
-#include <util/folder/path.h>
-#include <util/stream/file.h>
-
-namespace NHttp {
-
-class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandler> {
-public:
- using TBase = NActors::TActor<THttpStaticContentHandler>;
- const TFsPath URL;
- const TFsPath FilePath;
- const TFsPath ResourcePath;
- const TFsPath Index;
-
- THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index)
- : TBase(&THttpStaticContentHandler::StateWork)
- , URL(url)
- , FilePath(filePath)
- , ResourcePath(resourcePath)
- , Index(index)
- {}
-
- static TInstant GetCompileTime() {
- tm compileTime;
- strptime(__DATE__ " " __TIME__, "%B %d %Y %H:%M:%S", &compileTime);
- return TInstant::Seconds(mktime(&compileTime));
- }
-
- void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
- THttpOutgoingResponsePtr response;
- if (event->Get()->Request->Method != "GET") {
- response = event->Get()->Request->CreateResponseBadRequest("Wrong request");
- ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
- return;
- }
- TFsPath url(event->Get()->Request->URL.Before('?'));
- if (!url.IsAbsolute()) {
- response = event->Get()->Request->CreateResponseBadRequest("Completely wrong URL");
- ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
- return;
- }
- if (url.GetPath().EndsWith('/') && Index.IsDefined()) {
- url /= Index;
- }
- url = url.RelativeTo(URL);
- try {
- // TODO: caching?
- TString contentType = mimetypeByExt(url.GetExtension().c_str());
- TString data;
- TFileStat filestat;
- TFsPath resourcename(ResourcePath / url);
- if (NResource::FindExact(resourcename.GetPath(), &data)) {
- static TInstant compileTime(GetCompileTime());
- filestat.MTime = compileTime.Seconds();
- } else {
- TFsPath filename(FilePath / url);
- if (!filename.IsSubpathOf(FilePath) && filename != FilePath) {
- response = event->Get()->Request->CreateResponseBadRequest("Wrong URL");
- ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
- return;
- }
- if (filename.Stat(filestat) && filestat.IsFile()) {
- data = TUnbufferedFileInput(filename).ReadAll();
- }
- }
- if (!filestat.IsNull()) {
- response = event->Get()->Request->CreateResponseOK(data, contentType, TInstant::Seconds(filestat.MTime));
- } else {
- response = event->Get()->Request->CreateResponseNotFound("File not found");
- }
- }
- catch (const yexception&) {
- response = event->Get()->Request->CreateResponseServiceUnavailable("Not available");
- }
- ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
- }
-
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
- }
- }
-};
-
-NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) {
- return new THttpStaticContentHandler(url, filePath, resourcePath, index);
-}
-
-}
+#include <util/folder/path.h>
+#include <util/stream/file.h>
+
+namespace NHttp {
+
+class THttpStaticContentHandler : public NActors::TActor<THttpStaticContentHandler> {
+public:
+ using TBase = NActors::TActor<THttpStaticContentHandler>;
+ const TFsPath URL;
+ const TFsPath FilePath;
+ const TFsPath ResourcePath;
+ const TFsPath Index;
+
+ THttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index)
+ : TBase(&THttpStaticContentHandler::StateWork)
+ , URL(url)
+ , FilePath(filePath)
+ , ResourcePath(resourcePath)
+ , Index(index)
+ {}
+
+ static TInstant GetCompileTime() {
+ tm compileTime;
+ strptime(__DATE__ " " __TIME__, "%B %d %Y %H:%M:%S", &compileTime);
+ return TInstant::Seconds(mktime(&compileTime));
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ THttpOutgoingResponsePtr response;
+ if (event->Get()->Request->Method != "GET") {
+ response = event->Get()->Request->CreateResponseBadRequest("Wrong request");
+ ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ return;
+ }
+ TFsPath url(event->Get()->Request->URL.Before('?'));
+ if (!url.IsAbsolute()) {
+ response = event->Get()->Request->CreateResponseBadRequest("Completely wrong URL");
+ ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ return;
+ }
+ if (url.GetPath().EndsWith('/') && Index.IsDefined()) {
+ url /= Index;
+ }
+ url = url.RelativeTo(URL);
+ try {
+ // TODO: caching?
+ TString contentType = mimetypeByExt(url.GetExtension().c_str());
+ TString data;
+ TFileStat filestat;
+ TFsPath resourcename(ResourcePath / url);
+ if (NResource::FindExact(resourcename.GetPath(), &data)) {
+ static TInstant compileTime(GetCompileTime());
+ filestat.MTime = compileTime.Seconds();
+ } else {
+ TFsPath filename(FilePath / url);
+ if (!filename.IsSubpathOf(FilePath) && filename != FilePath) {
+ response = event->Get()->Request->CreateResponseBadRequest("Wrong URL");
+ ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ return;
+ }
+ if (filename.Stat(filestat) && filestat.IsFile()) {
+ data = TUnbufferedFileInput(filename).ReadAll();
+ }
+ }
+ if (!filestat.IsNull()) {
+ response = event->Get()->Request->CreateResponseOK(data, contentType, TInstant::Seconds(filestat.MTime));
+ } else {
+ response = event->Get()->Request->CreateResponseNotFound("File not found");
+ }
+ }
+ catch (const yexception&) {
+ response = event->Get()->Request->CreateResponseServiceUnavailable("Not available");
+ }
+ ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ }
+ }
+};
+
+NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index) {
+ return new THttpStaticContentHandler(url, filePath, resourcePath, index);
+}
+
+}
diff --git a/library/cpp/actors/http/http_static.h b/library/cpp/actors/http/http_static.h
index f91e15dfb1..f2ee13d003 100644
--- a/library/cpp/actors/http/http_static.h
+++ b/library/cpp/actors/http/http_static.h
@@ -1,9 +1,9 @@
-#pragma once
+#pragma once
#include <library/cpp/actors/core/actor.h>
-#include "http.h"
-
-namespace NHttp {
-
-NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index = TString());
-
-}
+#include "http.h"
+
+namespace NHttp {
+
+NActors::IActor* CreateHttpStaticContentHandler(const TString& url, const TString& filePath, const TString& resourcePath, const TString& index = TString());
+
+}
diff --git a/library/cpp/actors/http/http_ut.cpp b/library/cpp/actors/http/http_ut.cpp
index 4c922f8d0f..209f61f4de 100644
--- a/library/cpp/actors/http/http_ut.cpp
+++ b/library/cpp/actors/http/http_ut.cpp
@@ -3,86 +3,86 @@
#include <library/cpp/actors/core/executor_pool_basic.h>
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/testlib/test_runtime.h>
-#include <util/system/tempfile.h>
-#include "http.h"
-#include "http_proxy.h"
-
-
-
-enum EService : NActors::NLog::EComponent {
- MIN,
- Logger,
- MVP,
- MAX
-};
-
-namespace {
-
-template <typename HttpType>
-void EatWholeString(TIntrusivePtr<HttpType>& request, const TString& data) {
- request->EnsureEnoughSpaceAvailable(data.size());
- auto size = std::min(request->Avail(), data.size());
- memcpy(request->Pos(), data.data(), size);
- request->Advance(size);
-}
-
-template <typename HttpType>
-void EatPartialString(TIntrusivePtr<HttpType>& request, const TString& data) {
- for (char c : data) {
- request->EnsureEnoughSpaceAvailable(1);
- memcpy(request->Pos(), &c, 1);
- request->Advance(1);
- }
-}
-
-}
-
+#include <util/system/tempfile.h>
+#include "http.h"
+#include "http_proxy.h"
+
+
+
+enum EService : NActors::NLog::EComponent {
+ MIN,
+ Logger,
+ MVP,
+ MAX
+};
+
+namespace {
+
+template <typename HttpType>
+void EatWholeString(TIntrusivePtr<HttpType>& request, const TString& data) {
+ request->EnsureEnoughSpaceAvailable(data.size());
+ auto size = std::min(request->Avail(), data.size());
+ memcpy(request->Pos(), data.data(), size);
+ request->Advance(size);
+}
+
+template <typename HttpType>
+void EatPartialString(TIntrusivePtr<HttpType>& request, const TString& data) {
+ for (char c : data) {
+ request->EnsureEnoughSpaceAvailable(1);
+ memcpy(request->Pos(), &c, 1);
+ request->Advance(1);
+ }
+}
+
+}
+
Y_UNIT_TEST_SUITE(HttpProxy) {
Y_UNIT_TEST(BasicParsing) {
- NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
- EatWholeString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n");
- UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
- UNIT_ASSERT_EQUAL(request->Method, "GET");
- UNIT_ASSERT_EQUAL(request->URL, "/test");
- UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
- UNIT_ASSERT_EQUAL(request->Version, "1.1");
- UNIT_ASSERT_EQUAL(request->Host, "test");
- UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
- }
-
+ NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
+ EatWholeString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n");
+ UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
+ UNIT_ASSERT_EQUAL(request->Method, "GET");
+ UNIT_ASSERT_EQUAL(request->URL, "/test");
+ UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
+ UNIT_ASSERT_EQUAL(request->Version, "1.1");
+ UNIT_ASSERT_EQUAL(request->Host, "test");
+ UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
+ }
+
Y_UNIT_TEST(BasicParsingChunkedBody) {
- NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
- NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
- EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
- UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done);
- UNIT_ASSERT_EQUAL(response->Status, "200");
- UNIT_ASSERT_EQUAL(response->Connection, "close");
- UNIT_ASSERT_EQUAL(response->Protocol, "HTTP");
- UNIT_ASSERT_EQUAL(response->Version, "1.1");
- UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked");
- UNIT_ASSERT_EQUAL(response->Body, "this is test.");
- }
-
- Y_UNIT_TEST(InvalidParsingChunkedBody) {
- NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
- NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
- EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
- UNIT_ASSERT(response->IsError());
- }
-
- Y_UNIT_TEST(AdvancedParsingChunkedBody) {
- NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
- NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
- EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\nthis\r\n\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
- UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done);
- UNIT_ASSERT_EQUAL(response->Status, "200");
- UNIT_ASSERT_EQUAL(response->Connection, "close");
- UNIT_ASSERT_EQUAL(response->Protocol, "HTTP");
- UNIT_ASSERT_EQUAL(response->Version, "1.1");
- UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked");
- UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test.");
- }
-
+ NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
+ NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
+ EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
+ UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done);
+ UNIT_ASSERT_EQUAL(response->Status, "200");
+ UNIT_ASSERT_EQUAL(response->Connection, "close");
+ UNIT_ASSERT_EQUAL(response->Protocol, "HTTP");
+ UNIT_ASSERT_EQUAL(response->Version, "1.1");
+ UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked");
+ UNIT_ASSERT_EQUAL(response->Body, "this is test.");
+ }
+
+ Y_UNIT_TEST(InvalidParsingChunkedBody) {
+ NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
+ NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
+ EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
+ UNIT_ASSERT(response->IsError());
+ }
+
+ Y_UNIT_TEST(AdvancedParsingChunkedBody) {
+ NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
+ NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
+ EatWholeString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\nthis\r\n\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
+ UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done);
+ UNIT_ASSERT_EQUAL(response->Status, "200");
+ UNIT_ASSERT_EQUAL(response->Connection, "close");
+ UNIT_ASSERT_EQUAL(response->Protocol, "HTTP");
+ UNIT_ASSERT_EQUAL(response->Version, "1.1");
+ UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked");
+ UNIT_ASSERT_EQUAL(response->Body, "this\r\n is test.");
+ }
+
Y_UNIT_TEST(CreateRepsonseWithCompressedBody) {
NHttp::THttpIncomingRequestPtr request = nullptr;
NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", "200", "OK");
@@ -95,264 +95,264 @@ Y_UNIT_TEST_SUITE(HttpProxy) {
}
Y_UNIT_TEST(BasicPartialParsing) {
- NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
- EatPartialString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n");
- UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
- UNIT_ASSERT_EQUAL(request->Method, "GET");
- UNIT_ASSERT_EQUAL(request->URL, "/test");
- UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
- UNIT_ASSERT_EQUAL(request->Version, "1.1");
- UNIT_ASSERT_EQUAL(request->Host, "test");
- UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
- }
-
+ NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
+ EatPartialString(request, "GET /test HTTP/1.1\r\nHost: test\r\nSome-Header: 32344\r\n\r\n");
+ UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
+ UNIT_ASSERT_EQUAL(request->Method, "GET");
+ UNIT_ASSERT_EQUAL(request->URL, "/test");
+ UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
+ UNIT_ASSERT_EQUAL(request->Version, "1.1");
+ UNIT_ASSERT_EQUAL(request->Host, "test");
+ UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
+ }
+
Y_UNIT_TEST(BasicPartialParsingChunkedBody) {
- NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
- NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
- EatPartialString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
- UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done);
- UNIT_ASSERT_EQUAL(response->Status, "200");
- UNIT_ASSERT_EQUAL(response->Connection, "close");
- UNIT_ASSERT_EQUAL(response->Protocol, "HTTP");
- UNIT_ASSERT_EQUAL(response->Version, "1.1");
- UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked");
- UNIT_ASSERT_EQUAL(response->Body, "this is test.");
- }
-
+ NHttp::THttpOutgoingRequestPtr request = nullptr; //new NHttp::THttpOutgoingRequest();
+ NHttp::THttpIncomingResponsePtr response = new NHttp::THttpIncomingResponse(request);
+ EatPartialString(response, "HTTP/1.1 200 OK\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nthis\r\n4\r\n is \r\n5\r\ntest.\r\n0\r\n\r\n");
+ UNIT_ASSERT_EQUAL(response->Stage, NHttp::THttpIncomingResponse::EParseStage::Done);
+ UNIT_ASSERT_EQUAL(response->Status, "200");
+ UNIT_ASSERT_EQUAL(response->Connection, "close");
+ UNIT_ASSERT_EQUAL(response->Protocol, "HTTP");
+ UNIT_ASSERT_EQUAL(response->Version, "1.1");
+ UNIT_ASSERT_EQUAL(response->TransferEncoding, "chunked");
+ UNIT_ASSERT_EQUAL(response->Body, "this is test.");
+ }
+
Y_UNIT_TEST(AdvancedParsing) {
- NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
- EatWholeString(request, "GE");
- EatWholeString(request, "T");
- EatWholeString(request, " ");
- EatWholeString(request, "/test");
- EatWholeString(request, " HTTP/1.1\r");
- EatWholeString(request, "\nHo");
- EatWholeString(request, "st: test");
- EatWholeString(request, "\r\n");
- EatWholeString(request, "Some-Header: 32344\r\n\r");
- EatWholeString(request, "\n");
- UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
- UNIT_ASSERT_EQUAL(request->Method, "GET");
- UNIT_ASSERT_EQUAL(request->URL, "/test");
- UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
- UNIT_ASSERT_EQUAL(request->Version, "1.1");
- UNIT_ASSERT_EQUAL(request->Host, "test");
- UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
- }
-
+ NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
+ EatWholeString(request, "GE");
+ EatWholeString(request, "T");
+ EatWholeString(request, " ");
+ EatWholeString(request, "/test");
+ EatWholeString(request, " HTTP/1.1\r");
+ EatWholeString(request, "\nHo");
+ EatWholeString(request, "st: test");
+ EatWholeString(request, "\r\n");
+ EatWholeString(request, "Some-Header: 32344\r\n\r");
+ EatWholeString(request, "\n");
+ UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
+ UNIT_ASSERT_EQUAL(request->Method, "GET");
+ UNIT_ASSERT_EQUAL(request->URL, "/test");
+ UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
+ UNIT_ASSERT_EQUAL(request->Version, "1.1");
+ UNIT_ASSERT_EQUAL(request->Host, "test");
+ UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
+ }
+
Y_UNIT_TEST(AdvancedPartialParsing) {
- NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
- EatPartialString(request, "GE");
- EatPartialString(request, "T");
- EatPartialString(request, " ");
- EatPartialString(request, "/test");
- EatPartialString(request, " HTTP/1.1\r");
- EatPartialString(request, "\nHo");
- EatPartialString(request, "st: test");
- EatPartialString(request, "\r\n");
- EatPartialString(request, "Some-Header: 32344\r\n\r");
- EatPartialString(request, "\n");
- UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
- UNIT_ASSERT_EQUAL(request->Method, "GET");
- UNIT_ASSERT_EQUAL(request->URL, "/test");
- UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
- UNIT_ASSERT_EQUAL(request->Version, "1.1");
- UNIT_ASSERT_EQUAL(request->Host, "test");
- UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
- }
-
- Y_UNIT_TEST(BasicRenderBodyWithHeadersAndCookies) {
- NHttp::THttpOutgoingRequestPtr request = NHttp::THttpOutgoingRequest::CreateRequestGet("http://www.yandex.ru/data/url");
- NHttp::THeadersBuilder headers;
- NHttp::TCookiesBuilder cookies;
- cookies.Set("cookie1", "123456");
- cookies.Set("cookie2", "45678");
- headers.Set("Cookie", cookies.Render());
- request->Set(headers);
- TString requestData;
- request->AsString(requestData);
- UNIT_ASSERT_VALUES_EQUAL(requestData, "GET /data/url HTTP/1.1\r\nHost: www.yandex.ru\r\nAccept: */*\r\nCookie: cookie1=123456; cookie2=45678;\r\n");
- }
-
+ NHttp::THttpIncomingRequestPtr request = new NHttp::THttpIncomingRequest();
+ EatPartialString(request, "GE");
+ EatPartialString(request, "T");
+ EatPartialString(request, " ");
+ EatPartialString(request, "/test");
+ EatPartialString(request, " HTTP/1.1\r");
+ EatPartialString(request, "\nHo");
+ EatPartialString(request, "st: test");
+ EatPartialString(request, "\r\n");
+ EatPartialString(request, "Some-Header: 32344\r\n\r");
+ EatPartialString(request, "\n");
+ UNIT_ASSERT_EQUAL(request->Stage, NHttp::THttpIncomingRequest::EParseStage::Done);
+ UNIT_ASSERT_EQUAL(request->Method, "GET");
+ UNIT_ASSERT_EQUAL(request->URL, "/test");
+ UNIT_ASSERT_EQUAL(request->Protocol, "HTTP");
+ UNIT_ASSERT_EQUAL(request->Version, "1.1");
+ UNIT_ASSERT_EQUAL(request->Host, "test");
+ UNIT_ASSERT_EQUAL(request->Headers, "Host: test\r\nSome-Header: 32344\r\n\r\n");
+ }
+
+ Y_UNIT_TEST(BasicRenderBodyWithHeadersAndCookies) {
+ NHttp::THttpOutgoingRequestPtr request = NHttp::THttpOutgoingRequest::CreateRequestGet("http://www.yandex.ru/data/url");
+ NHttp::THeadersBuilder headers;
+ NHttp::TCookiesBuilder cookies;
+ cookies.Set("cookie1", "123456");
+ cookies.Set("cookie2", "45678");
+ headers.Set("Cookie", cookies.Render());
+ request->Set(headers);
+ TString requestData;
+ request->AsString(requestData);
+ UNIT_ASSERT_VALUES_EQUAL(requestData, "GET /data/url HTTP/1.1\r\nHost: www.yandex.ru\r\nAccept: */*\r\nCookie: cookie1=123456; cookie2=45678;\r\n");
+ }
+
Y_UNIT_TEST(BasicRunning) {
NActors::TTestActorRuntimeBase actorSystem;
- TPortManager portManager;
- TIpPort port = portManager.GetTcpPort();
- TAutoPtr<NActors::IEventHandle> handle;
+ TPortManager portManager;
+ TIpPort port = portManager.GetTcpPort();
+ TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
NMonitoring::TMetricRegistry sensors;
-
- NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
+
+ NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
NActors::TActorId proxyId = actorSystem.Register(proxy);
actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
- actorSystem.DispatchEvents();
-
+ actorSystem.DispatchEvents();
+
NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
- actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
-
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
+
NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
- NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test");
- actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
-
- NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
-
- UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
-
- NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
- actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
-
- NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
-
- UNIT_ASSERT_EQUAL(response->Response->Status, "200");
- UNIT_ASSERT_EQUAL(response->Response->Body, "passed");
- }
-
- Y_UNIT_TEST(TlsRunning) {
- NActors::TTestActorRuntimeBase actorSystem;
- TPortManager portManager;
- TIpPort port = portManager.GetTcpPort();
- TAutoPtr<NActors::IEventHandle> handle;
- actorSystem.Initialize();
+ NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test");
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
+
+ UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
+
+ NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
+ actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
+
+ UNIT_ASSERT_EQUAL(response->Response->Status, "200");
+ UNIT_ASSERT_EQUAL(response->Response->Body, "passed");
+ }
+
+ Y_UNIT_TEST(TlsRunning) {
+ NActors::TTestActorRuntimeBase actorSystem;
+ TPortManager portManager;
+ TIpPort port = portManager.GetTcpPort();
+ TAutoPtr<NActors::IEventHandle> handle;
+ actorSystem.Initialize();
NMonitoring::TMetricRegistry sensors;
-
- TString certificateContent = R"___(-----BEGIN PRIVATE KEY-----
-MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCzRZjodO7Aqe1w
-RyOj6kG6g2nn8ZGAxfao4mLT0jDTbVksrhV/h2s3uldLkFo5WrNQ8WZe+iIbXeFL
-s8tO6hslzreo9sih2IHoRcH5KnS/6YTqVhRTJb1jE2dM8NwYbwTi+T2Pe0FrBPjI
-kgVO50gAtYl9C+fc715uZiSKW+rRlP5OoFTwxrOjiU27RPZjFYyWK9wTI1Es9uRr
-lbZbLl5cY6dK2J1AViRraaYKCWO26VbOPWLsY4OD3e+ZXIc3OMCz6Yb0wmRPeJ60
-bbbkGfI8O27kDdv69MAWHIm0yYMzKEnom1dce7rNQNDEqJfocsYIsg+EvayT1yQ9
-KTBegw7LAgMBAAECggEBAKaOCrotqYQmXArsjRhFFDwMy+BKdzyEr93INrlFl0dX
-WHpCYobRcbOc1G3H94tB0UdqgAnNqtJyLlb+++ydZAuEOu4oGc8EL+10ofq0jzOd
-6Xct8kQt0/6wkFDTlii9PHUDy0X65ZRgUiNGRtg/2I2QG+SpowmI+trm2xwQueFs
-VaWrjc3cVvXx0b8Lu7hqZUv08kgC38stzuRk/n2T5VWSAr7Z4ZWQbO918Dv35HUw
-Wy/0jNUFP9CBCvFJ4l0OoH9nYhWFG+HXWzNdw6/Hca4jciRKo6esCiOZ9uWYv/ec
-/NvX9rgFg8G8/SrTisX10+Bbeq+R1RKwq/IG409TH4ECgYEA14L+3QsgNIUMeYAx
-jSCyk22R/tOHI1BM+GtKPUhnwHlAssrcPcxXMJovl6WL93VauYjym0wpCz9urSpA
-I2CqTsG8GYciA6Dr3mHgD6cK0jj9UPAU6EnZ5S0mjhPqKZqutu9QegzD2uESvuN8
-36xezwQthzAf0nI/P3sJGjVXjikCgYEA1POm5xcV6SmM6HnIdadEebhzZIJ9TXQz
-ry3Jj3a7CKyD5C7fAdkHUTCjgT/2ElxPi9ABkZnC+d/cW9GtJFa0II5qO/agm3KQ
-ZXYiutu9A7xACHYFXRiJEjVUdGG9dKMVOHUEa8IHEgrrcUVM/suy/GgutywIfaXs
-y58IFP24K9MCgYEAk6zjz7wL+XEiNy+sxLQfKf7vB9sSwxQHakK6wHuY/L8Zomp3
-uLEJHfjJm/SIkK0N2g0JkXkCtv5kbKyC/rsCeK0wo52BpVLjzaLr0k34kE0U6B1b
-dkEE2pGx1bG3x4KDLj+Wuct9ecK5Aa0IqIyI+vo16GkFpUM8K9e3SQo8UOECgYEA
-sCZYAkILYtJ293p9giz5rIISGasDAUXE1vxWBXEeJ3+kneTTnZCrx9Im/ewtnWR0
-fF90XL9HFDDD88POqAd8eo2zfKR2l/89SGBfPBg2EtfuU9FkgGyiPciVcqvC7q9U
-B15saMKX3KnhtdGwbfeLt9RqCCTJZT4SUSDcq5hwdvcCgYAxY4Be8mNipj8Cgg22
-mVWSolA0TEzbtUcNk6iGodpi+Z0LKpsPC0YRqPRyh1K+rIltG1BVdmUBHcMlOYxl
-lWWvbJH6PkJWy4n2MF7PO45kjN3pPZg4hgH63JjZeAineBwEArUGb9zHnvzcdRvF
-wuQ2pZHL/HJ0laUSieHDJ5917w==
------END PRIVATE KEY-----
-
-
------BEGIN CERTIFICATE-----
-MIIDjTCCAnWgAwIBAgIURt5IBx0J3xgEaQvmyrFH2A+NkpMwDQYJKoZIhvcNAQEL
-BQAwVjELMAkGA1UEBhMCUlUxDzANBgNVBAgMBk1vc2NvdzEPMA0GA1UEBwwGTW9z
-Y293MQ8wDQYDVQQKDAZZYW5kZXgxFDASBgNVBAMMC3Rlc3Qtc2VydmVyMB4XDTE5
-MDkyMDE3MTQ0MVoXDTQ3MDIwNDE3MTQ0MVowVjELMAkGA1UEBhMCUlUxDzANBgNV
-BAgMBk1vc2NvdzEPMA0GA1UEBwwGTW9zY293MQ8wDQYDVQQKDAZZYW5kZXgxFDAS
-BgNVBAMMC3Rlc3Qtc2VydmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC
-AQEAs0WY6HTuwKntcEcjo+pBuoNp5/GRgMX2qOJi09Iw021ZLK4Vf4drN7pXS5Ba
-OVqzUPFmXvoiG13hS7PLTuobJc63qPbIodiB6EXB+Sp0v+mE6lYUUyW9YxNnTPDc
-GG8E4vk9j3tBawT4yJIFTudIALWJfQvn3O9ebmYkilvq0ZT+TqBU8Mazo4lNu0T2
-YxWMlivcEyNRLPbka5W2Wy5eXGOnStidQFYka2mmCgljtulWzj1i7GODg93vmVyH
-NzjAs+mG9MJkT3ietG225BnyPDtu5A3b+vTAFhyJtMmDMyhJ6JtXXHu6zUDQxKiX
-6HLGCLIPhL2sk9ckPSkwXoMOywIDAQABo1MwUTAdBgNVHQ4EFgQUDv/xuJ4CvCgG
-fPrZP3hRAt2+/LwwHwYDVR0jBBgwFoAUDv/xuJ4CvCgGfPrZP3hRAt2+/LwwDwYD
-VR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAinKpMYaA2tjLpAnPVbjy
-/ZxSBhhB26RiQp3Re8XOKyhTWqgYE6kldYT0aXgK9x9mPC5obQannDDYxDc7lX+/
-qP/u1X81ZcDRo/f+qQ3iHfT6Ftt/4O3qLnt45MFM6Q7WabRm82x3KjZTqpF3QUdy
-tumWiuAP5DMd1IRDtnKjFHO721OsEsf6NLcqdX89bGeqXDvrkwg3/PNwTyW5E7cj
-feY8L2eWtg6AJUnIBu11wvfzkLiH3QKzHvO/SIZTGf5ihDsJ3aKEE9UNauTL3bVc
-CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
-6g==
------END CERTIFICATE-----)___";
-
- TTempFileHandle certificateFile;
-
- certificateFile.Write(certificateContent.data(), certificateContent.size());
-
- NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
+
+ TString certificateContent = R"___(-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCzRZjodO7Aqe1w
+RyOj6kG6g2nn8ZGAxfao4mLT0jDTbVksrhV/h2s3uldLkFo5WrNQ8WZe+iIbXeFL
+s8tO6hslzreo9sih2IHoRcH5KnS/6YTqVhRTJb1jE2dM8NwYbwTi+T2Pe0FrBPjI
+kgVO50gAtYl9C+fc715uZiSKW+rRlP5OoFTwxrOjiU27RPZjFYyWK9wTI1Es9uRr
+lbZbLl5cY6dK2J1AViRraaYKCWO26VbOPWLsY4OD3e+ZXIc3OMCz6Yb0wmRPeJ60
+bbbkGfI8O27kDdv69MAWHIm0yYMzKEnom1dce7rNQNDEqJfocsYIsg+EvayT1yQ9
+KTBegw7LAgMBAAECggEBAKaOCrotqYQmXArsjRhFFDwMy+BKdzyEr93INrlFl0dX
+WHpCYobRcbOc1G3H94tB0UdqgAnNqtJyLlb+++ydZAuEOu4oGc8EL+10ofq0jzOd
+6Xct8kQt0/6wkFDTlii9PHUDy0X65ZRgUiNGRtg/2I2QG+SpowmI+trm2xwQueFs
+VaWrjc3cVvXx0b8Lu7hqZUv08kgC38stzuRk/n2T5VWSAr7Z4ZWQbO918Dv35HUw
+Wy/0jNUFP9CBCvFJ4l0OoH9nYhWFG+HXWzNdw6/Hca4jciRKo6esCiOZ9uWYv/ec
+/NvX9rgFg8G8/SrTisX10+Bbeq+R1RKwq/IG409TH4ECgYEA14L+3QsgNIUMeYAx
+jSCyk22R/tOHI1BM+GtKPUhnwHlAssrcPcxXMJovl6WL93VauYjym0wpCz9urSpA
+I2CqTsG8GYciA6Dr3mHgD6cK0jj9UPAU6EnZ5S0mjhPqKZqutu9QegzD2uESvuN8
+36xezwQthzAf0nI/P3sJGjVXjikCgYEA1POm5xcV6SmM6HnIdadEebhzZIJ9TXQz
+ry3Jj3a7CKyD5C7fAdkHUTCjgT/2ElxPi9ABkZnC+d/cW9GtJFa0II5qO/agm3KQ
+ZXYiutu9A7xACHYFXRiJEjVUdGG9dKMVOHUEa8IHEgrrcUVM/suy/GgutywIfaXs
+y58IFP24K9MCgYEAk6zjz7wL+XEiNy+sxLQfKf7vB9sSwxQHakK6wHuY/L8Zomp3
+uLEJHfjJm/SIkK0N2g0JkXkCtv5kbKyC/rsCeK0wo52BpVLjzaLr0k34kE0U6B1b
+dkEE2pGx1bG3x4KDLj+Wuct9ecK5Aa0IqIyI+vo16GkFpUM8K9e3SQo8UOECgYEA
+sCZYAkILYtJ293p9giz5rIISGasDAUXE1vxWBXEeJ3+kneTTnZCrx9Im/ewtnWR0
+fF90XL9HFDDD88POqAd8eo2zfKR2l/89SGBfPBg2EtfuU9FkgGyiPciVcqvC7q9U
+B15saMKX3KnhtdGwbfeLt9RqCCTJZT4SUSDcq5hwdvcCgYAxY4Be8mNipj8Cgg22
+mVWSolA0TEzbtUcNk6iGodpi+Z0LKpsPC0YRqPRyh1K+rIltG1BVdmUBHcMlOYxl
+lWWvbJH6PkJWy4n2MF7PO45kjN3pPZg4hgH63JjZeAineBwEArUGb9zHnvzcdRvF
+wuQ2pZHL/HJ0laUSieHDJ5917w==
+-----END PRIVATE KEY-----
+
+
+-----BEGIN CERTIFICATE-----
+MIIDjTCCAnWgAwIBAgIURt5IBx0J3xgEaQvmyrFH2A+NkpMwDQYJKoZIhvcNAQEL
+BQAwVjELMAkGA1UEBhMCUlUxDzANBgNVBAgMBk1vc2NvdzEPMA0GA1UEBwwGTW9z
+Y293MQ8wDQYDVQQKDAZZYW5kZXgxFDASBgNVBAMMC3Rlc3Qtc2VydmVyMB4XDTE5
+MDkyMDE3MTQ0MVoXDTQ3MDIwNDE3MTQ0MVowVjELMAkGA1UEBhMCUlUxDzANBgNV
+BAgMBk1vc2NvdzEPMA0GA1UEBwwGTW9zY293MQ8wDQYDVQQKDAZZYW5kZXgxFDAS
+BgNVBAMMC3Rlc3Qtc2VydmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC
+AQEAs0WY6HTuwKntcEcjo+pBuoNp5/GRgMX2qOJi09Iw021ZLK4Vf4drN7pXS5Ba
+OVqzUPFmXvoiG13hS7PLTuobJc63qPbIodiB6EXB+Sp0v+mE6lYUUyW9YxNnTPDc
+GG8E4vk9j3tBawT4yJIFTudIALWJfQvn3O9ebmYkilvq0ZT+TqBU8Mazo4lNu0T2
+YxWMlivcEyNRLPbka5W2Wy5eXGOnStidQFYka2mmCgljtulWzj1i7GODg93vmVyH
+NzjAs+mG9MJkT3ietG225BnyPDtu5A3b+vTAFhyJtMmDMyhJ6JtXXHu6zUDQxKiX
+6HLGCLIPhL2sk9ckPSkwXoMOywIDAQABo1MwUTAdBgNVHQ4EFgQUDv/xuJ4CvCgG
+fPrZP3hRAt2+/LwwHwYDVR0jBBgwFoAUDv/xuJ4CvCgGfPrZP3hRAt2+/LwwDwYD
+VR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAinKpMYaA2tjLpAnPVbjy
+/ZxSBhhB26RiQp3Re8XOKyhTWqgYE6kldYT0aXgK9x9mPC5obQannDDYxDc7lX+/
+qP/u1X81ZcDRo/f+qQ3iHfT6Ftt/4O3qLnt45MFM6Q7WabRm82x3KjZTqpF3QUdy
+tumWiuAP5DMd1IRDtnKjFHO721OsEsf6NLcqdX89bGeqXDvrkwg3/PNwTyW5E7cj
+feY8L2eWtg6AJUnIBu11wvfzkLiH3QKzHvO/SIZTGf5ihDsJ3aKEE9UNauTL3bVc
+CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
+6g==
+-----END CERTIFICATE-----)___";
+
+ TTempFileHandle certificateFile;
+
+ certificateFile.Write(certificateContent.data(), certificateContent.size());
+
+ NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
NActors::TActorId proxyId = actorSystem.Register(proxy);
-
+
THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> add = MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(port);
- ///////// https configuration
- add->Secure = true;
- add->CertificateFile = certificateFile.Name();
- add->PrivateKeyFile = certificateFile.Name();
- /////////
+ ///////// https configuration
+ add->Secure = true;
+ add->CertificateFile = certificateFile.Name();
+ add->PrivateKeyFile = certificateFile.Name();
+ /////////
actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), add.Release()), 0, true);
- actorSystem.DispatchEvents();
-
+ actorSystem.DispatchEvents();
+
NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
- actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
-
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
+
NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
- NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("https://[::1]:" + ToString(port) + "/test");
- actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
-
- NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
-
- UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
-
- NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
- actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
-
- NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
-
- UNIT_ASSERT_EQUAL(response->Response->Status, "200");
- UNIT_ASSERT_EQUAL(response->Response->Body, "passed");
- }
-
+ NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("https://[::1]:" + ToString(port) + "/test");
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
+
+ UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
+
+ NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
+ actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
+
+ UNIT_ASSERT_EQUAL(response->Response->Status, "200");
+ UNIT_ASSERT_EQUAL(response->Response->Body, "passed");
+ }
+
/*Y_UNIT_TEST(AdvancedRunning) {
THolder<NActors::TActorSystemSetup> setup = MakeHolder<NActors::TActorSystemSetup>();
- setup->NodeId = 1;
- setup->ExecutorsCount = 1;
- setup->Executors = new TAutoPtr<NActors::IExecutorPool>[1];
- setup->Executors[0] = new NActors::TBasicExecutorPool(0, 2, 10);
- setup->Scheduler = new NActors::TBasicSchedulerThread(NActors::TSchedulerConfig(512, 100));
- NActors::TActorSystem actorSystem(setup);
- actorSystem.Start();
- NHttp::THttpProxy* incomingProxy = new NHttp::THttpProxy();
+ setup->NodeId = 1;
+ setup->ExecutorsCount = 1;
+ setup->Executors = new TAutoPtr<NActors::IExecutorPool>[1];
+ setup->Executors[0] = new NActors::TBasicExecutorPool(0, 2, 10);
+ setup->Scheduler = new NActors::TBasicSchedulerThread(NActors::TSchedulerConfig(512, 100));
+ NActors::TActorSystem actorSystem(setup);
+ actorSystem.Start();
+ NHttp::THttpProxy* incomingProxy = new NHttp::THttpProxy();
NActors::TActorId incomingProxyId = actorSystem.Register(incomingProxy);
- actorSystem.Send(incomingProxyId, new NHttp::TEvHttpProxy::TEvAddListeningPort(13337));
-
- NHttp::THttpProxy* outgoingProxy = new NHttp::THttpProxy();
+ actorSystem.Send(incomingProxyId, new NHttp::TEvHttpProxy::TEvAddListeningPort(13337));
+
+ NHttp::THttpProxy* outgoingProxy = new NHttp::THttpProxy();
NActors::TActorId outgoingProxyId = actorSystem.Register(outgoingProxy);
-
+
THolder<NHttp::THttpStaticStringRequest> httpRequest = MakeHolder<NHttp::THttpStaticStringRequest>("GET /test HTTP/1.1\r\n\r\n");
- actorSystem.Send(outgoingProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest("[::]:13337", std::move(httpRequest)));
-
- Sleep(TDuration::Minutes(60));
- }*/
-
- Y_UNIT_TEST(TooLongHeader) {
- NActors::TTestActorRuntimeBase actorSystem;
- TPortManager portManager;
- TIpPort port = portManager.GetTcpPort();
- TAutoPtr<NActors::IEventHandle> handle;
- actorSystem.Initialize();
- NMonitoring::TMetricRegistry sensors;
-
- NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
- NActors::TActorId proxyId = actorSystem.Register(proxy);
- actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
- actorSystem.DispatchEvents();
-
- NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
- actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
-
- NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
- NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test");
- httpRequest->Set("Connection", "close");
- TString longHeader;
- longHeader.append(9000, 'X');
- httpRequest->Set(longHeader, "data");
- actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
-
- NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
-
- UNIT_ASSERT_EQUAL(response->Response->Status, "400");
- UNIT_ASSERT_EQUAL(response->Response->Body, "Invalid http header");
- }
-}
+ actorSystem.Send(outgoingProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest("[::]:13337", std::move(httpRequest)));
+
+ Sleep(TDuration::Minutes(60));
+ }*/
+
+ Y_UNIT_TEST(TooLongHeader) {
+ NActors::TTestActorRuntimeBase actorSystem;
+ TPortManager portManager;
+ TIpPort port = portManager.GetTcpPort();
+ TAutoPtr<NActors::IEventHandle> handle;
+ actorSystem.Initialize();
+ NMonitoring::TMetricRegistry sensors;
+
+ NActors::IActor* proxy = NHttp::CreateHttpProxy(sensors);
+ NActors::TActorId proxyId = actorSystem.Register(proxy);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, TActorId(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
+ actorSystem.DispatchEvents();
+
+ NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);
+
+ NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
+ NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test");
+ httpRequest->Set("Connection", "close");
+ TString longHeader;
+ longHeader.append(9000, 'X');
+ httpRequest->Set(longHeader, "data");
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
+
+ UNIT_ASSERT_EQUAL(response->Response->Status, "400");
+ UNIT_ASSERT_EQUAL(response->Response->Body, "Invalid http header");
+ }
+}
diff --git a/library/cpp/actors/http/ut/ya.make b/library/cpp/actors/http/ut/ya.make
index 8b4c04c4d3..12d360dabf 100644
--- a/library/cpp/actors/http/ut/ya.make
+++ b/library/cpp/actors/http/ut/ya.make
@@ -1,18 +1,18 @@
UNITTEST_FOR(library/cpp/actors/http)
-
-OWNER(xenoxeno)
-
-SIZE(SMALL)
-
-PEERDIR(
+
+OWNER(xenoxeno)
+
+SIZE(SMALL)
+
+PEERDIR(
library/cpp/actors/testlib
-)
-
+)
+
IF (NOT OS_WINDOWS)
-SRCS(
- http_ut.cpp
-)
+SRCS(
+ http_ut.cpp
+)
ELSE()
ENDIF()
-
-END()
+
+END()
diff --git a/library/cpp/actors/http/ya.make b/library/cpp/actors/http/ya.make
index 7ce68b7a75..60c9c93a09 100644
--- a/library/cpp/actors/http/ya.make
+++ b/library/cpp/actors/http/ya.make
@@ -1,33 +1,33 @@
-RECURSE_FOR_TESTS(ut)
-
-LIBRARY()
-
-OWNER(xenoxeno g:kikimr)
-
-SRCS(
- http_cache.cpp
- http_cache.h
- http_config.h
- http_proxy_acceptor.cpp
- http_proxy_incoming.cpp
- http_proxy_outgoing.cpp
- http_proxy_sock_impl.h
- http_proxy_ssl.h
- http_proxy.cpp
- http_proxy.h
- http_static.cpp
- http_static.h
- http.cpp
- http.h
-)
-
-PEERDIR(
- contrib/libs/openssl
+RECURSE_FOR_TESTS(ut)
+
+LIBRARY()
+
+OWNER(xenoxeno g:kikimr)
+
+SRCS(
+ http_cache.cpp
+ http_cache.h
+ http_config.h
+ http_proxy_acceptor.cpp
+ http_proxy_incoming.cpp
+ http_proxy_outgoing.cpp
+ http_proxy_sock_impl.h
+ http_proxy_ssl.h
+ http_proxy.cpp
+ http_proxy.h
+ http_static.cpp
+ http_static.h
+ http.cpp
+ http.h
+)
+
+PEERDIR(
+ contrib/libs/openssl
library/cpp/actors/core
library/cpp/actors/interconnect
library/cpp/dns
library/cpp/monlib/metrics
library/cpp/string_utils/quote
-)
-
-END()
+)
+
+END()