aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/acceptor.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/acceptor.cpp
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/acceptor.cpp')
-rw-r--r--library/cpp/messagebus/acceptor.cpp212
1 files changed, 106 insertions, 106 deletions
diff --git a/library/cpp/messagebus/acceptor.cpp b/library/cpp/messagebus/acceptor.cpp
index d54be57fd9..64a38619c2 100644
--- a/library/cpp/messagebus/acceptor.cpp
+++ b/library/cpp/messagebus/acceptor.cpp
@@ -1,127 +1,127 @@
#include "acceptor.h"
-
-#include "key_value_printer.h"
-#include "mb_lwtrace.h"
+
+#include "key_value_printer.h"
+#include "mb_lwtrace.h"
#include "network.h"
-
+
#include <util/network/init.h>
#include <util/system/defaults.h>
#include <util/system/error.h>
#include <util/system/yassert.h>
-
-LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
-
-using namespace NActor;
-using namespace NBus;
-using namespace NBus::NPrivate;
-
-TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
- : TActor<TAcceptor>(session->Queue->WorkQueue.Get())
- , AcceptorId(acceptorId)
- , Session(session)
+
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
+ : TActor<TAcceptor>(session->Queue->WorkQueue.Get())
+ , AcceptorId(acceptorId)
+ , Session(session)
, GranStatus(session->Config.Secret.StatusFlushPeriod)
-{
- SetNonBlock(socket, true);
-
- Channel = Session->ReadEventLoop.Register(socket, this);
- Channel->EnableRead();
-
- Stats.AcceptorId = acceptorId;
- Stats.Fd = socket;
- Stats.ListenAddr = addr;
-
+{
+ SetNonBlock(socket, true);
+
+ Channel = Session->ReadEventLoop.Register(socket, this);
+ Channel->EnableRead();
+
+ Stats.AcceptorId = acceptorId;
+ Stats.Fd = socket;
+ Stats.ListenAddr = addr;
+
SendStatus(TInstant::Now());
-}
-
-void TAcceptor::Act(TDefaultTag) {
- EShutdownState state = ShutdownState.State.Get();
-
- if (state == SS_SHUTDOWN_COMPLETE) {
- return;
- }
-
+}
+
+void TAcceptor::Act(TDefaultTag) {
+ EShutdownState state = ShutdownState.State.Get();
+
+ if (state == SS_SHUTDOWN_COMPLETE) {
+ return;
+ }
+
TInstant now = TInstant::Now();
- if (state == SS_SHUTDOWN_COMMAND) {
- if (!!Channel) {
- Channel->Unregister();
- Channel.Drop();
- Stats.Fd = INVALID_SOCKET;
- }
-
+ if (state == SS_SHUTDOWN_COMMAND) {
+ if (!!Channel) {
+ Channel->Unregister();
+ Channel.Drop();
+ Stats.Fd = INVALID_SOCKET;
+ }
+
SendStatus(now);
-
- Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
- Stats.ResetIncremental();
-
- ShutdownState.CompleteShutdown();
- return;
- }
-
- THolder<TOpaqueAddr> addr(new TOpaqueAddr());
- SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());
-
- int acceptErrno = LastSystemError();
-
- if (acceptedSocket == INVALID_SOCKET) {
- if (LastSystemError() != EWOULDBLOCK) {
- Stats.LastAcceptErrorErrno = acceptErrno;
- Stats.LastAcceptErrorInstant = now;
- ++Stats.AcceptErrorCount;
- }
- } else {
- TSocketHolder s(acceptedSocket);
- try {
- SetKeepAlive(s, true);
- SetNoDelay(s, Session->Config.TcpNoDelay);
- SetSockOptTcpCork(s, Session->Config.TcpCork);
- SetCloseOnExec(s, true);
- SetNonBlock(s, true);
+
+ Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
+ Stats.ResetIncremental();
+
+ ShutdownState.CompleteShutdown();
+ return;
+ }
+
+ THolder<TOpaqueAddr> addr(new TOpaqueAddr());
+ SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());
+
+ int acceptErrno = LastSystemError();
+
+ if (acceptedSocket == INVALID_SOCKET) {
+ if (LastSystemError() != EWOULDBLOCK) {
+ Stats.LastAcceptErrorErrno = acceptErrno;
+ Stats.LastAcceptErrorInstant = now;
+ ++Stats.AcceptErrorCount;
+ }
+ } else {
+ TSocketHolder s(acceptedSocket);
+ try {
+ SetKeepAlive(s, true);
+ SetNoDelay(s, Session->Config.TcpNoDelay);
+ SetSockOptTcpCork(s, Session->Config.TcpCork);
+ SetCloseOnExec(s, true);
+ SetNonBlock(s, true);
if (Session->Config.SocketToS >= 0) {
SetSocketToS(s, addr.Get(), Session->Config.SocketToS);
}
- } catch (...) {
- // It means that connection was reset just now
- // TODO: do something better
- goto skipAccept;
- }
-
- {
- TOnAccept onAccept;
- onAccept.s = s.Release();
- onAccept.addr = TNetAddr(addr.Release());
- onAccept.now = now;
-
- LWPROBE(Accepted, ToString(onAccept.addr));
-
- Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);
-
+ } catch (...) {
+ // It means that connection was reset just now
+ // TODO: do something better
+ goto skipAccept;
+ }
+
+ {
+ TOnAccept onAccept;
+ onAccept.s = s.Release();
+ onAccept.addr = TNetAddr(addr.Release());
+ onAccept.now = now;
+
+ LWPROBE(Accepted, ToString(onAccept.addr));
+
+ Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);
+
Stats.LastAcceptSuccessInstant = now;
- ++Stats.AcceptSuccessCount;
- }
-
+ ++Stats.AcceptSuccessCount;
+ }
+
skipAccept:;
- }
-
- Channel->EnableRead();
-
+ }
+
+ Channel->EnableRead();
+
SendStatus(now);
-}
-
+}
+
void TAcceptor::SendStatus(TInstant now) {
GranStatus.Listen.Update(Stats, now);
-}
-
-void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
+}
+
+void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
Y_UNUSED(socket);
Y_UNUSED(cookie);
-
- GetActor()->Schedule();
-}
-
-void TAcceptor::Shutdown() {
- ShutdownState.ShutdownCommand();
- GetActor()->Schedule();
-
- ShutdownState.ShutdownComplete.WaitI();
-}
+
+ GetActor()->Schedule();
+}
+
+void TAcceptor::Shutdown() {
+ ShutdownState.ShutdownCommand();
+ GetActor()->Schedule();
+
+ ShutdownState.ShutdownComplete.WaitI();
+}