diff options
author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-02-10 16:47:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:39 +0300 |
commit | f3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch) | |
tree | 25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/http | |
parent | fccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff) | |
download | ydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz |
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/http')
-rw-r--r-- | library/cpp/actors/http/http.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_acceptor.cpp | 34 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_incoming.cpp | 218 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_outgoing.cpp | 126 | ||||
-rw-r--r-- | library/cpp/actors/http/http_proxy_sock_impl.h | 48 |
5 files changed, 214 insertions, 214 deletions
diff --git a/library/cpp/actors/http/http.h b/library/cpp/actors/http/http.h index 96c5c1ec48..311c817556 100644 --- a/library/cpp/actors/http/http.h +++ b/library/cpp/actors/http/http.h @@ -62,7 +62,7 @@ struct TCookies { }; struct TCookiesBuilder : TCookies { - TDeque<std::pair<TString, TString>> Data; + TDeque<std::pair<TString, TString>> Data; TCookiesBuilder(); void Set(TStringBuf name, TStringBuf data); diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp index 9780541b71..e94073c418 100644 --- a/library/cpp/actors/http/http_proxy_acceptor.cpp +++ b/library/cpp/actors/http/http_proxy_acceptor.cpp @@ -10,7 +10,7 @@ public: const TActorId Owner; const TActorId Poller; TIntrusivePtr<TSocketDescriptor> Socket; - NActors::TPollerToken::TPtr PollerToken; + NActors::TPollerToken::TPtr PollerToken; THashSet<TActorId> Connections; TDeque<THttpIncomingRequestPtr> RecycledRequests; TEndpointInfo Endpoint; @@ -31,8 +31,8 @@ public: protected: STFUNC(StateListening) { switch (ev->GetTypeRewrite()) { - HFunc(NActors::TEvPollerRegisterResult, Handle); - HFunc(NActors::TEvPollerReady, Handle); + HFunc(NActors::TEvPollerRegisterResult, Handle); + HFunc(NActors::TEvPollerReady, Handle); HFunc(TEvHttpProxy::TEvHttpConnectionClosed, Handle); HFunc(TEvHttpProxy::TEvReportSensors, Handle); } @@ -70,7 +70,7 @@ protected: if (err == 0) { LOG_INFO_S(ctx, HttpLog, "Listening on " << bindAddress.ToString()); SetNonBlock(Socket->Socket); - ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId())); + ctx.Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId())); TBase::Become(&TAcceptorActor::StateListening); ctx.Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress), 0, event->Cookie); return; @@ -87,16 +87,16 @@ protected: } } - void Handle(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& /*ctx*/) { - PollerToken = std::move(ev->Get()->PollerToken); - PollerToken->Request(true, false); // request read polling - } - - void Handle(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { + void Handle(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& /*ctx*/) { + PollerToken = std::move(ev->Get()->PollerToken); + PollerToken->Request(true, false); // request read polling + } + + void Handle(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor(); SocketAddressType addr; - int err; - while ((err = Socket->Socket.Accept(&socket->Socket, &addr)) == 0) { + int err; + while ((err = Socket->Socket.Accept(&socket->Socket, &addr)) == 0) { NActors::IActor* connectionSocket = nullptr; if (RecycledRequests.empty()) { connectionSocket = CreateIncomingConnectionActor(Endpoint, socket, addr); @@ -105,14 +105,14 @@ protected: RecycledRequests.pop_front(); } NActors::TActorId connectionId = ctx.Register(connectionSocket); - ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId)); + ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId)); Connections.emplace(connectionId); socket = new TSocketDescriptor(); } - if (err == -EAGAIN || err == -EWOULDBLOCK) { // request poller for further connection polling - Y_VERIFY(PollerToken); - PollerToken->Request(true, false); - } + if (err == -EAGAIN || err == -EWOULDBLOCK) { // request poller for further connection polling + Y_VERIFY(PollerToken); + PollerToken->Request(true, false); + } } void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp index 80fe2af53d..7aca73a8ad 100644 --- a/library/cpp/actors/http/http_proxy_incoming.cpp +++ b/library/cpp/actors/http/http_proxy_incoming.cpp @@ -3,8 +3,8 @@ namespace NHttp { -using namespace NActors; - +using namespace NActors; + template <typename TSocketImpl> class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketImpl>>, public TSocketImpl, virtual public THttpConfig { public: @@ -19,12 +19,12 @@ public: THttpOutgoingResponsePtr CurrentResponse; TDeque<THttpIncomingRequestPtr> RecycledRequests; - THPTimer InactivityTimer; + THPTimer InactivityTimer; static constexpr TDuration InactivityTimeout = TDuration::Minutes(2); - TEvPollerReady* InactivityEvent = nullptr; - - TPollerToken::TPtr PollerToken; - + TEvPollerReady* InactivityEvent = nullptr; + + TPollerToken::TPtr PollerToken; + TIncomingConnectionActor( const TEndpointInfo& endpoint, TIntrusivePtr<TSocketDescriptor> socket, @@ -57,9 +57,9 @@ public: } TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) override { - return new IEventHandle(self, parent, new TEvents::TEvBootstrap()); - } - + return new IEventHandle(self, parent, new TEvents::TEvBootstrap()); + } + void Die(const TActorContext& ctx) override { ctx.Send(Endpoint.Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests))); TSocketImpl::Shutdown(); @@ -67,117 +67,117 @@ public: } protected: - void Bootstrap(const TActorContext& ctx) { - InactivityTimer.Reset(); - ctx.Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); + void Bootstrap(const TActorContext& ctx) { + InactivityTimer.Reset(); + ctx.Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") incoming connection opened"); OnAccept(ctx); - } - + } + void OnAccept(const NActors::TActorContext& ctx) { - int res; - bool read = false, write = false; - if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) { - if (-res == EAGAIN) { - if (PollerToken) { - PollerToken->Request(read, write); - } - return; // wait for further notifications - } else { - LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Accept: " << strerror(-res)); - return Die(ctx); + int res; + bool read = false, write = false; + if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) { + if (-res == EAGAIN) { + if (PollerToken) { + PollerToken->Request(read, write); + } + return; // wait for further notifications + } else { + LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Accept: " << strerror(-res)); + return Die(ctx); } } TBase::Become(&TIncomingConnectionActor::StateConnected); - ctx.Send(ctx.SelfID, new TEvPollerReady(nullptr, true, true)); + ctx.Send(ctx.SelfID, new TEvPollerReady(nullptr, true, true)); } - void HandleAccepting(TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { - PollerToken = std::move(ev->Get()->PollerToken); + void HandleAccepting(TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { + PollerToken = std::move(ev->Get()->PollerToken); OnAccept(ctx); } - void HandleAccepting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { + void HandleAccepting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { OnAccept(ctx); } - void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) { - if (event->Get()->Read) { - for (;;) { - if (CurrentRequest == nullptr) { - if (RecycleRequests && !RecycledRequests.empty()) { - CurrentRequest = std::move(RecycledRequests.front()); - RecycledRequests.pop_front(); - } else { - CurrentRequest = new THttpIncomingRequest(); - } - CurrentRequest->Address = Address; - CurrentRequest->WorkerName = Endpoint.WorkerName; + void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) { + if (event->Get()->Read) { + for (;;) { + if (CurrentRequest == nullptr) { + if (RecycleRequests && !RecycledRequests.empty()) { + CurrentRequest = std::move(RecycledRequests.front()); + RecycledRequests.pop_front(); + } else { + CurrentRequest = new THttpIncomingRequest(); + } + CurrentRequest->Address = Address; + CurrentRequest->WorkerName = Endpoint.WorkerName; CurrentRequest->Secure = Endpoint.Secure; } - if (!CurrentRequest->EnsureEnoughSpaceAvailable()) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - not enough space available"); - return Die(ctx); - } - ssize_t need = CurrentRequest->Avail(); - bool read = false, write = false; - ssize_t res = TSocketImpl::Recv(CurrentRequest->Pos(), need, read, write); - if (res > 0) { - InactivityTimer.Reset(); - CurrentRequest->Advance(res); - if (CurrentRequest->IsDone()) { - Requests.emplace_back(CurrentRequest); - CurrentRequest->Timer.Reset(); - if (CurrentRequest->IsReady()) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")"); - ctx.Send(Endpoint.Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest)); - CurrentRequest = nullptr; - } else if (CurrentRequest->IsError()) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")"); + if (!CurrentRequest->EnsureEnoughSpaceAvailable()) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - not enough space available"); + return Die(ctx); + } + ssize_t need = CurrentRequest->Avail(); + bool read = false, write = false; + ssize_t res = TSocketImpl::Recv(CurrentRequest->Pos(), need, read, write); + if (res > 0) { + InactivityTimer.Reset(); + CurrentRequest->Advance(res); + if (CurrentRequest->IsDone()) { + Requests.emplace_back(CurrentRequest); + CurrentRequest->Timer.Reset(); + if (CurrentRequest->IsReady()) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")"); + ctx.Send(Endpoint.Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest)); + CurrentRequest = nullptr; + } else if (CurrentRequest->IsError()) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")"); bool success = Respond(CurrentRequest->CreateResponseBadRequest(), ctx); if (!success) { return; } - CurrentRequest = nullptr; - } - } - } else if (-res == EAGAIN || -res == EWOULDBLOCK) { - if (PollerToken) { - if (!read && !write) { - read = true; - } - PollerToken->Request(read, write); + CurrentRequest = nullptr; + } } + } else if (-res == EAGAIN || -res == EWOULDBLOCK) { + if (PollerToken) { + if (!read && !write) { + read = true; + } + PollerToken->Request(read, write); + } break; - } else if (-res == EINTR) { - continue; - } else if (!res) { - // connection closed + } else if (-res == EINTR) { + continue; + } else if (!res) { + // connection closed LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed"); return Die(ctx); - } else { + } else { LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Receive: " << strerror(-res)); return Die(ctx); } } - if (event->Get() == InactivityEvent) { - const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed())); - if (passed >= InactivityTimeout) { - LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed by inactivity timeout"); - return Die(ctx); // timeout - } else { - ctx.Schedule(InactivityTimeout - passed, InactivityEvent = new TEvPollerReady(nullptr, false, false)); - } - } - } - if (event->Get()->Write) { - FlushOutput(ctx); - } + if (event->Get() == InactivityEvent) { + const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed())); + if (passed >= InactivityTimeout) { + LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed by inactivity timeout"); + return Die(ctx); // timeout + } else { + ctx.Schedule(InactivityTimeout - passed, InactivityEvent = new TEvPollerReady(nullptr, false, false)); + } + } + } + if (event->Get()->Write) { + FlushOutput(ctx); + } } - void HandleConnected(TEvPollerRegisterResult::TPtr ev, const TActorContext& /*ctx*/) { - PollerToken = std::move(ev->Get()->PollerToken); - PollerToken->Request(true, true); + void HandleConnected(TEvPollerRegisterResult::TPtr ev, const TActorContext& /*ctx*/) { + PollerToken = std::move(ev->Get()->PollerToken); + PollerToken->Request(true, true); } void HandleConnected(TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const TActorContext& ctx) { @@ -246,23 +246,23 @@ protected: } } } - bool read = false, write = false; - ssize_t res = TSocketImpl::Send(CurrentResponse->Data(), size, read, write); + bool read = false, write = false; + ssize_t res = TSocketImpl::Send(CurrentResponse->Data(), size, read, write); if (res > 0) { CurrentResponse->ChopHead(res); - } else if (-res == EINTR) { - continue; - } else if (-res == EAGAIN || -res == EWOULDBLOCK) { - if (PollerToken) { - if (!read && !write) { - write = true; - } - PollerToken->Request(read, write); + } else if (-res == EINTR) { + continue; + } else if (-res == EAGAIN || -res == EWOULDBLOCK) { + if (PollerToken) { + if (!read && !write) { + write = true; + } + PollerToken->Request(read, write); } break; - } else { - CleanupResponse(CurrentResponse); - LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in FlushOutput: " << strerror(-res)); + } else { + CleanupResponse(CurrentResponse); + LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in FlushOutput: " << strerror(-res)); Die(ctx); return false; } @@ -272,17 +272,17 @@ protected: STFUNC(StateAccepting) { switch (ev->GetTypeRewrite()) { - CFunc(TEvents::TEvBootstrap::EventType, Bootstrap); - HFunc(TEvPollerReady, HandleAccepting); - HFunc(TEvPollerRegisterResult, HandleAccepting); + CFunc(TEvents::TEvBootstrap::EventType, Bootstrap); + HFunc(TEvPollerReady, HandleAccepting); + HFunc(TEvPollerRegisterResult, HandleAccepting); } } STFUNC(StateConnected) { switch (ev->GetTypeRewrite()) { - HFunc(TEvPollerReady, HandleConnected); + HFunc(TEvPollerReady, HandleConnected); HFunc(TEvHttpProxy::TEvHttpOutgoingResponse, HandleConnected); - HFunc(TEvPollerRegisterResult, HandleConnected); + HFunc(TEvPollerRegisterResult, HandleConnected); } } }; diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp index d9189dba8a..e2a8c2c433 100644 --- a/library/cpp/actors/http/http_proxy_outgoing.cpp +++ b/library/cpp/actors/http/http_proxy_outgoing.cpp @@ -17,7 +17,7 @@ public: THttpIncomingResponsePtr Response; TInstant LastActivity; TDuration ConnectionTimeout = CONNECTION_TIMEOUT; - NActors::TPollerToken::TPtr PollerToken; + NActors::TPollerToken::TPtr PollerToken; TOutgoingConnectionActor(const TActorId& owner, const TString& host, const TActorId& poller) : TBase(&TSelf::StateWaiting) @@ -82,90 +82,90 @@ protected: void FlushOutput(const NActors::TActorContext& ctx) { if (Request != nullptr) { Request->Finish(); - while (auto size = Request->Size()) { - bool read = false, write = false; - ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write); + while (auto size = Request->Size()) { + bool read = false, write = false; + ssize_t res = TSocketImpl::Send(Request->Data(), size, read, write); if (res > 0) { Request->ChopHead(res); - } else if (-res == EINTR) { - continue; - } else if (-res == EAGAIN || -res == EWOULDBLOCK) { - if (PollerToken) { - if (!read && !write) { - write = true; - } - PollerToken->Request(read, write); - } - break; + } else if (-res == EINTR) { + continue; + } else if (-res == EAGAIN || -res == EWOULDBLOCK) { + if (PollerToken) { + if (!read && !write) { + write = true; + } + PollerToken->Request(read, write); + } + break; } else { - if (!res) { + if (!res) { ReplyAndDie(ctx); - } else { + } else { ReplyErrorAndDie(ctx, strerror(-res)); } - break; + break; } } } } void PullInput(const NActors::TActorContext& ctx) { - for (;;) { + for (;;) { if (Response == nullptr) { Response = new THttpIncomingResponse(Request); } if (!Response->EnsureEnoughSpaceAvailable()) { return ReplyErrorAndDie(ctx, "Not enough space in socket buffer"); } - bool read = false, write = false; - ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write); + bool read = false, write = false; + ssize_t res = TSocketImpl::Recv(Response->Pos(), Response->Avail(), read, write); if (res > 0) { Response->Advance(res); - if (Response->IsDone() && Response->IsReady()) { - return ReplyAndDie(ctx); - } - } else if (-res == EINTR) { - continue; - } else if (-res == EAGAIN || -res == EWOULDBLOCK) { - if (PollerToken) { - if (!read && !write) { - read = true; + if (Response->IsDone() && Response->IsReady()) { + return ReplyAndDie(ctx); + } + } else if (-res == EINTR) { + continue; + } else if (-res == EAGAIN || -res == EWOULDBLOCK) { + if (PollerToken) { + if (!read && !write) { + read = true; } - PollerToken->Request(read, write); + PollerToken->Request(read, write); } - return; + return; } else { - if (!res) { + if (!res) { Response->ConnectionClosed(); } - if (Response->IsDone() && Response->IsReady()) { - return ReplyAndDie(ctx); - } - return ReplyErrorAndDie(ctx, strerror(-res)); + if (Response->IsDone() && Response->IsReady()) { + return ReplyAndDie(ctx); + } + return ReplyErrorAndDie(ctx, strerror(-res)); } - } + } } void RegisterPoller(const NActors::TActorContext& ctx) { - ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID)); + ctx.Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, ctx.SelfID, ctx.SelfID)); } void OnConnect(const NActors::TActorContext& ctx) { - bool read = false, write = false; - if (int res = TSocketImpl::OnConnect(read, write); res != 1) { - if (-res == EAGAIN) { - if (PollerToken) { - PollerToken->Request(read, write); - } + bool read = false, write = false; + if (int res = TSocketImpl::OnConnect(read, write); res != 1) { + if (-res == EAGAIN) { + if (PollerToken) { + PollerToken->Request(read, write); + } return; - } else { - return ReplyErrorAndDie(ctx, strerror(-res)); + } else { + return ReplyErrorAndDie(ctx, strerror(-res)); } } LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") outgoing connection opened"); TBase::Become(&TOutgoingConnectionActor::StateConnected); LOG_DEBUG_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << Request->Method << " " << Request->URL << ")"); - ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true)); + ctx.Send(ctx.SelfID, new NActors::TEvPollerReady(nullptr, true, true)); } void HandleResolving(TEvHttpProxy::TEvResolveHostResponse::TPtr event, const NActors::TActorContext& ctx) { @@ -180,7 +180,7 @@ protected: Connect(ctx); } - void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { + void HandleConnecting(NActors::TEvPollerReady::TPtr, const NActors::TActorContext& ctx) { LastActivity = ctx.Now(); int res = TSocketImpl::GetError(); if (res == 0) { @@ -190,8 +190,8 @@ protected: } } - void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { - PollerToken = std::move(ev->Get()->PollerToken); + void HandleConnecting(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { + PollerToken = std::move(ev->Get()->PollerToken); LastActivity = ctx.Now(); int res = TSocketImpl::GetError(); if (res == 0) { @@ -218,20 +218,20 @@ protected: TBase::Become(&TOutgoingConnectionActor::StateResolving); } - void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) { + void HandleConnected(NActors::TEvPollerReady::TPtr event, const NActors::TActorContext& ctx) { LastActivity = ctx.Now(); - if (event->Get()->Read) { - PullInput(ctx); - } - if (event->Get()->Write) { - FlushOutput(ctx); + if (event->Get()->Read) { + PullInput(ctx); } + if (event->Get()->Write) { + FlushOutput(ctx); + } } - void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { - PollerToken = std::move(ev->Get()->PollerToken); + void HandleConnected(NActors::TEvPollerRegisterResult::TPtr ev, const NActors::TActorContext& ctx) { + PollerToken = std::move(ev->Get()->PollerToken); LastActivity = ctx.Now(); - PullInput(ctx); + PullInput(ctx); FlushOutput(ctx); } @@ -266,17 +266,17 @@ protected: STFUNC(StateConnecting) { switch (ev->GetTypeRewrite()) { - HFunc(NActors::TEvPollerReady, HandleConnecting); + HFunc(NActors::TEvPollerReady, HandleConnecting); CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); - HFunc(NActors::TEvPollerRegisterResult, HandleConnecting); + HFunc(NActors::TEvPollerRegisterResult, HandleConnecting); } } STFUNC(StateConnected) { switch (ev->GetTypeRewrite()) { - HFunc(NActors::TEvPollerReady, HandleConnected); + HFunc(NActors::TEvPollerReady, HandleConnected); CFunc(NActors::TEvents::TEvWakeup::EventType, HandleTimeout); - HFunc(NActors::TEvPollerRegisterResult, HandleConnected); + HFunc(NActors::TEvPollerRegisterResult, HandleConnected); } } diff --git a/library/cpp/actors/http/http_proxy_sock_impl.h b/library/cpp/actors/http/http_proxy_sock_impl.h index bf8c71d05a..edef338d71 100644 --- a/library/cpp/actors/http/http_proxy_sock_impl.h +++ b/library/cpp/actors/http/http_proxy_sock_impl.h @@ -45,12 +45,12 @@ struct TPlainSocketImpl : virtual public THttpConfig { return Socket->Socket.Connect(&address); } - static constexpr int OnConnect(bool&, bool&) { - return 1; + static constexpr int OnConnect(bool&, bool&) { + return 1; } - static constexpr int OnAccept(const TEndpointInfo&, bool&, bool&) { - return 1; + static constexpr int OnAccept(const TEndpointInfo&, bool&, bool&) { + return 1; } bool IsGood() { @@ -65,11 +65,11 @@ struct TPlainSocketImpl : virtual public THttpConfig { return res; } - ssize_t Send(const void* data, size_t size, bool&, bool&) { + ssize_t Send(const void* data, size_t size, bool&, bool&) { return Socket->Socket.Send(data, size); } - ssize_t Recv(void* data, size_t size, bool&, bool&) { + ssize_t Recv(void* data, size_t size, bool&, bool&) { return Socket->Socket.Recv(data, size); } }; @@ -180,16 +180,16 @@ struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers { void Flush() {} - ssize_t Send(const void* data, size_t size, bool& read, bool& write) { + ssize_t Send(const void* data, size_t size, bool& read, bool& write) { ssize_t res = SSL_write(Ssl.Get(), data, size); if (res < 0) { res = SSL_get_error(Ssl.Get(), res); switch(res) { case SSL_ERROR_WANT_READ: - read = true; - return -EAGAIN; + read = true; + return -EAGAIN; case SSL_ERROR_WANT_WRITE: - write = true; + write = true; return -EAGAIN; default: return -EIO; @@ -198,16 +198,16 @@ struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers { return res; } - ssize_t Recv(void* data, size_t size, bool& read, bool& write) { + ssize_t Recv(void* data, size_t size, bool& read, bool& write) { ssize_t res = SSL_read(Ssl.Get(), data, size); if (res < 0) { res = SSL_get_error(Ssl.Get(), res); switch(res) { case SSL_ERROR_WANT_READ: - read = true; - return -EAGAIN; + read = true; + return -EAGAIN; case SSL_ERROR_WANT_WRITE: - write = true; + write = true; return -EAGAIN; default: return -EIO; @@ -216,19 +216,19 @@ struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers { return res; } - int OnConnect(bool& read, bool& write) { + int OnConnect(bool& read, bool& write) { if (!Ssl) { InitClientSsl(); } int res = SSL_connect(Ssl.Get()); - if (res <= 0) { + if (res <= 0) { res = SSL_get_error(Ssl.Get(), res); switch(res) { case SSL_ERROR_WANT_READ: - read = true; - return -EAGAIN; + read = true; + return -EAGAIN; case SSL_ERROR_WANT_WRITE: - write = true; + write = true; return -EAGAIN; default: return -EIO; @@ -237,19 +237,19 @@ struct TSecureSocketImpl : TPlainSocketImpl, TSslHelpers { return res; } - int OnAccept(const TEndpointInfo& endpoint, bool& read, bool& write) { + int OnAccept(const TEndpointInfo& endpoint, bool& read, bool& write) { if (!Ssl) { InitServerSsl(endpoint.SecureContext.Get()); } int res = SSL_accept(Ssl.Get()); - if (res <= 0) { + if (res <= 0) { res = SSL_get_error(Ssl.Get(), res); switch(res) { case SSL_ERROR_WANT_READ: - read = true; - return -EAGAIN; + read = true; + return -EAGAIN; case SSL_ERROR_WANT_WRITE: - write = true; + write = true; return -EAGAIN; default: return -EIO; |