aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Efimov <xeno@ydb.tech>2024-12-17 06:08:21 +0100
committerGitHub <noreply@github.com>2024-12-17 12:08:21 +0700
commitf1c69636bf01d574ed1c6d75f2b97cf4bfa09ba6 (patch)
treefd3751ba33f15b4e2314aed5fb7e2a51cab74199
parentbacb7284d03e3aea6c256f85c1968268fd8eaba0 (diff)
downloadydb-f1c69636bf01d574ed1c6d75f2b97cf4bfa09ba6.tar.gz
allow reuse of outgoing connection and new test for that (#12644)
-rw-r--r--ydb/library/actors/http/http.h46
-rw-r--r--ydb/library/actors/http/http_proxy.cpp33
-rw-r--r--ydb/library/actors/http/http_proxy.h37
-rw-r--r--ydb/library/actors/http/http_proxy_acceptor.cpp4
-rw-r--r--ydb/library/actors/http/http_proxy_incoming.cpp4
-rw-r--r--ydb/library/actors/http/http_proxy_outgoing.cpp33
-rw-r--r--ydb/library/actors/http/http_ut.cpp48
7 files changed, 167 insertions, 38 deletions
diff --git a/ydb/library/actors/http/http.h b/ydb/library/actors/http/http.h
index 1f849b67032..f0f38dd6aec 100644
--- a/ydb/library/actors/http/http.h
+++ b/ydb/library/actors/http/http.h
@@ -857,23 +857,6 @@ private:
void FinishResponse(THttpOutgoingResponsePtr& response, TStringBuf body = TStringBuf());
};
-class THttpIncomingResponse :
- public THttpParser<THttpResponse, TSocketBuffer>,
- public TRefCounted<THttpIncomingResponse, TAtomicCounter> {
-public:
- THttpIncomingResponse(THttpOutgoingRequestPtr request);
-
- THttpOutgoingRequestPtr GetRequest() const {
- return Request;
- }
-
- THttpIncomingResponsePtr Duplicate(THttpOutgoingRequestPtr request);
- THttpOutgoingResponsePtr Reverse(THttpIncomingRequestPtr request);
-
-protected:
- THttpOutgoingRequestPtr Request;
-};
-
class THttpOutgoingRequest :
public THttpRenderer<THttpRequest, TSocketBuffer>,
public TRefCounted<THttpOutgoingRequest, TAtomicCounter> {
@@ -893,6 +876,35 @@ public:
static THttpOutgoingRequestPtr CreateRequest(TStringBuf method, TStringBuf url, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf());
static THttpOutgoingRequestPtr CreateHttpRequest(TStringBuf method, TStringBuf host, TStringBuf uri, TStringBuf contentType = TStringBuf(), TStringBuf body = TStringBuf());
THttpOutgoingRequestPtr Duplicate();
+
+ bool IsConnectionClose() const {
+ return TEqNoCase()(Connection, "close");
+ }
+
+ TString GetDestination() {
+ return Secure ? (TStringBuilder() << "https://" << Host) : (TStringBuilder() << "http://" << Host);
+ }
+};
+
+class THttpIncomingResponse :
+ public THttpParser<THttpResponse, TSocketBuffer>,
+ public TRefCounted<THttpIncomingResponse, TAtomicCounter> {
+public:
+ THttpIncomingResponse(THttpOutgoingRequestPtr request);
+
+ THttpOutgoingRequestPtr GetRequest() const {
+ return Request;
+ }
+
+ THttpIncomingResponsePtr Duplicate(THttpOutgoingRequestPtr request);
+ THttpOutgoingResponsePtr Reverse(THttpIncomingRequestPtr request);
+
+ bool IsConnectionClose() const {
+ return Request->IsConnectionClose() || TEqNoCase()(Connection, "close");
+ }
+
+protected:
+ THttpOutgoingRequestPtr Request;
};
class THttpOutgoingResponse :
diff --git a/ydb/library/actors/http/http_proxy.cpp b/ydb/library/actors/http/http_proxy.cpp
index cb01cd3a863..1f4e32f42d2 100644
--- a/ydb/library/actors/http/http_proxy.cpp
+++ b/ydb/library/actors/http/http_proxy.cpp
@@ -17,6 +17,7 @@ public:
IActor* AddOutgoingConnection(bool secure, const NActors::TActorContext& ctx) {
IActor* connectionSocket = CreateOutgoingConnectionActor(ctx.SelfID, secure, Poller);
TActorId connectionId = ctx.Register(connectionSocket);
+ ALOG_DEBUG(HttpLog, "Connection created " << connectionId);
Connections.emplace(connectionId);
return connectionSocket;
}
@@ -42,7 +43,8 @@ protected:
HFunc(TEvHttpProxy::TEvHttpIncomingResponse, Handle);
HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
HFunc(TEvHttpProxy::TEvHttpAcceptorClosed, Handle);
- HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
+ HFunc(TEvHttpProxy::TEvHttpOutgoingConnectionAvailable, Handle);
+ HFunc(TEvHttpProxy::TEvHttpOutgoingConnectionClosed, Handle);
HFunc(TEvHttpProxy::TEvResolveHostRequest, Handle);
HFunc(TEvHttpProxy::TEvReportSensors, Handle);
HFunc(NActors::TEvents::TEvPoison, Handle);
@@ -97,6 +99,19 @@ protected:
}
void Handle(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ if (event->Get()->AllowConnectionReuse) {
+ auto destination = event->Get()->Request->GetDestination();
+ auto itAvailableConnection = AvailableConnections.find(destination);
+ if (itAvailableConnection != AvailableConnections.end()) {
+ TActorId availableConnection = itAvailableConnection->second;
+ ALOG_DEBUG(HttpLog, "Reusing connection " << availableConnection << " for destination " << destination);
+ AvailableConnections.erase(itAvailableConnection);
+ ctx.Send(event->Forward(availableConnection));
+ return;
+ } else {
+ ALOG_DEBUG(HttpLog, "Creating a new connection for destination " << destination);
+ }
+ }
bool secure(event->Get()->Request->Secure);
NActors::IActor* actor = AddOutgoingConnection(secure, ctx);
ctx.Send(event->Forward(actor->SelfId()));
@@ -115,8 +130,21 @@ protected:
}
}
- void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
+ void Handle(TEvHttpProxy::TEvHttpOutgoingConnectionAvailable::TPtr event, const NActors::TActorContext&) {
+ ALOG_DEBUG(HttpLog, "Connection " << event->Get()->ConnectionID << " available for destination " << event->Get()->Destination);
+ AvailableConnections.emplace(event->Get()->Destination, event->Get()->ConnectionID);
+ }
+
+ void Handle(TEvHttpProxy::TEvHttpOutgoingConnectionClosed::TPtr event, const NActors::TActorContext&) {
+ ALOG_DEBUG(HttpLog, "Connection closed " << event->Get()->ConnectionID);
Connections.erase(event->Get()->ConnectionID);
+ auto range = AvailableConnections.equal_range(event->Get()->Destination);
+ for (auto it = range.first; it != range.second; ++it) {
+ if (it->second == event->Get()->ConnectionID) {
+ AvailableConnections.erase(it);
+ break;
+ }
+ }
}
void Handle(TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
@@ -258,6 +286,7 @@ protected:
THashMap<TString, THostEntry> Hosts;
THashMap<TString, TActorId> Handlers;
THashSet<TActorId> Connections; // outgoing
+ std::unordered_multimap<TString, TActorId> AvailableConnections;
std::weak_ptr<NMonitoring::TMetricRegistry> Registry;
};
diff --git a/ydb/library/actors/http/http_proxy.h b/ydb/library/actors/http/http_proxy.h
index 4c8fed774d2..031a658144f 100644
--- a/ydb/library/actors/http/http_proxy.h
+++ b/ydb/library/actors/http/http_proxy.h
@@ -46,8 +46,9 @@ struct TEvHttpProxy {
EvHttpOutgoingRequest,
EvHttpIncomingResponse,
EvHttpOutgoingResponse,
- EvHttpConnectionOpened,
- EvHttpConnectionClosed,
+ EvHttpIncomingConnectionClosed,
+ EvHttpOutgoingConnectionClosed,
+ EvHttpOutgoingConnectionAvailable,
EvHttpAcceptorClosed,
EvResolveHostRequest,
EvResolveHostResponse,
@@ -114,6 +115,7 @@ struct TEvHttpProxy {
struct TEvHttpOutgoingRequest : NActors::TEventLocal<TEvHttpOutgoingRequest, EvHttpOutgoingRequest> {
THttpOutgoingRequestPtr Request;
TDuration Timeout;
+ bool AllowConnectionReuse = false;
TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request)
: Request(std::move(request))
@@ -123,6 +125,11 @@ struct TEvHttpProxy {
: Request(std::move(request))
, Timeout(timeout)
{}
+
+ TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request, bool allowConnectionReuse)
+ : Request(std::move(request))
+ , AllowConnectionReuse(allowConnectionReuse)
+ {}
};
struct TEvHttpIncomingResponse : NActors::TEventLocal<TEvHttpIncomingResponse, EvHttpIncomingResponse> {
@@ -172,27 +179,33 @@ struct TEvHttpProxy {
{}
};
- struct TEvHttpConnectionOpened : NActors::TEventLocal<TEvHttpConnectionOpened, EvHttpConnectionOpened> {
- TString PeerAddress;
+ struct TEvHttpIncomingConnectionClosed : NActors::TEventLocal<TEvHttpIncomingConnectionClosed, EvHttpIncomingConnectionClosed> {
TActorId ConnectionID;
+ TDeque<THttpIncomingRequestPtr> RecycledRequests;
- TEvHttpConnectionOpened(const TString& peerAddress, const TActorId& connectionID)
- : PeerAddress(peerAddress)
- , ConnectionID(connectionID)
+ TEvHttpIncomingConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests)
+ : ConnectionID(connectionID)
+ , RecycledRequests(std::move(recycledRequests))
{}
};
- struct TEvHttpConnectionClosed : NActors::TEventLocal<TEvHttpConnectionClosed, EvHttpConnectionClosed> {
+ struct TEvHttpOutgoingConnectionClosed : NActors::TEventLocal<TEvHttpOutgoingConnectionClosed, EvHttpOutgoingConnectionClosed> {
TActorId ConnectionID;
- TDeque<THttpIncomingRequestPtr> RecycledRequests;
+ TString Destination;
- TEvHttpConnectionClosed(const TActorId& connectionID)
+ TEvHttpOutgoingConnectionClosed(const TActorId& connectionID, const TString& destination)
: ConnectionID(connectionID)
+ , Destination(destination)
{}
+ };
- TEvHttpConnectionClosed(const TActorId& connectionID, TDeque<THttpIncomingRequestPtr> recycledRequests)
+ struct TEvHttpOutgoingConnectionAvailable : NActors::TEventLocal<TEvHttpOutgoingConnectionAvailable, EvHttpOutgoingConnectionAvailable> {
+ TActorId ConnectionID;
+ TString Destination;
+
+ TEvHttpOutgoingConnectionAvailable(const TActorId& connectionID, const TString& destination)
: ConnectionID(connectionID)
- , RecycledRequests(std::move(recycledRequests))
+ , Destination(destination)
{}
};
diff --git a/ydb/library/actors/http/http_proxy_acceptor.cpp b/ydb/library/actors/http/http_proxy_acceptor.cpp
index a397d1b9bc4..3b69eafa217 100644
--- a/ydb/library/actors/http/http_proxy_acceptor.cpp
+++ b/ydb/library/actors/http/http_proxy_acceptor.cpp
@@ -30,7 +30,7 @@ protected:
switch (ev->GetTypeRewrite()) {
HFunc(NActors::TEvPollerRegisterResult, Handle);
HFunc(NActors::TEvPollerReady, Handle);
- HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle);
+ HFunc(TEvHttpProxy::TEvHttpIncomingConnectionClosed, Handle);
HFunc(TEvHttpProxy::TEvReportSensors, Handle);
}
}
@@ -143,7 +143,7 @@ protected:
}
}
- void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
+ void Handle(TEvHttpProxy::TEvHttpIncomingConnectionClosed::TPtr event, const NActors::TActorContext&) {
Connections.erase(event->Get()->ConnectionID);
for (auto& req : event->Get()->RecycledRequests) {
if (RecycledRequests.size() >= MaxRecycledRequestsCount) {
diff --git a/ydb/library/actors/http/http_proxy_incoming.cpp b/ydb/library/actors/http/http_proxy_incoming.cpp
index 1205dcecefe..a01b97e4aa1 100644
--- a/ydb/library/actors/http/http_proxy_incoming.cpp
+++ b/ydb/library/actors/http/http_proxy_incoming.cpp
@@ -62,7 +62,7 @@ public:
}
void Die(const TActorContext& ctx) override {
- ctx.Send(Endpoint->Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests)));
+ ctx.Send(Endpoint->Owner, new TEvHttpProxy::TEvHttpIncomingConnectionClosed(ctx.SelfID, std::move(RecycledRequests)));
TSocketImpl::Shutdown();
TBase::Die(ctx);
}
@@ -218,7 +218,7 @@ protected:
THttpIncomingRequestPtr request = response->GetRequest();
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- ("
<< response->Status << " " << response->Message << (response->IsDone() ? ")" : ") (incomplete)"));
- if (!response->Status.StartsWith('2') && response->Status != "404") {
+ if (!response->Status.StartsWith('2') && !response->Status.StartsWith('3') && response->Status != "404") {
static constexpr size_t MAX_LOGGED_SIZE = 1024;
LOG_DEBUG_S(ctx, HttpLog,
"(#"
diff --git a/ydb/library/actors/http/http_proxy_outgoing.cpp b/ydb/library/actors/http/http_proxy_outgoing.cpp
index b1f27c2c5a1..08f77ba4dd9 100644
--- a/ydb/library/actors/http/http_proxy_outgoing.cpp
+++ b/ydb/library/actors/http/http_proxy_outgoing.cpp
@@ -11,11 +11,13 @@ public:
const TActorId Owner;
const TActorId Poller;
SocketAddressType Address;
+ TString Destination;
TActorId RequestOwner;
THttpOutgoingRequestPtr Request;
THttpIncomingResponsePtr Response;
TInstant LastActivity;
TDuration ConnectionTimeout = CONNECTION_TIMEOUT;
+ bool AllowConnectionReuse = false;
NActors::TPollerToken::TPtr PollerToken;
TOutgoingConnectionActor(const TActorId& owner, const TActorId& poller)
@@ -28,7 +30,7 @@ public:
static constexpr char ActorName[] = "OUT_CONNECTION_ACTOR";
void Die(const NActors::TActorContext& ctx) override {
- ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID));
+ ctx.Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionClosed(ctx.SelfID, Destination));
TSocketImpl::Shutdown(); // to avoid errors when connection already closed
TBase::Die(ctx);
}
@@ -51,8 +53,13 @@ public:
RequestOwner = TActorId();
THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response));
ctx.Send(Owner, sensors.Release());
- LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "connection closed");
- Die(ctx);
+ if (!AllowConnectionReuse || Response->IsConnectionClose()) {
+ LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "connection closed");
+ Die(ctx);
+ } else {
+ LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "connection available for reuse");
+ ctx.Send(Owner, new TEvHttpProxy::TEvHttpOutgoingConnectionAvailable(ctx.SelfID, Destination));
+ }
}
void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) {
@@ -242,6 +249,7 @@ protected:
void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
LastActivity = ctx.Now();
Request = std::move(event->Get()->Request);
+ Destination = Request->GetDestination();
TSocketImpl::SetHost(TString(Request->Host));
LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "resolving " << TSocketImpl::Host);
Request->Timer.Reset();
@@ -250,11 +258,29 @@ protected:
if (event->Get()->Timeout) {
ConnectionTimeout = event->Get()->Timeout;
}
+ AllowConnectionReuse = event->Get()->AllowConnectionReuse;
ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
LastActivity = ctx.Now();
TBase::Become(&TOutgoingConnectionActor::StateResolving);
}
+ void HandleConnected(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
+ LastActivity = ctx.Now();
+ Request = std::move(event->Get()->Request);
+ Request->Timer.Reset();
+ Response = nullptr;
+ RequestOwner = event->Sender;
+ if (event->Get()->Timeout) {
+ ConnectionTimeout = event->Get()->Timeout;
+ }
+ AllowConnectionReuse = event->Get()->AllowConnectionReuse;
+ ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup());
+ LastActivity = ctx.Now();
+ LOG_DEBUG_S(ctx, HttpLog, GetSocketName() << "<- (" << Request->Method << " " << Request->URL << ")");
+ FlushOutput(ctx);
+ PullInput(ctx);
+ }
+
void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) {
LastActivity = ctx.Now();
if (event->Get()->Write && RequestOwner) {
@@ -314,6 +340,7 @@ protected:
HFunc(NActors::TEvPollerReady, HandleConnected);
CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
HFunc(NActors::TEvPollerRegisterResult, HandleConnected);
+ HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleConnected);
}
}
diff --git a/ydb/library/actors/http/http_ut.cpp b/ydb/library/actors/http/http_ut.cpp
index d852e051237..2af61480ed6 100644
--- a/ydb/library/actors/http/http_ut.cpp
+++ b/ydb/library/actors/http/http_ut.cpp
@@ -701,4 +701,52 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
UNIT_ASSERT_EQUAL(response->Response->Status, "200");
UNIT_ASSERT_EQUAL(response->Response->Body, "passed");
}
+
+ Y_UNIT_TEST(RequestAfter307) {
+ NActors::TTestActorRuntimeBase actorSystem;
+ TPortManager portManager;
+ TIpPort port = portManager.GetTcpPort();
+ TAutoPtr<NActors::IEventHandle> handle;
+ actorSystem.Initialize();
+ actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_TRACE);
+
+ NActors::IActor* proxy = NHttp::CreateHttpProxy();
+ NActors::TActorId proxyId = actorSystem.Register(proxy);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, actorSystem.AllocateEdgeActor(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
+ actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvConfirmListen>(handle);
+
+ NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test1", serverId)), 0, true);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test2", serverId)), 0, true);
+
+ NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
+ NHttp::THttpOutgoingRequestPtr httpRequest1 = NHttp::THttpOutgoingRequest::CreateRequestGet("http://127.0.0.1:" + ToString(port) + "/test1");
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest1, true)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request1 = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
+
+ UNIT_ASSERT_EQUAL(request1->Request->URL, "/test1");
+
+ NHttp::THttpOutgoingResponsePtr httpResponse1 = request1->Request->CreateResponseString("HTTP/1.1 307 Temporary Redirect\r\nLocation: /test2\r\n\r\n");
+ actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse1)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response1 = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
+
+ UNIT_ASSERT_EQUAL(response1->Response->Status, "307");
+
+ auto location = NHttp::THeaders(response1->Response->Headers)["Location"];
+ NHttp::THttpOutgoingRequestPtr httpRequest2 = NHttp::THttpOutgoingRequest::CreateRequestGet("http://127.0.0.1:" + ToString(port) + location);
+ actorSystem.Send(new NActors::IEventHandle(proxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest2, true)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request2 = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
+
+ UNIT_ASSERT_EQUAL(request2->Request->URL, "/test2");
+
+ NHttp::THttpOutgoingResponsePtr httpResponse2 = request2->Request->CreateResponseString("HTTP/1.1 200 Ok\r\nContent-Length: 0\r\n\r\n");
+ actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse2)), 0, true);
+
+ NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response2 = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
+
+ UNIT_ASSERT_EQUAL(response2->Response->Status, "200");
+ }
}