diff options
author | somov <somov@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
commit | a5950576e397b1909261050b8c7da16db58f10b1 (patch) | |
tree | 7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/messagebus/session_impl.cpp | |
parent | 81eddc8c0b55990194e112b02d127b87d54164a9 (diff) | |
download | ydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz |
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.cpp')
-rw-r--r-- | library/cpp/messagebus/session_impl.cpp | 72 |
1 files changed, 36 insertions, 36 deletions
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index ddf9f360c4..76790221ec 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -14,7 +14,7 @@ using namespace NActor; using namespace NBus; using namespace NBus::NPrivate; using namespace NEventLoop; - + namespace { class TScheduleSession: public IScheduleItem { public: @@ -95,7 +95,7 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro : TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get()) , TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get()) , Impl(new TImpl) - , IsSource_(isSource) + , IsSource_(isSource) , Queue(queue) , Proto(proto) , ProtoName(Proto->GetService()) @@ -106,16 +106,16 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro , ReadEventLoop("rd-el") , LastAcceptorId(0) , LastConnectionId(0) - , Down(0) -{ + , Down(0) +{ Impl->DeadAcceptorStatusSummary.Summary = true; ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop)))); WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop)))); Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod))); -} - +} + TBusSessionImpl::~TBusSessionImpl() { Y_VERIFY(Down); Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero())); @@ -160,11 +160,11 @@ void TBusSessionImpl::Shutdown() { TGuard<TMutex> guard(ConnectionsLock); Acceptors.clear(); } - + for (auto& acceptor : acceptors) { acceptor->Shutdown(); - } - + } + // shutdown connections TVector<TRemoteConnectionPtr> cs; GetConnections(&cs); @@ -189,12 +189,12 @@ void TBusSessionImpl::Shutdown() { HandlerUseCountHolder.Reset(); ShutdownCompleteEvent.Signal(); -} - +} + bool TBusSessionImpl::IsDown() { - return static_cast<bool>(AtomicGet(Down)); -} - + return static_cast<bool>(AtomicGet(Down)); +} + size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const { TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false); if (!!conn) { @@ -202,8 +202,8 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const { } else { return 0; } -} - +} + void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const { Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); for (size_t i = 0; i < addrs.size(); ++i) { @@ -427,7 +427,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() { } r.Config = Config; - + TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex); StatusData.StatusDumpCached = r; } @@ -490,7 +490,7 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) { void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) { Y_ASSERT(q == Queue); int actualPort = -1; - + for (const TBindResult& br : bindTo) { if (actualPort == -1) { actualPort = br.Addr.GetPort(); @@ -502,14 +502,14 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu } TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr)); - + TConnectionsGuard guard(ConnectionsLock); InsertAcceptorLockAcquired(acceptor.Get()); - } + } Config.ListenPort = actualPort; -} - +} + void TBusSessionImpl::SendSnapshotToStatusActor() { //Y_ASSERT(ConnectionsLock.IsLocked()); @@ -604,24 +604,24 @@ void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message, TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) { TConnectionsGuard guard(ConnectionsLock); - - TAddrRemoteConnections::const_iterator it = Connections.find(addr); - if (it != Connections.end()) { - return it->second; - } - - if (!create) { - return TRemoteConnectionPtr(); - } - + + TAddrRemoteConnections::const_iterator it = Connections.find(addr); + if (it != Connections.end()) { + return it->second; + } + + if (!create) { + return TRemoteConnectionPtr(); + } + Y_VERIFY(IsSource_, "must be source"); TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr())); InsertConnectionLockAcquired(c.Get()); - - return c; -} - + + return c; +} + void TBusSessionImpl::Cron() { TVector<TRemoteConnectionPtr> connections; GetConnections(&connections); |