aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/http/http_proxy_outgoing.cpp
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-02-10 16:47:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:39 +0300
commitf3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch)
tree25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/http/http_proxy_outgoing.cpp
parentfccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff)
downloadydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/http/http_proxy_outgoing.cpp')
-rw-r--r--library/cpp/actors/http/http_proxy_outgoing.cpp126
1 files changed, 63 insertions, 63 deletions
diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp
index d9189dba8a..e2a8c2c433 100644
--- a/library/cpp/actors/http/http_proxy_outgoing.cpp
+++ b/library/cpp/actors/http/http_proxy_outgoing.cpp
@@ -17,7 +17,7 @@ public:
THttpIncomingResponsePtr Response;
TInstant LastActivity;
TDuration ConnectionTimeout = CONNECTION_TIMEOUT;
- NActors::TPollerToken::TPtr PollerToken;
+ NActors::TPollerToken::TPtr PollerToken;
TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller)
: TBase(&TSelf::StateWaiting)
@@ -82,90 +82,90 @@ protected:
void FlushOutput(const NActors::TActorContext& ctx) {
if (Request != nullptr) {
Request->Finish();
- while (auto size = Request->Size()) {
- bool read = false, write = false;
- ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write);
+ while (auto size = Request->Size()) {
+ bool read = false, write = false;
+ ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write);
if (res > 0) {
Request->ChopHead(res);
- } else if (-res == EINTR) {
- continue;
- } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
- if (PollerToken) {
- if (!read && !write) {
- write = true;
- }
- PollerToken->Request(read, write);
- }
- break;
+ } else if (-res == EINTR) {
+ continue;
+ } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
+ if (PollerToken) {
+ if (!read && !write) {
+ write = true;
+ }
+ PollerToken->Request(read, write);
+ }
+ break;
} else {
- if (!res) {
+ if (!res) {
ReplyAndDie(ctx);
- } else {
+ } else {
ReplyErrorAndDie(ctx, strerror(-res));
}
- break;
+ break;
}
}
}
}
void PullInput(const NActors::TActorContext& ctx) {
- for (;;) {
+ for (;;) {
if (Response == nullptr) {
Response = new THttpIncomingResponse(Request);
}
if (!Response->EnsureEnoughSpaceAvailable()) {
return ReplyErrorAndDie(ctx, "Not enough space in socket buffer");
}
- bool read = false, write = false;
- ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write);
+ bool read = false, write = false;
+ ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write);
if (res > 0) {
Response->Advance(res);
- if (Response->IsDone() && Response->IsReady()) {
- return ReplyAndDie(ctx);
- }
- } else if (-res == EINTR) {
- continue;
- } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
- if (PollerToken) {
- if (!read && !write) {
- read = true;
+ if (Response->IsDone() && Response->IsReady()) {
+ return ReplyAndDie(ctx);
+ }
+ } else if (-res == EINTR) {
+ continue;
+ } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
+ if (PollerToken) {
+ if (!read && !write) {
+ read = true;
}
- PollerToken->Request(read, write);
+ PollerToken->Request(read, write);
}
- return;
+ return;
} else {
- if (!res) {
+ if (!res) {
Response->ConnectionClosed();
}
- if (Response->IsDone() && Response->IsReady()) {
- return ReplyAndDie(ctx);
- }
- return ReplyErrorAndDie(ctx, strerror(-res));
+ if (Response->IsDone() && Response->IsReady()) {
+ return ReplyAndDie(ctx);
+ }
+ return ReplyErrorAndDie(ctx, strerror(-res));
}
- }
+ }
}
void RegisterPoller(const NActors::TActorContext& ctx) {
- ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID));
+ ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID));
}
void OnConnect(const NActors::TActorContext& ctx) {
- bool read = false, write = false;
- if (int res = TSocketImpl::OnConnect(read, write); res != 1) {
- if (-res == EAGAIN) {
- if (PollerToken) {
- PollerToken->Request(read, write);
- }
+ bool read = false, write = false;
+ if (int res = TSocketImpl::OnConnect(read, write); res != 1) {
+ if (-res == EAGAIN) {
+ if (PollerToken) {
+ PollerToken->Request(read, write);
+ }
return;
- } else {
- return ReplyErrorAndDie(ctx, strerror(-res));
+ } else {
+ return ReplyErrorAndDie(ctx, strerror(-res));
}
}
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened");
TBase::Become(&TOutgoingConnectionActor::StateConnected);
LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")");
- ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true));
+ ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true));
}
void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) {
@@ -180,7 +180,7 @@ protected:
Connect(ctx);
}
- void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
+ void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) {
LastActivity = ctx.Now();
int res = TSocketImpl::GetError();
if (res == 0) {
@@ -190,8 +190,8 @@ protected:
}
}
- void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
- PollerToken = std::move(ev->Get()->PollerToken);
+ void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
+ PollerToken = std::move(ev->Get()->PollerToken);
LastActivity = ctx.Now();
int res = TSocketImpl::GetError();
if (res == 0) {
@@ -218,20 +218,20 @@ protected:
TBase::Become(&TOutgoingConnectionActor::StateResolving);
}
- void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) {
+ void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) {
LastActivity = ctx.Now();
- if (event->Get()->Read) {
- PullInput(ctx);
- }
- if (event->Get()->Write) {
- FlushOutput(ctx);
+ if (event->Get()->Read) {
+ PullInput(ctx);
}
+ if (event->Get()->Write) {
+ FlushOutput(ctx);
+ }
}
- void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
- PollerToken = std::move(ev->Get()->PollerToken);
+ void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) {
+ PollerToken = std::move(ev->Get()->PollerToken);
LastActivity = ctx.Now();
- PullInput(ctx);
+ PullInput(ctx);
FlushOutput(ctx);
}
@@ -266,17 +266,17 @@ protected:
STFUNC(StateConnecting) {
switch (ev->GetTypeRewrite()) {
- HFunc(NActors::TEvPollerReady, HandleConnecting);
+ HFunc(NActors::TEvPollerReady, HandleConnecting);
CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
- HFunc(NActors::TEvPollerRegisterResult, HandleConnecting);
+ HFunc(NActors::TEvPollerRegisterResult, HandleConnecting);
}
}
STFUNC(StateConnected) {
switch (ev->GetTypeRewrite()) {
- HFunc(NActors::TEvPollerReady, HandleConnected);
+ HFunc(NActors::TEvPollerReady, HandleConnected);
CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout);
- HFunc(NActors::TEvPollerRegisterResult, HandleConnected);
+ HFunc(NActors::TEvPollerRegisterResult, HandleConnected);
}
}