diff options
author | Alexey Efimov <xeno@ydb.tech> | 2024-12-17 06:08:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-17 12:08:21 +0700 |
commit | f1c69636bf01d574ed1c6d75f2b97cf4bfa09ba6 (patch) | |
tree | fd3751ba33f15b4e2314aed5fb7e2a51cab74199 | |
parent | bacb7284d03e3aea6c256f85c1968268fd8eaba0 (diff) | |
download | ydb-f1c69636bf01d574ed1c6d75f2b97cf4bfa09ba6.tar.gz |
allow reuse of outgoing connection and new test for that (#12644)
-rw-r--r-- | ydb/library/actors/http/http.h | 46 | ||||
-rw-r--r-- | ydb/library/actors/http/http_proxy.cpp | 33 | ||||
-rw-r--r-- | ydb/library/actors/http/http_proxy.h | 37 | ||||
-rw-r--r-- | ydb/library/actors/http/http_proxy_acceptor.cpp | 4 | ||||
-rw-r--r-- | ydb/library/actors/http/http_proxy_incoming.cpp | 4 | ||||
-rw-r--r-- | ydb/library/actors/http/http_proxy_outgoing.cpp | 33 | ||||
-rw-r--r-- | ydb/library/actors/http/http_ut.cpp | 48 |
7 files changed, 167 insertions, 38 deletions
diff --git a/ydb/library/actors/http/http.h b/ydb/library/actors/http/http.h index 1f849b67032..f0f38dd6aec 100644 --- a/ydb/library/actors/http/http.h +++ b/ydb/library/actors/http/http.h @@ -857,23 +857,6 @@ private: void FinishResponse(THttpOutgoingResponsePtr& response, TStringBuf body = TStringBuf()); }; -class THttpIncomingResponse : - public THttpParser<THttpResponse, TSocketBuffer>, - public TRefCounted<THttpIncomingResponse, TAtomicCounter> { -public: - THttpIncomingResponse(THttpOutgoingRequestPtr request); - - THttpOutgoingRequestPtr GetRequest() const { - return Request; - } - - THttpIncomingResponsePtr Duplicate(THttpOutgoingRequestPtr request); - THttpOutgoingResponsePtr Reverse(THttpIncomingRequestPtr request); - -protected: - THttpOutgoingRequestPtr Request; -}; - class THttpOutgoingRequest : public THttpRenderer<THttpRequest, TSocketBuffer>, public TRefCounted<THttpOutgoingRequest, TAtomicCounter> { @@ -893,6 +876,35 @@ public: static THttpOutgoingRequestPtr CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf()); static THttpOutgoingRequestPtr CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf()); THttpOutgoingRequestPtr Duplicate(); + + bool IsConnectionClose() const { + return TEqNoCase()(Connection, "close"); + } + + TString GetDestination() { + return Secure ? (TStringBuilder() << "https://" << Host) : (TStringBuilder() << "http://" << Host); + } +}; + +class THttpIncomingResponse : + public THttpParser<THttpResponse, TSocketBuffer>, + public TRefCounted<THttpIncomingResponse, TAtomicCounter> { +public: + THttpIncomingResponse(THttpOutgoingRequestPtr request); + + THttpOutgoingRequestPtr GetRequest() const { + return Request; + } + + THttpIncomingResponsePtr Duplicate(THttpOutgoingRequestPtr request); + THttpOutgoingResponsePtr Reverse(THttpIncomingRequestPtr request); + + bool IsConnectionClose() const { + return Request->IsConnectionClose() || TEqNoCase()(Connection, "close"); + } + +protected: + THttpOutgoingRequestPtr Request; }; class THttpOutgoingResponse : diff --git a/ydb/library/actors/http/http_proxy.cpp b/ydb/library/actors/http/http_proxy.cpp index cb01cd3a863..1f4e32f42d2 100644 --- a/ydb/library/actors/http/http_proxy.cpp +++ b/ydb/library/actors/http/http_proxy.cpp @@ -17,6 +17,7 @@ public: IActor* AddOutgoingConnection(bool secure, const NActors::TActorContext& ctx) { IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, secure, Poller); TActorId connectionId = ctx.Register(connectionSocket); + ALOG_DEBUG(HttpLog, "Connection created " << connectionId); Connections.emplace(connectionId); return connectionSocket; } @@ -42,7 +43,8 @@ protected: HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle); HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle); HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle); - HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); + HFunc(TEvHttpProxy::TEvHttpOutgoingConnectionAvailable, Handle); + HFunc(TEvHttpProxy::TEvHttpOutgoingConnectionClosed, Handle); HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle); HFunc(TEvHttpProxy::TEvReportSensors, Handle); HFunc(NActors::TEvents::TEvPoison, Handle); @@ -97,6 +99,19 @@ protected: } void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + if (event->Get()->AllowConnectionReuse) { + auto destination = event->Get()->Request->GetDestination(); + auto itAvailableConnection = AvailableConnections.find(destination); + if (itAvailableConnection != AvailableConnections.end()) { + TActorId availableConnection = itAvailableConnection->second; + ALOG_DEBUG(HttpLog, "Reusing connection " << availableConnection << " for destination " << destination); + AvailableConnections.erase(itAvailableConnection); + ctx.Send(event->Forward(availableConnection)); + return; + } else { + ALOG_DEBUG(HttpLog, "Creating a new connection for destination " << destination); + } + } bool secure(event->Get()->Request->Secure); NActors::IActor* actor = AddOutgoingConnection(secure, ctx); ctx.Send(event->Forward(actor->SelfId())); @@ -115,8 +130,21 @@ protected: } } - void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { + void Handle(TEvHttpProxy::TEvHttpOutgoingConnectionAvailable::TPtr event, const NActors::TActorContext&) { + ALOG_DEBUG(HttpLog, "Connection " << event->Get()->ConnectionID << " available for destination " << event->Get()->Destination); + AvailableConnections.emplace(event->Get()->Destination, event->Get()->ConnectionID); + } + + void Handle(TEvHttpProxy::TEvHttpOutgoingConnectionClosed::TPtr event, const NActors::TActorContext&) { + ALOG_DEBUG(HttpLog, "Connection closed " << event->Get()->ConnectionID); Connections.erase(event->Get()->ConnectionID); + auto range = AvailableConnections.equal_range(event->Get()->Destination); + for (auto it = range.first; it != range.second; ++it) { + if (it->second == event->Get()->ConnectionID) { + AvailableConnections.erase(it); + break; + } + } } void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) { @@ -258,6 +286,7 @@ protected: THashMap<TString, THostEntry> Hosts; THashMap<TString, TActorId> Handlers; THashSet<TActorId> Connections; // outgoing + std::unordered_multimap<TString, TActorId> AvailableConnections; std::weak_ptr<NMonitoring::TMetricRegistry> Registry; }; diff --git a/ydb/library/actors/http/http_proxy.h b/ydb/library/actors/http/http_proxy.h index 4c8fed774d2..031a658144f 100644 --- a/ydb/library/actors/http/http_proxy.h +++ b/ydb/library/actors/http/http_proxy.h @@ -46,8 +46,9 @@ struct TEvHttpProxy { EvHttpOutgoingRequest, EvHttpIncomingResponse, EvHttpOutgoingResponse, - EvHttpConnectionOpened, - EvHttpConnectionClosed, + EvHttpIncomingConnectionClosed, + EvHttpOutgoingConnectionClosed, + EvHttpOutgoingConnectionAvailable, EvHttpAcceptorClosed, EvResolveHostRequest, EvResolveHostResponse, @@ -114,6 +115,7 @@ struct TEvHttpProxy { struct TEvHttpOutgoingRequest : NActors::TEventLocal<TEvHttpOutgoingRequest, EvHttpOutgoingRequest> { THttpOutgoingRequestPtr Request; TDuration Timeout; + bool AllowConnectionReuse = false; TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request) : Request(std::move(request)) @@ -123,6 +125,11 @@ struct TEvHttpProxy { : Request(std::move(request)) , Timeout(timeout) {} + + TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request, bool allowConnectionReuse) + : Request(std::move(request)) + , AllowConnectionReuse(allowConnectionReuse) + {} }; struct TEvHttpIncomingResponse : NActors::TEventLocal<TEvHttpIncomingResponse, EvHttpIncomingResponse> { @@ -172,27 +179,33 @@ struct TEvHttpProxy { {} }; - struct TEvHttpConnectionOpened : NActors::TEventLocal<TEvHttpConnectionOpened, EvHttpConnectionOpened> { - TString PeerAddress; + struct TEvHttpIncomingConnectionClosed : NActors::TEventLocal<TEvHttpIncomingConnectionClosed, EvHttpIncomingConnectionClosed> { TActorId ConnectionID; + TDeque<THttpIncomingRequestPtr> RecycledRequests; - TEvHttpConnectionOpened(const TString& peerAddress, const TActorId& connectionID) - : PeerAddress(peerAddress) - , ConnectionID(connectionID) + TEvHttpIncomingConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests) + : ConnectionID(connectionID) + , RecycledRequests(std::move(recycledRequests)) {} }; - struct TEvHttpConnectionClosed : NActors::TEventLocal<TEvHttpConnectionClosed, EvHttpConnectionClosed> { + struct TEvHttpOutgoingConnectionClosed : NActors::TEventLocal<TEvHttpOutgoingConnectionClosed, EvHttpOutgoingConnectionClosed> { TActorId ConnectionID; - TDeque<THttpIncomingRequestPtr> RecycledRequests; + TString Destination; - TEvHttpConnectionClosed(const TActorId& connectionID) + TEvHttpOutgoingConnectionClosed(const TActorId& connectionID, const TString& destination) : ConnectionID(connectionID) + , Destination(destination) {} + }; - TEvHttpConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests) + struct TEvHttpOutgoingConnectionAvailable : NActors::TEventLocal<TEvHttpOutgoingConnectionAvailable, EvHttpOutgoingConnectionAvailable> { + TActorId ConnectionID; + TString Destination; + + TEvHttpOutgoingConnectionAvailable(const TActorId& connectionID, const TString& destination) : ConnectionID(connectionID) - , RecycledRequests(std::move(recycledRequests)) + , Destination(destination) {} }; diff --git a/ydb/library/actors/http/http_proxy_acceptor.cpp b/ydb/library/actors/http/http_proxy_acceptor.cpp index a397d1b9bc4..3b69eafa217 100644 --- a/ydb/library/actors/http/http_proxy_acceptor.cpp +++ b/ydb/library/actors/http/http_proxy_acceptor.cpp @@ -30,7 +30,7 @@ protected: switch (ev->GetTypeRewrite()) { HFunc(NActors::TEvPollerRegisterResult, Handle); HFunc(NActors::TEvPollerReady, Handle); - HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); + HFunc(TEvHttpProxy::TEvHttpIncomingConnectionClosed, Handle); HFunc(TEvHttpProxy::TEvReportSensors, Handle); } } @@ -143,7 +143,7 @@ protected: } } - void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { + void Handle(TEvHttpProxy::TEvHttpIncomingConnectionClosed::TPtr event, const NActors::TActorContext&) { Connections.erase(event->Get()->ConnectionID); for (auto& req : event->Get()->RecycledRequests) { if (RecycledRequests.size() >= MaxRecycledRequestsCount) { diff --git a/ydb/library/actors/http/http_proxy_incoming.cpp b/ydb/library/actors/http/http_proxy_incoming.cpp index 1205dcecefe..a01b97e4aa1 100644 --- a/ydb/library/actors/http/http_proxy_incoming.cpp +++ b/ydb/library/actors/http/http_proxy_incoming.cpp @@ -62,7 +62,7 @@ public: } void Die(const TActorContext& ctx) override { - ctx.Send(Endpoint->Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests))); + ctx.Send(Endpoint->Owner, new TEvHttpProxy::TEvHttpIncomingConnectionClosed(ctx.SelfID, std::move(RecycledRequests))); TSocketImpl::Shutdown(); TBase::Die(ctx); } @@ -218,7 +218,7 @@ protected: THttpIncomingRequestPtr request = response->GetRequest(); LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << response->Status << " " << response->Message << (response->IsDone() ? ")" : ") (incomplete)")); - if (!response->Status.StartsWith('2') && response->Status != "404") { + if (!response->Status.StartsWith('2') && !response->Status.StartsWith('3') && response->Status != "404") { static constexpr size_t MAX_LOGGED_SIZE = 1024; LOG_DEBUG_S(ctx, HttpLog, "(#" diff --git a/ydb/library/actors/http/http_proxy_outgoing.cpp b/ydb/library/actors/http/http_proxy_outgoing.cpp index b1f27c2c5a1..08f77ba4dd9 100644 --- a/ydb/library/actors/http/http_proxy_outgoing.cpp +++ b/ydb/library/actors/http/http_proxy_outgoing.cpp @@ -11,11 +11,13 @@ public: const TActorId Owner; const TActorId Poller; SocketAddressType Address; + TString Destination; TActorId RequestOwner; THttpOutgoingRequestPtr Request; THttpIncomingResponsePtr Response; TInstant LastActivity; TDuration ConnectionTimeout = CONNECTION_TIMEOUT; + bool AllowConnectionReuse = false; NActors::TPollerToken::TPtr PollerToken; TOutgoingConnectionActor(const TActorId& owner, const TActorId& poller) @@ -28,7 +30,7 @@ public: static constexpr char ActorName[] = "OUT_CONNECTION_ACTOR"; void Die(const NActors::TActorContext& ctx) override { - ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID)); + ctx.Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionClosed(ctx.SelfID, Destination)); TSocketImpl::Shutdown(); // to avoid errors when connection already closed TBase::Die(ctx); } @@ -51,8 +53,13 @@ public: RequestOwner = TActorId(); THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response)); ctx.Send(Owner, sensors.Release()); - LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "connection closed"); - Die(ctx); + if (!AllowConnectionReuse || Response->IsConnectionClose()) { + LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "connection closed"); + Die(ctx); + } else { + LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "connection available for reuse"); + ctx.Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionAvailable(ctx.SelfID, Destination)); + } } void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) { @@ -242,6 +249,7 @@ protected: void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { LastActivity = ctx.Now(); Request = std::move(event->Get()->Request); + Destination = Request->GetDestination(); TSocketImpl::SetHost(TString(Request->Host)); LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host); Request->Timer.Reset(); @@ -250,11 +258,29 @@ protected: if (event->Get()->Timeout) { ConnectionTimeout = event->Get()->Timeout; } + AllowConnectionReuse = event->Get()->AllowConnectionReuse; ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup()); LastActivity = ctx.Now(); TBase::Become(&TOutgoingConnectionActor::StateResolving); } + void HandleConnected(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + LastActivity = ctx.Now(); + Request = std::move(event->Get()->Request); + Request->Timer.Reset(); + Response = nullptr; + RequestOwner = event->Sender; + if (event->Get()->Timeout) { + ConnectionTimeout = event->Get()->Timeout; + } + AllowConnectionReuse = event->Get()->AllowConnectionReuse; + ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup()); + LastActivity = ctx.Now(); + LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "<- (" << Request->Method << " " << Request->URL << ")"); + FlushOutput(ctx); + PullInput(ctx); + } + void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) { LastActivity = ctx.Now(); if (event->Get()->Write && RequestOwner) { @@ -314,6 +340,7 @@ protected: HFunc(NActors::TEvPollerReady, HandleConnected); CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); HFunc(NActors::TEvPollerRegisterResult, HandleConnected); + HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleConnected); } } diff --git a/ydb/library/actors/http/http_ut.cpp b/ydb/library/actors/http/http_ut.cpp index d852e051237..2af61480ed6 100644 --- a/ydb/library/actors/http/http_ut.cpp +++ b/ydb/library/actors/http/http_ut.cpp @@ -701,4 +701,52 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V UNIT_ASSERT_EQUAL(response->Response->Status, "200"); UNIT_ASSERT_EQUAL(response->Response->Body, "passed"); } + + Y_UNIT_TEST(RequestAfter307) { + NActors::TTestActorRuntimeBase actorSystem; + TPortManager portManager; + TIpPort port = portManager.GetTcpPort(); + TAutoPtr<NActors::IEventHandle> handle; + actorSystem.Initialize(); + actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_TRACE); + + NActors::IActor* proxy = NHttp::CreateHttpProxy(); + NActors::TActorId proxyId = actorSystem.Register(proxy); + actorSystem.Send(new NActors::IEventHandle(proxyId, actorSystem.AllocateEdgeActor(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true); + actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvConfirmListen>(handle); + + NActors::TActorId serverId = actorSystem.AllocateEdgeActor(); + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test1", serverId)), 0, true); + actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test2", serverId)), 0, true); + + NActors::TActorId clientId = actorSystem.AllocateEdgeActor(); + NHttp::THttpOutgoingRequestPtr httpRequest1 = NHttp::THttpOutgoingRequest::CreateRequestGet("http://127.0.0.1:" + ToString(port) + "/test1"); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest1, true)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request1 = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); + + UNIT_ASSERT_EQUAL(request1->Request->URL, "/test1"); + + NHttp::THttpOutgoingResponsePtr httpResponse1 = request1->Request->CreateResponseString("HTTP/1.1 307 Temporary Redirect\r\nLocation: /test2\r\n\r\n"); + actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse1)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response1 = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); + + UNIT_ASSERT_EQUAL(response1->Response->Status, "307"); + + auto location = NHttp::THeaders(response1->Response->Headers)["Location"]; + NHttp::THttpOutgoingRequestPtr httpRequest2 = NHttp::THttpOutgoingRequest::CreateRequestGet("http://127.0.0.1:" + ToString(port) + location); + actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest2, true)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request2 = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle); + + UNIT_ASSERT_EQUAL(request2->Request->URL, "/test2"); + + NHttp::THttpOutgoingResponsePtr httpResponse2 = request2->Request->CreateResponseString("HTTP/1.1 200 Ok\r\nContent-Length: 0\r\n\r\n"); + actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse2)), 0, true); + + NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response2 = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle); + + UNIT_ASSERT_EQUAL(response2->Response->Status, "200"); + } } |