aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session_impl.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/session_impl.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/session_impl.cpp')
-rw-r--r--library/cpp/messagebus/session_impl.cpp650
1 files changed, 650 insertions, 0 deletions
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
new file mode 100644
index 0000000000..ddf9f360c4
--- /dev/null
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -0,0 +1,650 @@
+#include "session_impl.h"
+
+#include "acceptor.h"
+#include "network.h"
+#include "remote_client_connection.h"
+#include "remote_client_session.h"
+#include "remote_server_connection.h"
+#include "remote_server_session.h"
+#include "misc/weak_ptr.h"
+
+#include <util/generic/cast.h>
+
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+using namespace NEventLoop;
+
+namespace {
+ class TScheduleSession: public IScheduleItem {
+ public:
+ TScheduleSession(TBusSessionImpl* session, TInstant deadline)
+ : IScheduleItem(deadline)
+ , Session(session)
+ , SessionImpl(session)
+ {
+ }
+
+ void Do() override {
+ TIntrusivePtr<TBusSession> session = Session.Get();
+ if (!!session) {
+ SessionImpl->Cron();
+ }
+ }
+
+ private:
+ TWeakPtr<TBusSession> Session;
+ // Work around TWeakPtr limitation
+ TBusSessionImpl* SessionImpl;
+ };
+}
+
+TConnectionsAcceptorsSnapshot::TConnectionsAcceptorsSnapshot()
+ : LastConnectionId(0)
+ , LastAcceptorId(0)
+{
+}
+
+struct TBusSessionImpl::TImpl {
+ TRemoteConnectionWriterIncrementalStatus DeadConnectionWriterStatusSummary;
+ TRemoteConnectionReaderIncrementalStatus DeadConnectionReaderStatusSummary;
+ TAcceptorStatus DeadAcceptorStatusSummary;
+};
+
+namespace {
+ TBusSessionConfig SessionConfigFillDefaults(const TBusSessionConfig& config, const TString& name) {
+ TBusSessionConfig copy = config;
+ if (copy.TotalTimeout == 0 && copy.SendTimeout == 0) {
+ copy.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
+ copy.SendTimeout = TDuration::Seconds(15).MilliSeconds();
+ } else if (copy.TotalTimeout == 0) {
+ Y_ASSERT(copy.SendTimeout != 0);
+ copy.TotalTimeout = config.SendTimeout + TDuration::MilliSeconds(10).MilliSeconds();
+ } else if (copy.SendTimeout == 0) {
+ Y_ASSERT(copy.TotalTimeout != 0);
+ if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) {
+ copy.SendTimeout = copy.TotalTimeout - TDuration::MilliSeconds(10).MilliSeconds();
+ } else {
+ copy.SendTimeout = copy.TotalTimeout;
+ }
+ } else {
+ Y_ASSERT(copy.TotalTimeout != 0);
+ Y_ASSERT(copy.SendTimeout != 0);
+ }
+
+ if (copy.ConnectTimeout == 0) {
+ copy.ConnectTimeout = copy.SendTimeout;
+ }
+
+ Y_VERIFY(copy.SendTimeout > 0, "SendTimeout must be > 0");
+ Y_VERIFY(copy.TotalTimeout > 0, "TotalTimeout must be > 0");
+ Y_VERIFY(copy.ConnectTimeout > 0, "ConnectTimeout must be > 0");
+ Y_VERIFY(copy.TotalTimeout >= copy.SendTimeout, "TotalTimeout must be >= SendTimeout");
+
+ if (!copy.Name) {
+ copy.Name = name;
+ }
+
+ return copy;
+ }
+}
+
+TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
+ IBusErrorHandler* handler,
+ const TBusSessionConfig& config, const TString& name)
+ : TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get())
+ , TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get())
+ , Impl(new TImpl)
+ , IsSource_(isSource)
+ , Queue(queue)
+ , Proto(proto)
+ , ProtoName(Proto->GetService())
+ , ErrorHandler(handler)
+ , HandlerUseCountHolder(&handler->UseCountChecker)
+ , Config(SessionConfigFillDefaults(config, name))
+ , WriteEventLoop("wr-el")
+ , ReadEventLoop("rd-el")
+ , LastAcceptorId(0)
+ , LastConnectionId(0)
+ , Down(0)
+{
+ Impl->DeadAcceptorStatusSummary.Summary = true;
+
+ ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop))));
+ WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop))));
+
+ Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
+}
+
+TBusSessionImpl::~TBusSessionImpl() {
+ Y_VERIFY(Down);
+ Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero()));
+ Y_VERIFY(!WriteEventLoop.IsRunning());
+ Y_VERIFY(!ReadEventLoop.IsRunning());
+}
+
+TBusSessionStatus::TBusSessionStatus()
+ : InFlightCount(0)
+ , InFlightSize(0)
+ , InputPaused(false)
+{
+}
+
+void TBusSessionImpl::Shutdown() {
+ if (!AtomicCas(&Down, 1, 0)) {
+ ShutdownCompleteEvent.WaitI();
+ return;
+ }
+
+ Y_VERIFY(Queue->IsRunning(), "Session must be shut down prior to queue shutdown");
+
+ TUseAfterFreeCheckerGuard handlerAliveCheckedGuard(ErrorHandler->UseAfterFreeChecker);
+
+ // For legacy clients that don't use smart pointers
+ TIntrusivePtr<TBusSessionImpl> thiz(this);
+
+ Queue->Remove(this);
+
+ // shutdown event loops first, so they won't send more events
+ // to acceptors and connections
+ ReadEventLoop.Stop();
+ WriteEventLoop.Stop();
+ ReadEventLoopThread->Get();
+ WriteEventLoopThread->Get();
+
+ // shutdown acceptors before connections
+ // so they won't create more connections
+ TVector<TAcceptorPtr> acceptors;
+ GetAcceptors(&acceptors);
+ {
+ TGuard<TMutex> guard(ConnectionsLock);
+ Acceptors.clear();
+ }
+
+ for (auto& acceptor : acceptors) {
+ acceptor->Shutdown();
+ }
+
+ // shutdown connections
+ TVector<TRemoteConnectionPtr> cs;
+ GetConnections(&cs);
+
+ for (auto& c : cs) {
+ c->Shutdown(MESSAGE_SHUTDOWN);
+ }
+
+ // shutdown connections actor
+ // must shutdown after connections destroyed
+ ConnectionsData.ShutdownState.ShutdownCommand();
+ GetConnectionsActor()->Schedule();
+ ConnectionsData.ShutdownState.ShutdownComplete.WaitI();
+
+ // finally shutdown status actor
+ StatusData.ShutdownState.ShutdownCommand();
+ GetStatusActor()->Schedule();
+ StatusData.ShutdownState.ShutdownComplete.WaitI();
+
+ // Make sure no one references IMessageHandler after Shutdown()
+ JobCount.WaitForZero();
+ HandlerUseCountHolder.Reset();
+
+ ShutdownCompleteEvent.Signal();
+}
+
+bool TBusSessionImpl::IsDown() {
+ return static_cast<bool>(AtomicGet(Down));
+}
+
+size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
+ TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
+ if (!!conn) {
+ return conn->GetInFlight();
+ } else {
+ return 0;
+ }
+}
+
+void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
+ Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
+ for (size_t i = 0; i < addrs.size(); ++i) {
+ results[i] = GetInFlightImpl(addrs[i]);
+ }
+}
+
+size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const {
+ TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
+ if (!!conn) {
+ return conn->GetConnectSyscallsNumForTest();
+ } else {
+ return 0;
+ }
+}
+
+void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
+ Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
+ for (size_t i = 0; i < addrs.size(); ++i) {
+ results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);
+ }
+}
+
+void TBusSessionImpl::FillStatus() {
+}
+
+TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() {
+ // Probably useless, because it returns cached info now
+ Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
+ "GetStatus must not be called from executor thread");
+
+ TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
+ // TODO: returns zeros for a second after start
+ // (until first cron)
+ return StatusData.StatusDumpCached;
+}
+
+TString TBusSessionImpl::GetStatus(ui16 flags) {
+ Y_UNUSED(flags);
+
+ return GetStatusRecordInternal().PrintToString();
+}
+
+TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() {
+ Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
+ "GetStatus must not be called from executor thread");
+
+ TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
+
+ return StatusData.StatusDumpCached.ConnectionStatusSummary.GetStatusProtobuf();
+}
+
+TString TBusSessionImpl::GetStatusSingleLine() {
+ TSessionDumpStatus status = GetStatusRecordInternal();
+
+ TStringStream ss;
+ ss << "in-flight: " << status.Status.InFlightCount;
+ if (IsSource_) {
+ ss << " ack: " << status.ConnectionStatusSummary.WriterStatus.AckMessagesSize;
+ }
+ ss << " send-q: " << status.ConnectionStatusSummary.WriterStatus.SendQueueSize;
+ return ss.Str();
+}
+
+void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus& connectionStatus) {
+ Impl->DeadConnectionWriterStatusSummary += connectionStatus;
+}
+
+void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus& connectionStatus) {
+ Impl->DeadConnectionReaderStatusSummary += connectionStatus;
+}
+
+void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus& acceptorStatus) {
+ Impl->DeadAcceptorStatusSummary += acceptorStatus;
+}
+
+void TBusSessionImpl::ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept& onAccept) {
+ TSocketHolder socket(onAccept.s);
+
+ if (AtomicGet(Down)) {
+ // do not create connections after shutdown initiated
+ return;
+ }
+
+ //if (Connections.find(addr) != Connections.end()) {
+ // TODO: it is possible
+ // won't be a problem after socket address replaced with id
+ //}
+
+ TRemoteConnectionPtr c(new TRemoteServerConnection(VerifyDynamicCast<TRemoteServerSession*>(this), ++LastConnectionId, onAccept.addr));
+
+ VerifyDynamicCast<TRemoteServerConnection*>(c.Get())->Init(socket.Release(), onAccept.now);
+
+ InsertConnectionLockAcquired(c.Get());
+}
+
+void TBusSessionImpl::ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr c) {
+ TAddrRemoteConnections::iterator it1 = Connections.find(c->PeerAddrSocketAddr);
+ if (it1 != Connections.end()) {
+ if (it1->second.Get() == c.Get()) {
+ Connections.erase(it1);
+ }
+ }
+
+ THashMap<ui64, TRemoteConnectionPtr>::iterator it2 = ConnectionsById.find(c->ConnectionId);
+ if (it2 != ConnectionsById.end()) {
+ ConnectionsById.erase(it2);
+ }
+
+ SendSnapshotToStatusActor();
+}
+
+void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) {
+ for (TVector<TRemoteConnectionPtr>::const_iterator connection = snapshot->Connections.begin();
+ connection != snapshot->Connections.end(); ++connection) {
+ Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId);
+ }
+
+ for (TVector<TAcceptorPtr>::const_iterator acceptor = snapshot->Acceptors.begin();
+ acceptor != snapshot->Acceptors.end(); ++acceptor) {
+ Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId);
+ }
+
+ StatusData.ConnectionsAcceptorsSnapshot = snapshot;
+}
+
+void TBusSessionImpl::StatusUpdateCachedDumpIfNecessary(TInstant now) {
+ if (now - StatusData.StatusDumpCachedLastUpdate > Config.Secret.StatusFlushPeriod) {
+ StatusUpdateCachedDump();
+ StatusData.StatusDumpCachedLastUpdate = now;
+ }
+}
+
+void TBusSessionImpl::StatusUpdateCachedDump() {
+ TSessionDumpStatus r;
+
+ if (AtomicGet(Down)) {
+ r.Shutdown = true;
+ TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
+ StatusData.StatusDumpCached = r;
+ return;
+ }
+
+ // TODO: make thread-safe
+ FillStatus();
+
+ r.Status = StatusData.Status;
+
+ {
+ TStringStream ss;
+
+ TString name = Config.Name;
+ if (!name) {
+ name = "unnamed";
+ }
+
+ ss << (IsSource_ ? "client" : "server") << " session " << name << ", proto " << Proto->GetService() << Endl;
+ ss << "in flight: " << r.Status.InFlightCount;
+ if (!IsSource_) {
+ ss << ", " << r.Status.InFlightSize << "b";
+ }
+ if (r.Status.InputPaused) {
+ ss << " (input paused)";
+ }
+ ss << "\n";
+
+ r.Head = ss.Str();
+ }
+
+ TVector<TRemoteConnectionPtr>& connections = StatusData.ConnectionsAcceptorsSnapshot->Connections;
+ TVector<TAcceptorPtr>& acceptors = StatusData.ConnectionsAcceptorsSnapshot->Acceptors;
+
+ r.ConnectionStatusSummary = TRemoteConnectionStatus();
+ r.ConnectionStatusSummary.Summary = true;
+ r.ConnectionStatusSummary.Server = !IsSource_;
+ r.ConnectionStatusSummary.WriterStatus.Incremental = Impl->DeadConnectionWriterStatusSummary;
+ r.ConnectionStatusSummary.ReaderStatus.Incremental = Impl->DeadConnectionReaderStatusSummary;
+
+ TAcceptorStatus acceptorStatusSummary = Impl->DeadAcceptorStatusSummary;
+
+ {
+ TStringStream ss;
+
+ for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin();
+ acceptor != acceptors.end(); ++acceptor) {
+ const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get();
+
+ acceptorStatusSummary += status;
+
+ if (acceptor != acceptors.begin()) {
+ ss << "\n";
+ }
+ ss << status.PrintToString();
+ }
+
+ r.Acceptors = ss.Str();
+ }
+
+ {
+ TStringStream ss;
+
+ for (TVector<TRemoteConnectionPtr>::const_iterator connection = connections.begin();
+ connection != connections.end(); ++connection) {
+ if (connection != connections.begin()) {
+ ss << "\n";
+ }
+
+ TRemoteConnectionStatus status;
+ status.Server = !IsSource_;
+ status.ReaderStatus = (*connection)->GranStatus.Reader.Get();
+ status.WriterStatus = (*connection)->GranStatus.Writer.Get();
+
+ ss << status.PrintToString();
+
+ r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus;
+ r.ConnectionStatusSummary.WriterStatus += status.WriterStatus;
+ }
+
+ r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString();
+ r.Connections = ss.Str();
+ }
+
+ r.Config = Config;
+
+ TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
+ StatusData.StatusDumpCached = r;
+}
+
+TBusSessionImpl::TStatusData::TStatusData()
+ : ConnectionsAcceptorsSnapshot(new TConnectionsAcceptorsSnapshot)
+{
+}
+
+void TBusSessionImpl::Act(TStatusTag) {
+ TInstant now = TInstant::Now();
+
+ EShutdownState shutdownState = StatusData.ShutdownState.State.Get();
+
+ StatusData.ConnectionsAcceptorsSnapshotsQueue.DequeueAllLikelyEmpty(std::bind(&TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem, this, std::placeholders::_1));
+
+ GetDeadConnectionWriterStatusQueue()->DequeueAllLikelyEmpty();
+ GetDeadConnectionReaderStatusQueue()->DequeueAllLikelyEmpty();
+ GetDeadAcceptorStatusQueue()->DequeueAllLikelyEmpty();
+
+ // TODO: check queues are empty if already stopped
+
+ if (shutdownState != SS_RUNNING) {
+ // important to beak cyclic link session -> connection -> session
+ StatusData.ConnectionsAcceptorsSnapshot->Connections.clear();
+ StatusData.ConnectionsAcceptorsSnapshot->Acceptors.clear();
+ }
+
+ if (shutdownState == SS_SHUTDOWN_COMMAND) {
+ StatusData.ShutdownState.CompleteShutdown();
+ }
+
+ StatusUpdateCachedDumpIfNecessary(now);
+}
+
+TBusSessionImpl::TConnectionsData::TConnectionsData() {
+}
+
+void TBusSessionImpl::Act(TConnectionTag) {
+ TConnectionsGuard guard(ConnectionsLock);
+
+ EShutdownState shutdownState = ConnectionsData.ShutdownState.State.Get();
+ if (shutdownState == SS_SHUTDOWN_COMPLETE) {
+ Y_VERIFY(GetRemoveConnectionQueue()->IsEmpty());
+ Y_VERIFY(GetOnAcceptQueue()->IsEmpty());
+ }
+
+ GetRemoveConnectionQueue()->DequeueAllLikelyEmpty();
+ GetOnAcceptQueue()->DequeueAllLikelyEmpty();
+
+ if (shutdownState == SS_SHUTDOWN_COMMAND) {
+ ConnectionsData.ShutdownState.CompleteShutdown();
+ }
+}
+
+void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
+ Listen(BindOnPort(port, Config.ReusePort).second, q);
+}
+
+void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
+ Y_ASSERT(q == Queue);
+ int actualPort = -1;
+
+ for (const TBindResult& br : bindTo) {
+ if (actualPort == -1) {
+ actualPort = br.Addr.GetPort();
+ } else {
+ Y_VERIFY(actualPort == br.Addr.GetPort(), "state check");
+ }
+ if (Config.SocketToS >= 0) {
+ SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS);
+ }
+
+ TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));
+
+ TConnectionsGuard guard(ConnectionsLock);
+ InsertAcceptorLockAcquired(acceptor.Get());
+ }
+
+ Config.ListenPort = actualPort;
+}
+
+void TBusSessionImpl::SendSnapshotToStatusActor() {
+ //Y_ASSERT(ConnectionsLock.IsLocked());
+
+ TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot);
+ GetAcceptorsLockAquired(&snapshot->Acceptors);
+ GetConnectionsLockAquired(&snapshot->Connections);
+ snapshot->LastAcceptorId = LastAcceptorId;
+ snapshot->LastConnectionId = LastConnectionId;
+ StatusData.ConnectionsAcceptorsSnapshotsQueue.Enqueue(snapshot);
+ GetStatusActor()->Schedule();
+}
+
+void TBusSessionImpl::InsertConnectionLockAcquired(TRemoteConnection* connection) {
+ //Y_ASSERT(ConnectionsLock.IsLocked());
+
+ Connections.insert(std::make_pair(connection->PeerAddrSocketAddr, connection));
+ // connection for given adds may already exist at this point
+ // (so we overwrite old connection)
+ // after reconnect, if previous connections wasn't shutdown yet
+
+ bool inserted2 = ConnectionsById.insert(std::make_pair(connection->ConnectionId, connection)).second;
+ Y_VERIFY(inserted2, "state check: must be inserted (2)");
+
+ SendSnapshotToStatusActor();
+}
+
+void TBusSessionImpl::InsertAcceptorLockAcquired(TAcceptor* acceptor) {
+ //Y_ASSERT(ConnectionsLock.IsLocked());
+
+ Acceptors.push_back(acceptor);
+
+ SendSnapshotToStatusActor();
+}
+
+void TBusSessionImpl::GetConnections(TVector<TRemoteConnectionPtr>* r) {
+ TConnectionsGuard guard(ConnectionsLock);
+ GetConnectionsLockAquired(r);
+}
+
+void TBusSessionImpl::GetAcceptors(TVector<TAcceptorPtr>* r) {
+ TConnectionsGuard guard(ConnectionsLock);
+ GetAcceptorsLockAquired(r);
+}
+
+void TBusSessionImpl::GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>* r) {
+ //Y_ASSERT(ConnectionsLock.IsLocked());
+
+ r->reserve(Connections.size());
+
+ for (auto& connection : Connections) {
+ r->push_back(connection.second);
+ }
+}
+
+void TBusSessionImpl::GetAcceptorsLockAquired(TVector<TAcceptorPtr>* r) {
+ //Y_ASSERT(ConnectionsLock.IsLocked());
+
+ r->reserve(Acceptors.size());
+
+ for (auto& acceptor : Acceptors) {
+ r->push_back(acceptor);
+ }
+}
+
+TRemoteConnectionPtr TBusSessionImpl::GetConnectionById(ui64 id) {
+ TConnectionsGuard guard(ConnectionsLock);
+
+ THashMap<ui64, TRemoteConnectionPtr>::const_iterator it = ConnectionsById.find(id);
+ if (it == ConnectionsById.end()) {
+ return nullptr;
+ } else {
+ return it->second;
+ }
+}
+
+TAcceptorPtr TBusSessionImpl::GetAcceptorById(ui64 id) {
+ TGuard<TMutex> guard(ConnectionsLock);
+
+ for (const auto& Acceptor : Acceptors) {
+ if (Acceptor->AcceptorId == id) {
+ return Acceptor;
+ }
+ }
+
+ return nullptr;
+}
+
+void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message, EMessageStatus status) {
+ message->CheckClean();
+ ErrorHandler->OnError(message, status);
+}
+
+TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) {
+ TConnectionsGuard guard(ConnectionsLock);
+
+ TAddrRemoteConnections::const_iterator it = Connections.find(addr);
+ if (it != Connections.end()) {
+ return it->second;
+ }
+
+ if (!create) {
+ return TRemoteConnectionPtr();
+ }
+
+ Y_VERIFY(IsSource_, "must be source");
+
+ TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr()));
+ InsertConnectionLockAcquired(c.Get());
+
+ return c;
+}
+
+void TBusSessionImpl::Cron() {
+ TVector<TRemoteConnectionPtr> connections;
+ GetConnections(&connections);
+
+ for (const auto& it : connections) {
+ TRemoteConnection* connection = it.Get();
+ if (IsSource_) {
+ VerifyDynamicCast<TRemoteClientConnection*>(connection)->ScheduleTimeoutMessages();
+ } else {
+ VerifyDynamicCast<TRemoteServerConnection*>(connection)->WriterData.TimeToRotateCounters.AddTask();
+ // no schedule: do not rotate if there's no traffic
+ }
+ }
+
+ // status updates are sent without scheduling
+ GetStatusActor()->Schedule();
+
+ Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
+}
+
+TString TBusSessionImpl::GetNameInternal() {
+ if (!!Config.Name) {
+ return Config.Name;
+ }
+ return ProtoName;
+}