diff options
author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-02-10 16:47:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:39 +0300 |
commit | f3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch) | |
tree | 25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/http/http_proxy_outgoing.cpp | |
parent | fccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff) | |
download | ydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz |
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/http/http_proxy_outgoing.cpp')
-rw-r--r-- | library/cpp/actors/http/http_proxy_outgoing.cpp | 126 |
1 files changed, 63 insertions, 63 deletions
diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp index d9189dba8a..e2a8c2c433 100644 --- a/library/cpp/actors/http/http_proxy_outgoing.cpp +++ b/library/cpp/actors/http/http_proxy_outgoing.cpp @@ -17,7 +17,7 @@ public: THttpIncomingResponsePtr Response; TInstant LastActivity; TDuration ConnectionTimeout = CONNECTION_TIMEOUT; - NActors::TPollerToken::TPtr PollerToken; + NActors::TPollerToken::TPtr PollerToken; TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller) : TBase(&TSelf::StateWaiting) @@ -82,90 +82,90 @@ protected: void FlushOutput(const NActors::TActorContext& ctx) { if (Request != nullptr) { Request->Finish(); - while (auto size = Request->Size()) { - bool read = false, write = false; - ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write); + while (auto size = Request->Size()) { + bool read = false, write = false; + ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write); if (res > 0) { Request->ChopHead(res); - } else if (-res == EINTR) { - continue; - } else if (-res == EAGAIN || -res == EWOULDBLOCK) { - if (PollerToken) { - if (!read && !write) { - write = true; - } - PollerToken->Request(read, write); - } - break; + } else if (-res == EINTR) { + continue; + } else if (-res == EAGAIN || -res == EWOULDBLOCK) { + if (PollerToken) { + if (!read && !write) { + write = true; + } + PollerToken->Request(read, write); + } + break; } else { - if (!res) { + if (!res) { ReplyAndDie(ctx); - } else { + } else { ReplyErrorAndDie(ctx, strerror(-res)); } - break; + break; } } } } void PullInput(const NActors::TActorContext& ctx) { - for (;;) { + for (;;) { if (Response == nullptr) { Response = new THttpIncomingResponse(Request); } if (!Response->EnsureEnoughSpaceAvailable()) { return ReplyErrorAndDie(ctx, "Not enough space in socket buffer"); } - bool read = false, write = false; - ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write); + bool read = false, write = false; + ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write); if (res > 0) { Response->Advance(res); - if (Response->IsDone() && Response->IsReady()) { - return ReplyAndDie(ctx); - } - } else if (-res == EINTR) { - continue; - } else if (-res == EAGAIN || -res == EWOULDBLOCK) { - if (PollerToken) { - if (!read && !write) { - read = true; + if (Response->IsDone() && Response->IsReady()) { + return ReplyAndDie(ctx); + } + } else if (-res == EINTR) { + continue; + } else if (-res == EAGAIN || -res == EWOULDBLOCK) { + if (PollerToken) { + if (!read && !write) { + read = true; } - PollerToken->Request(read, write); + PollerToken->Request(read, write); } - return; + return; } else { - if (!res) { + if (!res) { Response->ConnectionClosed(); } - if (Response->IsDone() && Response->IsReady()) { - return ReplyAndDie(ctx); - } - return ReplyErrorAndDie(ctx, strerror(-res)); + if (Response->IsDone() && Response->IsReady()) { + return ReplyAndDie(ctx); + } + return ReplyErrorAndDie(ctx, strerror(-res)); } - } + } } void RegisterPoller(const NActors::TActorContext& ctx) { - ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID)); + ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID)); } void OnConnect(const NActors::TActorContext& ctx) { - bool read = false, write = false; - if (int res = TSocketImpl::OnConnect(read, write); res != 1) { - if (-res == EAGAIN) { - if (PollerToken) { - PollerToken->Request(read, write); - } + bool read = false, write = false; + if (int res = TSocketImpl::OnConnect(read, write); res != 1) { + if (-res == EAGAIN) { + if (PollerToken) { + PollerToken->Request(read, write); + } return; - } else { - return ReplyErrorAndDie(ctx, strerror(-res)); + } else { + return ReplyErrorAndDie(ctx, strerror(-res)); } } LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened"); TBase::Become(&TOutgoingConnectionActor::StateConnected); LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")"); - ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true)); + ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true)); } void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) { @@ -180,7 +180,7 @@ protected: Connect(ctx); } - void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { + void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { LastActivity = ctx.Now(); int res = TSocketImpl::GetError(); if (res == 0) { @@ -190,8 +190,8 @@ protected: } } - void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { - PollerToken = std::move(ev->Get()->PollerToken); + void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { + PollerToken = std::move(ev->Get()->PollerToken); LastActivity = ctx.Now(); int res = TSocketImpl::GetError(); if (res == 0) { @@ -218,20 +218,20 @@ protected: TBase::Become(&TOutgoingConnectionActor::StateResolving); } - void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) { + void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) { LastActivity = ctx.Now(); - if (event->Get()->Read) { - PullInput(ctx); - } - if (event->Get()->Write) { - FlushOutput(ctx); + if (event->Get()->Read) { + PullInput(ctx); } + if (event->Get()->Write) { + FlushOutput(ctx); + } } - void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { - PollerToken = std::move(ev->Get()->PollerToken); + void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { + PollerToken = std::move(ev->Get()->PollerToken); LastActivity = ctx.Now(); - PullInput(ctx); + PullInput(ctx); FlushOutput(ctx); } @@ -266,17 +266,17 @@ protected: STFUNC(StateConnecting) { switch (ev->GetTypeRewrite()) { - HFunc(NActors::TEvPollerReady, HandleConnecting); + HFunc(NActors::TEvPollerReady, HandleConnecting); CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); - HFunc(NActors::TEvPollerRegisterResult, HandleConnecting); + HFunc(NActors::TEvPollerRegisterResult, HandleConnecting); } } STFUNC(StateConnected) { switch (ev->GetTypeRewrite()) { - HFunc(NActors::TEvPollerReady, HandleConnected); + HFunc(NActors::TEvPollerReady, HandleConnected); CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); - HFunc(NActors::TEvPollerRegisterResult, HandleConnected); + HFunc(NActors::TEvPollerRegisterResult, HandleConnected); } } |