diff options
author | Alexey Efimov <xeno@prnwatch.com> | 2022-02-10 16:49:41 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:41 +0300 |
commit | 26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946 (patch) | |
tree | d34555f21d4d9f94f84d460e55b77d7eb41a953c /library/cpp/actors/http/http_proxy_outgoing.cpp | |
parent | ca3252a147a429eac4ba8221857493c58dcd09b5 (diff) | |
download | ydb-26e0e4fb5e5cd6b4d7f4c21f9fcd7978891bf946.tar.gz |
Restoring authorship annotation for Alexey Efimov <xeno@prnwatch.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/http/http_proxy_outgoing.cpp')
-rw-r--r-- | library/cpp/actors/http/http_proxy_outgoing.cpp | 450 |
1 files changed, 225 insertions, 225 deletions
diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp index d9189dba8a..5b3d07c614 100644 --- a/library/cpp/actors/http/http_proxy_outgoing.cpp +++ b/library/cpp/actors/http/http_proxy_outgoing.cpp @@ -1,92 +1,92 @@ -#include "http_proxy.h" -#include "http_proxy_sock_impl.h" - -namespace NHttp { - -template <typename TSocketImpl> -class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig { -public: - using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>; - using TSelf = TOutgoingConnectionActor<TSocketImpl>; +#include "http_proxy.h" +#include "http_proxy_sock_impl.h" + +namespace NHttp { + +template <typename TSocketImpl> +class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig { +public: + using TBase = NActors::TActor<TOutgoingConnectionActor<TSocketImpl>>; + using TSelf = TOutgoingConnectionActor<TSocketImpl>; const TActorId Owner; const TActorId Poller; - SocketAddressType Address; - TString Host; + SocketAddressType Address; + TString Host; TActorId RequestOwner; - THttpOutgoingRequestPtr Request; - THttpIncomingResponsePtr Response; - TInstant LastActivity; - TDuration ConnectionTimeout = CONNECTION_TIMEOUT; + THttpOutgoingRequestPtr Request; + THttpIncomingResponsePtr Response; + TInstant LastActivity; + TDuration ConnectionTimeout = CONNECTION_TIMEOUT; NActors::TPollerToken::TPtr PollerToken; - + TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller) - : TBase(&TSelf::StateWaiting) - , Owner(owner) - , Poller(poller) - , Host(host) - { - TSocketImpl::SetNonBlock(); - TSocketImpl::SetTimeout(SOCKET_TIMEOUT); - } - + : TBase(&TSelf::StateWaiting) + , Owner(owner) + , Poller(poller) + , Host(host) + { + TSocketImpl::SetNonBlock(); + TSocketImpl::SetTimeout(SOCKET_TIMEOUT); + } + void Die(const NActors::TActorContext& ctx) override { - ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID)); - TSocketImpl::Shutdown(); // to avoid errors when connection already closed - TBase::Die(ctx); - } - - void ReplyAndDie(const NActors::TActorContext& ctx) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")"); - ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response)); + ctx.Send(Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID)); + TSocketImpl::Shutdown(); // to avoid errors when connection already closed + TBase::Die(ctx); + } + + void ReplyAndDie(const NActors::TActorContext& ctx) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << Response->Status << " " << Response->Message << ")"); + ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response)); RequestOwner = TActorId(); THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response)); - ctx.Send(Owner, sensors.Release()); - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); - Die(ctx); - } - - void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) { - LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error); - if (RequestOwner) { - ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error)); + ctx.Send(Owner, sensors.Release()); + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); + Die(ctx); + } + + void ReplyErrorAndDie(const NActors::TActorContext& ctx, const TString& error) { + LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed with error: " << error); + if (RequestOwner) { + ctx.Send(RequestOwner, new TEvHttpProxy::TEvHttpIncomingResponse(Request, Response, error)); RequestOwner = TActorId(); THolder<TEvHttpProxy::TEvReportSensors> sensors(BuildOutgoingRequestSensors(Request, Response)); - ctx.Send(Owner, sensors.Release()); - Die(ctx); - } - } - -protected: - void FailConnection(const NActors::TActorContext& ctx, const TString& error) { - if (Request) { - return ReplyErrorAndDie(ctx, error); - } - return TBase::Become(&TOutgoingConnectionActor::StateFailed); - } - - void Connect(const NActors::TActorContext& ctx) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connecting"); - int res = TSocketImpl::Connect(Address); - RegisterPoller(ctx); - switch (-res) { - case 0: - return OnConnect(ctx); - case EINPROGRESS: - case EAGAIN: - return TBase::Become(&TOutgoingConnectionActor::StateConnecting); - default: - return ReplyErrorAndDie(ctx, strerror(-res)); - } - } - - void FlushOutput(const NActors::TActorContext& ctx) { - if (Request != nullptr) { - Request->Finish(); + ctx.Send(Owner, sensors.Release()); + Die(ctx); + } + } + +protected: + void FailConnection(const NActors::TActorContext& ctx, const TString& error) { + if (Request) { + return ReplyErrorAndDie(ctx, error); + } + return TBase::Become(&TOutgoingConnectionActor::StateFailed); + } + + void Connect(const NActors::TActorContext& ctx) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connecting"); + int res = TSocketImpl::Connect(Address); + RegisterPoller(ctx); + switch (-res) { + case 0: + return OnConnect(ctx); + case EINPROGRESS: + case EAGAIN: + return TBase::Become(&TOutgoingConnectionActor::StateConnecting); + default: + return ReplyErrorAndDie(ctx, strerror(-res)); + } + } + + void FlushOutput(const NActors::TActorContext& ctx) { + if (Request != nullptr) { + Request->Finish(); while (auto size = Request->Size()) { bool read = false, write = false; ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write); - if (res > 0) { - Request->ChopHead(res); + if (res > 0) { + Request->ChopHead(res); } else if (-res == EINTR) { continue; } else if (-res == EAGAIN || -res == EWOULDBLOCK) { @@ -97,30 +97,30 @@ protected: PollerToken->Request(read, write); } break; - } else { + } else { if (!res) { - ReplyAndDie(ctx); + ReplyAndDie(ctx); } else { - ReplyErrorAndDie(ctx, strerror(-res)); - } + ReplyErrorAndDie(ctx, strerror(-res)); + } break; - } - } - } - } - - void PullInput(const NActors::TActorContext& ctx) { + } + } + } + } + + void PullInput(const NActors::TActorContext& ctx) { for (;;) { - if (Response == nullptr) { - Response = new THttpIncomingResponse(Request); - } - if (!Response->EnsureEnoughSpaceAvailable()) { - return ReplyErrorAndDie(ctx, "Not enough space in socket buffer"); - } + if (Response == nullptr) { + Response = new THttpIncomingResponse(Request); + } + if (!Response->EnsureEnoughSpaceAvailable()) { + return ReplyErrorAndDie(ctx, "Not enough space in socket buffer"); + } bool read = false, write = false; ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write); - if (res > 0) { - Response->Advance(res); + if (res > 0) { + Response->Advance(res); if (Response->IsDone() && Response->IsReady()) { return ReplyAndDie(ctx); } @@ -130,169 +130,169 @@ protected: if (PollerToken) { if (!read && !write) { read = true; - } + } PollerToken->Request(read, write); - } + } return; - } else { + } else { if (!res) { - Response->ConnectionClosed(); - } + Response->ConnectionClosed(); + } if (Response->IsDone() && Response->IsReady()) { return ReplyAndDie(ctx); } return ReplyErrorAndDie(ctx, strerror(-res)); - } + } } - } - - void RegisterPoller(const NActors::TActorContext& ctx) { + } + + void RegisterPoller(const NActors::TActorContext& ctx) { ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID)); - } - - void OnConnect(const NActors::TActorContext& ctx) { + } + + void OnConnect(const NActors::TActorContext& ctx) { bool read = false, write = false; if (int res = TSocketImpl::OnConnect(read, write); res != 1) { if (-res == EAGAIN) { if (PollerToken) { PollerToken->Request(read, write); } - return; + return; } else { return ReplyErrorAndDie(ctx, strerror(-res)); - } - } - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened"); - TBase::Become(&TOutgoingConnectionActor::StateConnected); - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")"); + } + } + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened"); + TBase::Become(&TOutgoingConnectionActor::StateConnected); + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")"); ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true)); - } - - void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) { - LastActivity = ctx.Now(); - if (!event->Get()->Error.empty()) { - return FailConnection(ctx, event->Get()->Error); - } - Address = event->Get()->Address; - if (Address.GetPort() == 0) { - Address.SetPort(Request->Secure ? 443 : 80); - } - Connect(ctx); - } - + } + + void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) { + LastActivity = ctx.Now(); + if (!event->Get()->Error.empty()) { + return FailConnection(ctx, event->Get()->Error); + } + Address = event->Get()->Address; + if (Address.GetPort() == 0) { + Address.SetPort(Request->Secure ? 443 : 80); + } + Connect(ctx); + } + void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { - LastActivity = ctx.Now(); - int res = TSocketImpl::GetError(); - if (res == 0) { - OnConnect(ctx); - } else { - FailConnection(ctx, TStringBuilder() << strerror(res)); - } - } - + LastActivity = ctx.Now(); + int res = TSocketImpl::GetError(); + if (res == 0) { + OnConnect(ctx); + } else { + FailConnection(ctx, TStringBuilder() << strerror(res)); + } + } + void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { PollerToken = std::move(ev->Get()->PollerToken); - LastActivity = ctx.Now(); - int res = TSocketImpl::GetError(); - if (res == 0) { - OnConnect(ctx); - } else { - FailConnection(ctx, TStringBuilder() << strerror(res)); - } - } - - void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { - LastActivity = ctx.Now(); - Request = std::move(event->Get()->Request); - Host = Request->Host; - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << ") resolving " << Host); - Request->Timer.Reset(); - RequestOwner = event->Sender; - ctx.Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(Host)); - if (event->Get()->Timeout) { - ConnectionTimeout = event->Get()->Timeout; - TSocketImpl::SetTimeout(ConnectionTimeout); - } - ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup()); - LastActivity = ctx.Now(); - TBase::Become(&TOutgoingConnectionActor::StateResolving); - } - + LastActivity = ctx.Now(); + int res = TSocketImpl::GetError(); + if (res == 0) { + OnConnect(ctx); + } else { + FailConnection(ctx, TStringBuilder() << strerror(res)); + } + } + + void HandleWaiting(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + LastActivity = ctx.Now(); + Request = std::move(event->Get()->Request); + Host = Request->Host; + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << ") resolving " << Host); + Request->Timer.Reset(); + RequestOwner = event->Sender; + ctx.Send(Owner, new TEvHttpProxy::TEvResolveHostRequest(Host)); + if (event->Get()->Timeout) { + ConnectionTimeout = event->Get()->Timeout; + TSocketImpl::SetTimeout(ConnectionTimeout); + } + ctx.Schedule(ConnectionTimeout, new NActors::TEvents::TEvWakeup()); + LastActivity = ctx.Now(); + TBase::Become(&TOutgoingConnectionActor::StateResolving); + } + void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) { - LastActivity = ctx.Now(); + LastActivity = ctx.Now(); if (event->Get()->Read) { PullInput(ctx); - } + } if (event->Get()->Write) { FlushOutput(ctx); } - } - + } + void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { PollerToken = std::move(ev->Get()->PollerToken); - LastActivity = ctx.Now(); + LastActivity = ctx.Now(); PullInput(ctx); - FlushOutput(ctx); - } - - void HandleFailed(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { - Request = std::move(event->Get()->Request); - RequestOwner = event->Sender; - ReplyErrorAndDie(ctx, "Failed"); - } - - void HandleTimeout(const NActors::TActorContext& ctx) { - TDuration inactivityTime = ctx.Now() - LastActivity; - if (inactivityTime >= ConnectionTimeout) { - FailConnection(ctx, "Connection timed out"); - } else { - ctx.Schedule(Min(ConnectionTimeout - inactivityTime, TDuration::MilliSeconds(100)), new NActors::TEvents::TEvWakeup()); - } - } - - STFUNC(StateWaiting) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting); - CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); - } - } - - STFUNC(StateResolving) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving); - CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); - } - } - - STFUNC(StateConnecting) { - switch (ev->GetTypeRewrite()) { + FlushOutput(ctx); + } + + void HandleFailed(TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) { + Request = std::move(event->Get()->Request); + RequestOwner = event->Sender; + ReplyErrorAndDie(ctx, "Failed"); + } + + void HandleTimeout(const NActors::TActorContext& ctx) { + TDuration inactivityTime = ctx.Now() - LastActivity; + if (inactivityTime >= ConnectionTimeout) { + FailConnection(ctx, "Connection timed out"); + } else { + ctx.Schedule(Min(ConnectionTimeout - inactivityTime, TDuration::MilliSeconds(100)), new NActors::TEvents::TEvWakeup()); + } + } + + STFUNC(StateWaiting) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleWaiting); + CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); + } + } + + STFUNC(StateResolving) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvResolveHostResponse, HandleResolving); + CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); + } + } + + STFUNC(StateConnecting) { + switch (ev->GetTypeRewrite()) { HFunc(NActors::TEvPollerReady, HandleConnecting); - CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); + CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); HFunc(NActors::TEvPollerRegisterResult, HandleConnecting); - } - } - - STFUNC(StateConnected) { - switch (ev->GetTypeRewrite()) { + } + } + + STFUNC(StateConnected) { + switch (ev->GetTypeRewrite()) { HFunc(NActors::TEvPollerReady, HandleConnected); - CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); + CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); HFunc(NActors::TEvPollerRegisterResult, HandleConnected); - } - } - - STFUNC(StateFailed) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleFailed); - } - } -}; - + } + } + + STFUNC(StateFailed) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvHttpProxy::TEvHttpOutgoingRequest, HandleFailed); + } + } +}; + NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, const TString& host, bool secure, const TActorId& poller) { - if (secure) { - return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller); - } else { - return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, host, poller); - } -} - -} + if (secure) { + return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, host, poller); + } else { + return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, host, poller); + } +} + +} |