diff options
author | Alexey Efimov <xeno@prnwatch.com> | 2022-05-06 09:16:33 +0300 |
---|---|---|
committer | Alexey Efimov <xeno@prnwatch.com> | 2022-05-06 09:16:33 +0300 |
commit | 6af3e8a9fb068483ed468663f37973595adb392e (patch) | |
tree | 78b4d4d0eb90506b534cf57935869f4a05644260 | |
parent | 264071acbaeeec9bef990f960aa7a434098305dd (diff) | |
download | ydb-6af3e8a9fb068483ed468663f37973595adb392e.tar.gz |
add settings for compression KIKIMR-14742
ref:e7f27eec7e9ed0722c245f091d0eff6e20395d63
-rw-r--r-- | library/cpp/actors/http/http.cpp | 28 | ||||
-rw-r--r-- | library/cpp/actors/http/http.h | 30 | ||||
-rw-r--r-- | library/cpp/actors/http/http_cache.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy.h | 18 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_acceptor.cpp | 23 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_incoming.cpp | 27 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_sock_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/mon/async_http_mon.cpp | 13 |
8 files changed, 103 insertions, 44 deletions
diff --git a/library/cpp/actors/http/http.cpp b/library/cpp/actors/http/http.cpp index 09381d0fb3..20dea47573 100644 --- a/library/cpp/actors/http/http.cpp +++ b/library/cpp/actors/http/http.cpp @@ -366,16 +366,28 @@ void THttpParser<THttpResponse, TSocketBuffer>::ConnectionClosed() { THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponseString(TStringBuf data) { THttpParser<THttpResponse, TSocketBuffer> parser(data); THeadersBuilder headers(parser.Headers); - if (!WorkerName.empty()) { - headers.Set("X-Worker-Name", WorkerName); + if (!Endpoint->WorkerName.empty()) { + headers.Set("X-Worker-Name", Endpoint->WorkerName); } THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this); response->InitResponse(parser.Protocol, parser.Version, parser.Status, parser.Message); - response->Set(headers); if (parser.HaveBody()) { + if (parser.ContentType && !Endpoint->CompressContentTypes.empty()) { + TStringBuf contentType = parser.ContentType.Before(';'); + Trim(ContentType, ' '); + if (Count(Endpoint->CompressContentTypes, contentType) != 0) { + if (response->EnableCompression()) { + headers.Erase("Content-Length"); // we will need new length after compression + } + } + } + response->Set(headers); response->SetBody(parser.Body); } else { - response->Set<&THttpResponse::ContentLength>("0"); + response->Set(headers); + if (!response->ContentLength) { + response->Set<&THttpResponse::ContentLength>("0"); + } } return response; } @@ -415,8 +427,8 @@ THttpOutgoingResponsePtr THttpIncomingRequest::CreateResponse(TStringBuf status, } THttpOutgoingResponsePtr response = new THttpOutgoingResponse(this, "HTTP", version, status, message); response->Set<&THttpResponse::Connection>(GetConnection()); - if (!WorkerName.empty()) { - response->Set("X-Worker-Name", WorkerName); + if (!Endpoint->WorkerName.empty()) { + response->Set("X-Worker-Name", Endpoint->WorkerName); } if (!contentType.empty() && !body.empty()) { response->Set<&THttpResponse::ContentType>(contentType); @@ -701,4 +713,8 @@ void THeadersBuilder::Set(TStringBuf name, TStringBuf data) { Headers[Data.back().first] = Data.back().second; } +void THeadersBuilder::Erase(TStringBuf name) { + Headers.erase(name); +} + } diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 6d51cf589c..54ed5d6d8f 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -99,6 +99,7 @@ struct THeadersBuilder : THeaders { THeadersBuilder(TStringBuf headers); THeadersBuilder(const THeadersBuilder& builder); void Set(TStringBuf name, TStringBuf data); + void Erase(TStringBuf name); }; class TSocketBuffer : public TBuffer, public THttpConfig { @@ -637,6 +638,19 @@ inline void THttpRenderer<THttpResponse, TSocketBuffer>::Set<&THttpResponse::Con SetContentEncoding(value); } +struct THttpEndpointInfo { + TString WorkerName; + bool Secure = false; + const std::vector<TString> CompressContentTypes; // content types, which will be automatically compressed on response + + THttpEndpointInfo() = default; + +protected: + THttpEndpointInfo(std::vector<TString> compressContentTypes) + : CompressContentTypes(std::move(compressContentTypes)) + {} +}; + class THttpIncomingRequest; using THttpIncomingRequestPtr = TIntrusivePtr<THttpIncomingRequest>; @@ -647,10 +661,18 @@ class THttpIncomingRequest : public THttpParser<THttpRequest, TSocketBuffer>, public TRefCounted<THttpIncomingRequest, TAtomicCounter> { public: + std::shared_ptr<THttpEndpointInfo> Endpoint; THttpConfig::SocketAddressType Address; - TString WorkerName; THPTimer Timer; - bool Secure = false; + + THttpIncomingRequest() + : Endpoint(std::make_shared<THttpEndpointInfo>()) + {} + + THttpIncomingRequest(std::shared_ptr<THttpEndpointInfo> endpoint, const THttpConfig::SocketAddressType& address) + : Endpoint(std::move(endpoint)) + , Address(address) + {} bool IsConnectionClose() const { if (Connection.empty()) { @@ -746,7 +768,7 @@ public: return GetRequest()->Method != "HEAD" && Status != "204"; } - void EnableCompression() { + bool EnableCompression() { TStringBuf acceptEncoding = Request->AcceptEncoding; std::vector<TStringBuf> encodings; TStringBuf encoding; @@ -759,7 +781,9 @@ public: if (!encodings.empty()) { // TODO: prioritize encodings SetContentEncoding(encodings.front()); + return true; } + return false; } static TString CompressDeflate(TStringBuf source); diff --git a/library/cpp/actors/http/http_cache.cpp b/library/cpp/actors/http/http_cache.cpp index 27c4eeb6f3..c9dad66355 100644 --- a/library/cpp/actors/http/http_cache.cpp +++ b/library/cpp/actors/http/http_cache.cpp @@ -316,7 +316,7 @@ public: } TString GetName() const { - return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL + return TStringBuilder() << (Request->Endpoint->Secure ? "https://" : "http://") << Request->Host << Request->URL << " (" << CacheId << ")"; } }; diff --git a/library/cpp/actors/http/http_proxy.h b/library/cpp/actors/http/http_proxy.h index 92002b047d..bd907c13a7 100644 --- a/library/cpp/actors/http/http_proxy.h +++ b/library/cpp/actors/http/http_proxy.h @@ -44,12 +44,16 @@ struct TEvHttpProxy { static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_HTTP), "ES_HTTP event space is too small."); struct TEvAddListeningPort : NActors::TEventLocal<TEvAddListeningPort, EvAddListeningPort> { + TString Address; TIpPort Port; TString WorkerName; bool Secure = false; TString CertificateFile; TString PrivateKeyFile; TString SslCertificatePem; + std::vector<TString> CompressContentTypes; + + TEvAddListeningPort() = default; TEvAddListeningPort(TIpPort port) : Port(port) @@ -63,9 +67,11 @@ struct TEvHttpProxy { struct TEvConfirmListen : NActors::TEventLocal<TEvConfirmListen, EvConfirmListen> { THttpConfig::SocketAddressType Address; + std::shared_ptr<THttpEndpointInfo> Endpoint; - TEvConfirmListen(const THttpConfig::SocketAddressType& address) + TEvConfirmListen(const THttpConfig::SocketAddressType& address, std::shared_ptr<THttpEndpointInfo> endpoint) : Address(address) + , Endpoint(std::move(endpoint)) {} }; @@ -217,19 +223,21 @@ struct TEvHttpProxy { }; }; -struct TEndpointInfo { +struct TPrivateEndpointInfo : THttpEndpointInfo { TActorId Proxy; TActorId Owner; - TString WorkerName; - bool Secure; TSslHelpers::TSslHolder<SSL_CTX> SecureContext; + + TPrivateEndpointInfo(const std::vector<TString>& compressContentTypes) + : THttpEndpointInfo(compressContentTypes) + {} }; NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::TMetricRegistry> registry = NMonitoring::TMetricRegistry::SharedInstance()); NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller); NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller); NActors::IActor* CreateIncomingConnectionActor( - const TEndpointInfo& endpoint, + std::shared_ptr<TPrivateEndpointInfo> endpoint, TIntrusivePtr<TSocketDescriptor> socket, THttpConfig::SocketAddressType address, THttpIncomingRequestPtr recycledRequest = nullptr); diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp index 9780541b71..d74ef0abac 100644 --- a/library/cpp/actors/http/http_proxy_acceptor.cpp +++ b/library/cpp/actors/http/http_proxy_acceptor.cpp @@ -13,7 +13,7 @@ public: NActors::TPollerToken::TPtr PollerToken; THashSet<TActorId> Connections; TDeque<THttpIncomingRequestPtr> RecycledRequests; - TEndpointInfo Endpoint; + std::shared_ptr<TPrivateEndpointInfo> Endpoint; TAcceptorActor(const TActorId& owner, const TActorId& poller) : NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit) @@ -45,19 +45,20 @@ protected: } void HandleInit(TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) { - SocketAddressType bindAddress("::", event->Get()->Port); - Endpoint.Owner = ctx.SelfID; - Endpoint.Proxy = Owner; - Endpoint.WorkerName = event->Get()->WorkerName; - Endpoint.Secure = event->Get()->Secure; + SocketAddressType bindAddress(event->Get()->Address ? event->Get()->Address.data() : "::", event->Get()->Port); + Endpoint = std::make_shared<TPrivateEndpointInfo>(event->Get()->CompressContentTypes); + Endpoint->Owner = ctx.SelfID; + Endpoint->Proxy = Owner; + Endpoint->WorkerName = event->Get()->WorkerName; + Endpoint->Secure = event->Get()->Secure; int err = 0; - if (Endpoint.Secure) { + if (Endpoint->Secure) { if (!event->Get()->SslCertificatePem.empty()) { - Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem); + Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->SslCertificatePem); } else { - Endpoint.SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile); + Endpoint->SecureContext = TSslHelpers::CreateServerContext(event->Get()->CertificateFile, event->Get()->PrivateKeyFile); } - if (Endpoint.SecureContext == nullptr) { + if (Endpoint->SecureContext == nullptr) { err = -1; LOG_WARN_S(ctx, HttpLog, "Failed to construct server security context"); } @@ -72,7 +73,7 @@ protected: SetNonBlock(Socket->Socket); ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId())); TBase::Become(&TAcceptorActor::StateListening); - ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress), 0, event->Cookie); + ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress, Endpoint), 0, event->Cookie); return; } } diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp index 80fe2af53d..9d5673170b 100644 --- a/library/cpp/actors/http/http_proxy_incoming.cpp +++ b/library/cpp/actors/http/http_proxy_incoming.cpp @@ -11,7 +11,7 @@ public: using TBase = TActor<TIncomingConnectionActor<TSocketImpl>>; static constexpr bool RecycleRequests = true; - const TEndpointInfo& Endpoint; + std::shared_ptr<TPrivateEndpointInfo> Endpoint; SocketAddressType Address; TList<THttpIncomingRequestPtr> Requests; THashMap<THttpIncomingRequestPtr, THttpOutgoingResponsePtr> Responses; @@ -26,13 +26,13 @@ public: TPollerToken::TPtr PollerToken; TIncomingConnectionActor( - const TEndpointInfo& endpoint, + std::shared_ptr<TPrivateEndpointInfo> endpoint, TIntrusivePtr<TSocketDescriptor> socket, SocketAddressType address, THttpIncomingRequestPtr recycledRequest = nullptr) : TBase(&TIncomingConnectionActor::StateAccepting) , TSocketImpl(std::move(socket)) - , Endpoint(endpoint) + , Endpoint(std::move(endpoint)) , Address(address) { if (recycledRequest != nullptr) { @@ -61,7 +61,7 @@ public: } void Die(const TActorContext& ctx) override { - ctx.Send(Endpoint.Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests))); + ctx.Send(Endpoint->Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests))); TSocketImpl::Shutdown(); TBase::Die(ctx); } @@ -108,12 +108,11 @@ protected: if (RecycleRequests && !RecycledRequests.empty()) { CurrentRequest = std::move(RecycledRequests.front()); RecycledRequests.pop_front(); + CurrentRequest->Address = Address; + CurrentRequest->Endpoint = Endpoint; } else { - CurrentRequest = new THttpIncomingRequest(); + CurrentRequest = new THttpIncomingRequest(Endpoint, Address); } - CurrentRequest->Address = Address; - CurrentRequest->WorkerName = Endpoint.WorkerName; - CurrentRequest->Secure = Endpoint.Secure; } if (!CurrentRequest->EnsureEnoughSpaceAvailable()) { LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - not enough space available"); @@ -130,7 +129,7 @@ protected: CurrentRequest->Timer.Reset(); if (CurrentRequest->IsReady()) { LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")"); - ctx.Send(Endpoint.Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest)); + ctx.Send(Endpoint->Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest)); CurrentRequest = nullptr; } else if (CurrentRequest->IsError()) { LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")"); @@ -206,7 +205,7 @@ protected: << TString(response->GetRawData()).substr(0, MAX_LOGGED_SIZE)); } THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildIncomingRequestSensors(request, response)); - ctx.Send(Endpoint.Owner, sensors.Release()); + ctx.Send(Endpoint->Owner, sensors.Release()); if (request == Requests.front() && CurrentResponse == nullptr) { CurrentResponse = response; return FlushOutput(ctx); @@ -288,14 +287,14 @@ protected: }; IActor* CreateIncomingConnectionActor( - const TEndpointInfo& endpoint, + std::shared_ptr<TPrivateEndpointInfo> endpoint, TIntrusivePtr<TSocketDescriptor> socket, THttpConfig::SocketAddressType address, THttpIncomingRequestPtr recycledRequest) { - if (endpoint.Secure) { - return new TIncomingConnectionActor<TSecureSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest)); + if (endpoint->Secure) { + return new TIncomingConnectionActor<TSecureSocketImpl>(std::move(endpoint), std::move(socket), address, std::move(recycledRequest)); } else { - return new TIncomingConnectionActor<TPlainSocketImpl>(endpoint, std::move(socket), address, std::move(recycledRequest)); + return new TIncomingConnectionActor<TPlainSocketImpl>(std::move(endpoint), std::move(socket), address, std::move(recycledRequest)); } } diff --git a/library/cpp/actors/http/http_proxy_sock_impl.h b/library/cpp/actors/http/http_proxy_sock_impl.h index bf8c71d05a..bd0a079dae 100644 --- a/library/cpp/actors/http/http_proxy_sock_impl.h +++ b/library/cpp/actors/http/http_proxy_sock_impl.h @@ -49,7 +49,7 @@ struct TPlainSocketImpl : virtual public THttpConfig { return 1; } - static constexpr int OnAccept(const TEndpointInfo&, bool&, bool&) { + static int OnAccept(std::shared_ptr<TPrivateEndpointInfo>, bool&, bool&) { return 1; } @@ -237,9 +237,9 @@ struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers { return res; } - int OnAccept(const TEndpointInfo& endpoint, bool& read, bool& write) { + int OnAccept(std::shared_ptr<TPrivateEndpointInfo> endpoint, bool& read, bool& write) { if (!Ssl) { - InitServerSsl(endpoint.SecureContext.Get()); + InitServerSsl(endpoint->SecureContext.Get()); } int res = SSL_accept(Ssl.Get()); if (res <= 0) { diff --git a/ydb/core/mon/async_http_mon.cpp b/ydb/core/mon/async_http_mon.cpp index e6c49866bc..f6e10a7aee 100644 --- a/ydb/core/mon/async_http_mon.cpp +++ b/ydb/core/mon/async_http_mon.cpp @@ -429,7 +429,18 @@ void TAsyncHttpMon::Start(TActorSystem* actorSystem) { ActorSystem->AppData<NKikimr::TAppData>()->UserPoolId); TStringBuilder workerName; workerName << FQDNHostName() << ":" << Config.Port; - ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvAddListeningPort(Config.Port, workerName)); + auto addPort = std::make_unique<NHttp::TEvHttpProxy::TEvAddListeningPort>(); + addPort->Port = Config.Port; + addPort->WorkerName = workerName; + addPort->Address = Config.Address; + addPort->CompressContentTypes = { + "text/plain", + "text/html", + "text/css", + "application/javascript", + "application/json", + }; + ActorSystem->Send(HttpProxyActorId, addPort.release()); ActorSystem->Send(HttpProxyActorId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/", HttpMonServiceActorId)); for (NMonitoring::IMonPage* page : ActorMonPages) { RegisterActorMonPage(page); |