aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Efimov <xeno@prnwatch.com>2022-05-06 09:16:33 +0300
committerAlexey Efimov <xeno@prnwatch.com>2022-05-06 09:16:33 +0300
commit6af3e8a9fb068483ed468663f37973595adb392e (patch)
tree78b4d4d0eb90506b534cf57935869f4a05644260
parent264071acbaeeec9bef990f960aa7a434098305dd (diff)
downloadydb-6af3e8a9fb068483ed468663f37973595adb392e.tar.gz
add settings for compression KIKIMR-14742
ref:e7f27eec7e9ed0722c245f091d0eff6e20395d63
-rw-r--r--library/cpp/actors/http/http.cpp28
-rw-r--r--library/cpp/actors/http/http.h30
-rw-r--r--library/cpp/actors/http/http_cache.cpp2
-rw-r--r--library/cpp/actors/http/http_proxy.h18
-rw-r--r--library/cpp/actors/http/http_proxy_acceptor.cpp23
-rw-r--r--library/cpp/actors/http/http_proxy_incoming.cpp27
-rw-r--r--library/cpp/actors/http/http_proxy_sock_impl.h6
-rw-r--r--ydb/core/mon/async_http_mon.cpp13
8 files changed, 103 insertions, 44 deletions
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp
index 09381d0fb3..20dea47573 100644
--- a/library/cpp/actors/http/http.cpp
+++ b/library/cpp/actors/http/http.cpp
@@ -366,16 +366,28 @@ void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() {
THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) {
THttpParser<THttpResponse, TSocketBuffer> parser(data);
THeadersBuilder headers(parser.Headers);
- if (!WorkerName.empty()) {
- headers.Set("X-Worker-Name", WorkerName);
+ if (!Endpoint->WorkerName.empty()) {
+ headers.Set("X-Worker-Name", Endpoint->WorkerName);
}
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this);
response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message);
- response->Set(headers);
if (parser.HaveBody()) {
+ if (parser.ContentType && !Endpoint->CompressContentTypes.empty()) {
+ TStringBuf contentType = parser.ContentType.Before(';');
+ Trim(ContentType, ' ');
+ if (Count(Endpoint->CompressContentTypes, contentType) != 0) {
+ if (response->EnableCompression()) {
+ headers.Erase("Content-Length"); // we will need new length after compression
+ }
+ }
+ }
+ response->Set(headers);
response->SetBody(parser.Body);
} else {
- response->Set<&THttpResponse::ContentLength>("0");
+ response->Set(headers);
+ if (!response->ContentLength) {
+ response->Set<&THttpResponse::ContentLength>("0");
+ }
}
return response;
}
@@ -415,8 +427,8 @@ THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponse(TStringBuf status,
}
THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this, "HTTP", version, status, message);
response->Set<&THttpResponse::Connection>(GetConnection());
- if (!WorkerName.empty()) {
- response->Set("X-Worker-Name", WorkerName);
+ if (!Endpoint->WorkerName.empty()) {
+ response->Set("X-Worker-Name", Endpoint->WorkerName);
}
if (!contentType.empty() && !body.empty()) {
response->Set<&THttpResponse::ContentType>(contentType);
@@ -701,4 +713,8 @@ void THeadersBuilder::Set(TStringBuf name, TStringBuf data) {
Headers[Data.back().first] = Data.back().second;
}
+void THeadersBuilder::Erase(TStringBuf name) {
+ Headers.erase(name);
+}
+
}
diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h
index 6d51cf589c..54ed5d6d8f 100644
--- a/library/cpp/actors/http/http.h
+++ b/library/cpp/actors/http/http.h
@@ -99,6 +99,7 @@ struct THeadersBuilder : THeaders {
THeadersBuilder(TStringBuf headers);
THeadersBuilder(const THeadersBuilder& builder);
void Set(TStringBuf name, TStringBuf data);
+ void Erase(TStringBuf name);
};
class TSocketBuffer : public TBuffer, public THttpConfig {
@@ -637,6 +638,19 @@ inline void THttpRenderer<THttpResponse, TSocketBuffer>::Set<&THttpResponse::Con
SetContentEncoding(value);
}
+struct THttpEndpointInfo {
+ TString WorkerName;
+ bool Secure = false;
+ const std::vector<TString> CompressContentTypes; // content types, which will be automatically compressed on response
+
+ THttpEndpointInfo() = default;
+
+protected:
+ THttpEndpointInfo(std::vector<TString> compressContentTypes)
+ : CompressContentTypes(std::move(compressContentTypes))
+ {}
+};
+
class THttpIncomingRequest;
using THttpIncomingRequestPtr = TIntrusivePtr<THttpIncomingRequest>;
@@ -647,10 +661,18 @@ class THttpIncomingRequest :
public THttpParser<THttpRequest, TSocketBuffer>,
public TRefCounted<THttpIncomingRequest, TAtomicCounter> {
public:
+ std::shared_ptr<THttpEndpointInfo> Endpoint;
THttpConfig::SocketAddressType Address;
- TString WorkerName;
THPTimer Timer;
- bool Secure = false;
+
+ THttpIncomingRequest()
+ : Endpoint(std::make_shared<THttpEndpointInfo>())
+ {}
+
+ THttpIncomingRequest(std::shared_ptr<THttpEndpointInfo> endpoint, const THttpConfig::SocketAddressType& address)
+ : Endpoint(std::move(endpoint))
+ , Address(address)
+ {}
bool IsConnectionClose() const {
if (Connection.empty()) {
@@ -746,7 +768,7 @@ public:
return GetRequest()->Method != "HEAD" && Status != "204";
}
- void EnableCompression() {
+ bool EnableCompression() {
TStringBuf acceptEncoding = Request->AcceptEncoding;
std::vector<TStringBuf> encodings;
TStringBuf encoding;
@@ -759,7 +781,9 @@ public:
if (!encodings.empty()) {
// TODO: prioritize encodings
SetContentEncoding(encodings.front());
+ return true;
}
+ return false;
}
static TString CompressDeflate(TStringBuf source);
diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp
index 27c4eeb6f3..c9dad66355 100644
--- a/library/cpp/actors/http/http_cache.cpp
+++ b/library/cpp/actors/http/http_cache.cpp
@@ -316,7 +316,7 @@ public:
}
TString GetName() const {
- return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL
+ return TStringBuilder() << (Request->Endpoint->Secure ? "https://" : "http://") << Request->Host << Request->URL
<< " (" << CacheId << ")";
}
};
diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h
index 92002b047d..bd907c13a7 100644
--- a/library/cpp/actors/http/http_proxy.h
+++ b/library/cpp/actors/http/http_proxy.h
@@ -44,12 +44,16 @@ struct TEvHttpProxy {
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_HTTP), "ES_HTTP event space is too small.");
struct TEvAddListeningPort : NActors::TEventLocal<TEvAddListeningPort, EvAddListeningPort> {
+ TString Address;
TIpPort Port;
TString WorkerName;
bool Secure = false;
TString CertificateFile;
TString PrivateKeyFile;
TString SslCertificatePem;
+ std::vector<TString> CompressContentTypes;
+
+ TEvAddListeningPort() = default;
TEvAddListeningPort(TIpPort port)
: Port(port)
@@ -63,9 +67,11 @@ struct TEvHttpProxy {
struct TEvConfirmListen : NActors::TEventLocal<TEvConfirmListen, EvConfirmListen> {
THttpConfig::SocketAddressType Address;
+ std::shared_ptr<THttpEndpointInfo> Endpoint;
- TEvConfirmListen(const THttpConfig::SocketAddressType& address)
+ TEvConfirmListen(const THttpConfig::SocketAddressType& address, std::shared_ptr<THttpEndpointInfo> endpoint)
: Address(address)
+ , Endpoint(std::move(endpoint))
{}
};
@@ -217,19 +223,21 @@ struct TEvHttpProxy {
};
};
-struct TEndpointInfo {
+struct TPrivateEndpointInfo : THttpEndpointInfo {
TActorId Proxy;
TActorId Owner;
- TString WorkerName;
- bool Secure;
TSslHelpers::TSslHolder<SSL_CTX> SecureContext;
+
+ TPrivateEndpointInfo(const std::vector<TString>& compressContentTypes)
+ : THttpEndpointInfo(compressContentTypes)
+ {}
};
NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry = NMonitoring::TMetricRegistry::SharedInstance());
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller);
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller);
NActors::IActor* CreateIncomingConnectionActor(
- const TEndpointInfo& endpoint,
+ std::shared_ptr<TPrivateEndpointInfo> endpoint,
TIntrusivePtr<TSocketDescriptor> socket,
THttpConfig::SocketAddressType address,
THttpIncomingRequestPtr recycledRequest = nullptr);
diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp
index 9780541b71..d74ef0abac 100644
--- a/library/cpp/actors/http/http_proxy_acceptor.cpp
+++ b/library/cpp/actors/http/http_proxy_acceptor.cpp
@@ -13,7 +13,7 @@ public:
NActors::TPollerToken::TPtr PollerToken;
THashSet<TActorId> Connections;
TDeque<THttpIncomingRequestPtr> RecycledRequests;
- TEndpointInfo Endpoint;
+ std::shared_ptr<TPrivateEndpointInfo> Endpoint;
TAcceptorActor(const TActorId& owner, const TActorId& poller)
: NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
@@ -45,19 +45,20 @@ protected:
}
void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
- SocketAddressType bindAddress("::", event->Get()->Port);
- Endpoint.Owner = ctx.SelfID;
- Endpoint.Proxy = Owner;
- Endpoint.WorkerName = event->Get()->WorkerName;
- Endpoint.Secure = event->Get()->Secure;
+ SocketAddressType bindAddress(event->Get()->Address ? event->Get()->Address.data() : "::", event->Get()->Port);
+ Endpoint = std::make_shared<TPrivateEndpointInfo>(event->Get()->CompressContentTypes);
+ Endpoint->Owner = ctx.SelfID;
+ Endpoint->Proxy = Owner;
+ Endpoint->WorkerName = event->Get()->WorkerName;
+ Endpoint->Secure = event->Get()->Secure;
int err = 0;
- if (Endpoint.Secure) {
+ if (Endpoint->Secure) {
if (!event->Get()->SslCertificatePem.empty()) {
- Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem);
+ Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem);
} else {
- Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile);
+ Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile);
}
- if (Endpoint.SecureContext == nullptr) {
+ if (Endpoint->SecureContext == nullptr) {
err = -1;
LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context");
}
@@ -72,7 +73,7 @@ protected:
SetNonBlock(Socket->Socket);
ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
TBase::Become(&TAcceptorActor::StateListening);
- ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress), 0, event->Cookie);
+ ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress, Endpoint), 0, event->Cookie);
return;
}
}
diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp
index 80fe2af53d..9d5673170b 100644
--- a/library/cpp/actors/http/http_proxy_incoming.cpp
+++ b/library/cpp/actors/http/http_proxy_incoming.cpp
@@ -11,7 +11,7 @@ public:
using TBase = TActor<TIncomingConnectionActor<TSocketImpl>>;
static constexpr bool RecycleRequests = true;
- const TEndpointInfo& Endpoint;
+ std::shared_ptr<TPrivateEndpointInfo> Endpoint;
SocketAddressType Address;
TList<THttpIncomingRequestPtr> Requests;
THashMap<THttpIncomingRequestPtr, THttpOutgoingResponsePtr> Responses;
@@ -26,13 +26,13 @@ public:
TPollerToken::TPtr PollerToken;
TIncomingConnectionActor(
- const TEndpointInfo& endpoint,
+ std::shared_ptr<TPrivateEndpointInfo> endpoint,
TIntrusivePtr<TSocketDescriptor> socket,
SocketAddressType address,
THttpIncomingRequestPtr recycledRequest = nullptr)
: TBase(&TIncomingConnectionActor::StateAccepting)
, TSocketImpl(std::move(socket))
- , Endpoint(endpoint)
+ , Endpoint(std::move(endpoint))
, Address(address)
{
if (recycledRequest != nullptr) {
@@ -61,7 +61,7 @@ public:
}
void Die(const TActorContext& ctx) override {
- ctx.Send(Endpoint.Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests)));
+ ctx.Send(Endpoint->Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests)));
TSocketImpl::Shutdown();
TBase::Die(ctx);
}
@@ -108,12 +108,11 @@ protected:
if (RecycleRequests && !RecycledRequests.empty()) {
CurrentRequest = std::move(RecycledRequests.front());
RecycledRequests.pop_front();
+ CurrentRequest->Address = Address;
+ CurrentRequest->Endpoint = Endpoint;
} else {
- CurrentRequest = new THttpIncomingRequest();
+ CurrentRequest = new THttpIncomingRequest(Endpoint, Address);
}
- CurrentRequest->Address = Address;
- CurrentRequest->WorkerName = Endpoint.WorkerName;
- CurrentRequest->Secure = Endpoint.Secure;
}
if (!CurrentRequest->EnsureEnoughSpaceAvailable()) {
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - not enough space available");
@@ -130,7 +129,7 @@ protected:
CurrentRequest->Timer.Reset();
if (CurrentRequest->IsReady()) {
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")");
- ctx.Send(Endpoint.Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest));
+ ctx.Send(Endpoint->Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest));
CurrentRequest = nullptr;
} else if (CurrentRequest->IsError()) {
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")");
@@ -206,7 +205,7 @@ protected:
<< TString(response->GetRawData()).substr(0, MAX_LOGGED_SIZE));
}
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildIncomingRequestSensors(request, response));
- ctx.Send(Endpoint.Owner, sensors.Release());
+ ctx.Send(Endpoint->Owner, sensors.Release());
if (request == Requests.front() && CurrentResponse == nullptr) {
CurrentResponse = response;
return FlushOutput(ctx);
@@ -288,14 +287,14 @@ protected:
};
IActor* CreateIncomingConnectionActor(
- const TEndpointInfo& endpoint,
+ std::shared_ptr<TPrivateEndpointInfo> endpoint,
TIntrusivePtr<TSocketDescriptor> socket,
THttpConfig::SocketAddressType address,
THttpIncomingRequestPtr recycledRequest) {
- if (endpoint.Secure) {
- return new TIncomingConnectionActor<TSecureSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest));
+ if (endpoint->Secure) {
+ return new TIncomingConnectionActor<TSecureSocketImpl>(std::move(endpoint), std::move(socket), address, std::move(recycledRequest));
} else {
- return new TIncomingConnectionActor<TPlainSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest));
+ return new TIncomingConnectionActor<TPlainSocketImpl>(std::move(endpoint), std::move(socket), address, std::move(recycledRequest));
}
}
diff --git a/library/cpp/actors/http/http_proxy_sock_impl.h b/library/cpp/actors/http/http_proxy_sock_impl.h
index bf8c71d05a..bd0a079dae 100644
--- a/library/cpp/actors/http/http_proxy_sock_impl.h
+++ b/library/cpp/actors/http/http_proxy_sock_impl.h
@@ -49,7 +49,7 @@ struct TPlainSocketImpl : virtual public THttpConfig {
return 1;
}
- static constexpr int OnAccept(const TEndpointInfo&, bool&, bool&) {
+ static int OnAccept(std::shared_ptr<TPrivateEndpointInfo>, bool&, bool&) {
return 1;
}
@@ -237,9 +237,9 @@ struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers {
return res;
}
- int OnAccept(const TEndpointInfo& endpoint, bool& read, bool& write) {
+ int OnAccept(std::shared_ptr<TPrivateEndpointInfo> endpoint, bool& read, bool& write) {
if (!Ssl) {
- InitServerSsl(endpoint.SecureContext.Get());
+ InitServerSsl(endpoint->SecureContext.Get());
}
int res = SSL_accept(Ssl.Get());
if (res <= 0) {
diff --git a/ydb/core/mon/async_http_mon.cpp b/ydb/core/mon/async_http_mon.cpp
index e6c49866bc..f6e10a7aee 100644
--- a/ydb/core/mon/async_http_mon.cpp
+++ b/ydb/core/mon/async_http_mon.cpp
@@ -429,7 +429,18 @@ void TAsyncHttpMon::Start(TActorSystem* actorSystem) {
ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId);
TStringBuilder workerName;
workerName << FQDNHostName() << ":" << Config.Port;
- ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvAddListeningPort(Config.Port, workerName));
+ auto addPort = std::make_unique<NHttp::TEvHttpProxy::TEvAddListeningPort>();
+ addPort->Port = Config.Port;
+ addPort->WorkerName = workerName;
+ addPort->Address = Config.Address;
+ addPort->CompressContentTypes = {
+ "text/plain",
+ "text/html",
+ "text/css",
+ "application/javascript",
+ "application/json",
+ };
+ ActorSystem->Send(HttpProxyActorId, addPort.release());
ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/", HttpMonServiceActorId));
for (NMonitoring::IMonPage* page : ActorMonPages) {
RegisterActorMonPage(page);