aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/acceptor.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/acceptor.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/acceptor.cpp')
-rw-r--r--library/cpp/messagebus/acceptor.cpp127
1 files changed, 127 insertions, 0 deletions
diff --git a/library/cpp/messagebus/acceptor.cpp b/library/cpp/messagebus/acceptor.cpp
new file mode 100644
index 0000000000..64a38619c2
--- /dev/null
+++ b/library/cpp/messagebus/acceptor.cpp
@@ -0,0 +1,127 @@
+#include "acceptor.h"
+
+#include "key_value_printer.h"
+#include "mb_lwtrace.h"
+#include "network.h"
+
+#include <util/network/init.h>
+#include <util/system/defaults.h>
+#include <util/system/error.h>
+#include <util/system/yassert.h>
+
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TAcceptor::TAcceptor(TBusSessionImpl* session, ui64 acceptorId, SOCKET socket, const TNetAddr& addr)
+ : TActor<TAcceptor>(session->Queue->WorkQueue.Get())
+ , AcceptorId(acceptorId)
+ , Session(session)
+ , GranStatus(session->Config.Secret.StatusFlushPeriod)
+{
+ SetNonBlock(socket, true);
+
+ Channel = Session->ReadEventLoop.Register(socket, this);
+ Channel->EnableRead();
+
+ Stats.AcceptorId = acceptorId;
+ Stats.Fd = socket;
+ Stats.ListenAddr = addr;
+
+ SendStatus(TInstant::Now());
+}
+
+void TAcceptor::Act(TDefaultTag) {
+ EShutdownState state = ShutdownState.State.Get();
+
+ if (state == SS_SHUTDOWN_COMPLETE) {
+ return;
+ }
+
+ TInstant now = TInstant::Now();
+
+ if (state == SS_SHUTDOWN_COMMAND) {
+ if (!!Channel) {
+ Channel->Unregister();
+ Channel.Drop();
+ Stats.Fd = INVALID_SOCKET;
+ }
+
+ SendStatus(now);
+
+ Session->GetDeadAcceptorStatusQueue()->EnqueueAndSchedule(Stats);
+ Stats.ResetIncremental();
+
+ ShutdownState.CompleteShutdown();
+ return;
+ }
+
+ THolder<TOpaqueAddr> addr(new TOpaqueAddr());
+ SOCKET acceptedSocket = accept(Channel->GetSocket(), addr->MutableAddr(), addr->LenPtr());
+
+ int acceptErrno = LastSystemError();
+
+ if (acceptedSocket == INVALID_SOCKET) {
+ if (LastSystemError() != EWOULDBLOCK) {
+ Stats.LastAcceptErrorErrno = acceptErrno;
+ Stats.LastAcceptErrorInstant = now;
+ ++Stats.AcceptErrorCount;
+ }
+ } else {
+ TSocketHolder s(acceptedSocket);
+ try {
+ SetKeepAlive(s, true);
+ SetNoDelay(s, Session->Config.TcpNoDelay);
+ SetSockOptTcpCork(s, Session->Config.TcpCork);
+ SetCloseOnExec(s, true);
+ SetNonBlock(s, true);
+ if (Session->Config.SocketToS >= 0) {
+ SetSocketToS(s, addr.Get(), Session->Config.SocketToS);
+ }
+ } catch (...) {
+ // It means that connection was reset just now
+ // TODO: do something better
+ goto skipAccept;
+ }
+
+ {
+ TOnAccept onAccept;
+ onAccept.s = s.Release();
+ onAccept.addr = TNetAddr(addr.Release());
+ onAccept.now = now;
+
+ LWPROBE(Accepted, ToString(onAccept.addr));
+
+ Session->GetOnAcceptQueue()->EnqueueAndSchedule(onAccept);
+
+ Stats.LastAcceptSuccessInstant = now;
+ ++Stats.AcceptSuccessCount;
+ }
+
+ skipAccept:;
+ }
+
+ Channel->EnableRead();
+
+ SendStatus(now);
+}
+
+void TAcceptor::SendStatus(TInstant now) {
+ GranStatus.Listen.Update(Stats, now);
+}
+
+void TAcceptor::HandleEvent(SOCKET socket, void* cookie) {
+ Y_UNUSED(socket);
+ Y_UNUSED(cookie);
+
+ GetActor()->Schedule();
+}
+
+void TAcceptor::Shutdown() {
+ ShutdownState.ShutdownCommand();
+ GetActor()->Schedule();
+
+ ShutdownState.ShutdownComplete.WaitI();
+}