diff options
author | yazevnul <yazevnul@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
commit | 8cbc307de0221f84c80c42dcbe07d40727537e2c (patch) | |
tree | 625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/messagebus/session_impl.cpp | |
parent | 30d1ef3941e0dc835be7609de5ebee66958f215a (diff) | |
download | ydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz |
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.cpp')
-rw-r--r-- | library/cpp/messagebus/session_impl.cpp | 86 |
1 files changed, 43 insertions, 43 deletions
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index ddf9f360c4..1552ee775f 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -58,28 +58,28 @@ namespace { copy.TotalTimeout = TDuration::Seconds(60).MilliSeconds(); copy.SendTimeout = TDuration::Seconds(15).MilliSeconds(); } else if (copy.TotalTimeout == 0) { - Y_ASSERT(copy.SendTimeout != 0); + Y_ASSERT(copy.SendTimeout != 0); copy.TotalTimeout = config.SendTimeout + TDuration::MilliSeconds(10).MilliSeconds(); } else if (copy.SendTimeout == 0) { - Y_ASSERT(copy.TotalTimeout != 0); + Y_ASSERT(copy.TotalTimeout != 0); if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) { copy.SendTimeout = copy.TotalTimeout - TDuration::MilliSeconds(10).MilliSeconds(); } else { copy.SendTimeout = copy.TotalTimeout; } } else { - Y_ASSERT(copy.TotalTimeout != 0); - Y_ASSERT(copy.SendTimeout != 0); + Y_ASSERT(copy.TotalTimeout != 0); + Y_ASSERT(copy.SendTimeout != 0); } if (copy.ConnectTimeout == 0) { copy.ConnectTimeout = copy.SendTimeout; } - Y_VERIFY(copy.SendTimeout > 0, "SendTimeout must be > 0"); - Y_VERIFY(copy.TotalTimeout > 0, "TotalTimeout must be > 0"); - Y_VERIFY(copy.ConnectTimeout > 0, "ConnectTimeout must be > 0"); - Y_VERIFY(copy.TotalTimeout >= copy.SendTimeout, "TotalTimeout must be >= SendTimeout"); + Y_VERIFY(copy.SendTimeout > 0, "SendTimeout must be > 0"); + Y_VERIFY(copy.TotalTimeout > 0, "TotalTimeout must be > 0"); + Y_VERIFY(copy.ConnectTimeout > 0, "ConnectTimeout must be > 0"); + Y_VERIFY(copy.TotalTimeout >= copy.SendTimeout, "TotalTimeout must be >= SendTimeout"); if (!copy.Name) { copy.Name = name; @@ -117,10 +117,10 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro } TBusSessionImpl::~TBusSessionImpl() { - Y_VERIFY(Down); - Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero())); - Y_VERIFY(!WriteEventLoop.IsRunning()); - Y_VERIFY(!ReadEventLoop.IsRunning()); + Y_VERIFY(Down); + Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero())); + Y_VERIFY(!WriteEventLoop.IsRunning()); + Y_VERIFY(!ReadEventLoop.IsRunning()); } TBusSessionStatus::TBusSessionStatus() @@ -136,7 +136,7 @@ void TBusSessionImpl::Shutdown() { return; } - Y_VERIFY(Queue->IsRunning(), "Session must be shut down prior to queue shutdown"); + Y_VERIFY(Queue->IsRunning(), "Session must be shut down prior to queue shutdown"); TUseAfterFreeCheckerGuard handlerAliveCheckedGuard(ErrorHandler->UseAfterFreeChecker); @@ -161,16 +161,16 @@ void TBusSessionImpl::Shutdown() { Acceptors.clear(); } - for (auto& acceptor : acceptors) { - acceptor->Shutdown(); + for (auto& acceptor : acceptors) { + acceptor->Shutdown(); } // shutdown connections TVector<TRemoteConnectionPtr> cs; GetConnections(&cs); - for (auto& c : cs) { - c->Shutdown(MESSAGE_SHUTDOWN); + for (auto& c : cs) { + c->Shutdown(MESSAGE_SHUTDOWN); } // shutdown connections actor @@ -205,7 +205,7 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const { } void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const { - Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); + Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); for (size_t i = 0; i < addrs.size(); ++i) { results[i] = GetInFlightImpl(addrs[i]); } @@ -221,7 +221,7 @@ size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) c } void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const { - Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); + Y_VERIFY(addrs.size() == results.size(), "input.size != output.size"); for (size_t i = 0; i < addrs.size(); ++i) { results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]); } @@ -232,7 +232,7 @@ void TBusSessionImpl::FillStatus() { TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() { // Probably useless, because it returns cached info now - Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(), + Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(), "GetStatus must not be called from executor thread"); TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex); @@ -242,13 +242,13 @@ TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() { } TString TBusSessionImpl::GetStatus(ui16 flags) { - Y_UNUSED(flags); + Y_UNUSED(flags); return GetStatusRecordInternal().PrintToString(); } TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() { - Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(), + Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(), "GetStatus must not be called from executor thread"); TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex); @@ -319,12 +319,12 @@ void TBusSessionImpl::ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionP void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) { for (TVector<TRemoteConnectionPtr>::const_iterator connection = snapshot->Connections.begin(); connection != snapshot->Connections.end(); ++connection) { - Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId); + Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId); } for (TVector<TAcceptorPtr>::const_iterator acceptor = snapshot->Acceptors.begin(); acceptor != snapshot->Acceptors.end(); ++acceptor) { - Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId); + Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId); } StatusData.ConnectionsAcceptorsSnapshot = snapshot; @@ -471,8 +471,8 @@ void TBusSessionImpl::Act(TConnectionTag) { EShutdownState shutdownState = ConnectionsData.ShutdownState.State.Get(); if (shutdownState == SS_SHUTDOWN_COMPLETE) { - Y_VERIFY(GetRemoveConnectionQueue()->IsEmpty()); - Y_VERIFY(GetOnAcceptQueue()->IsEmpty()); + Y_VERIFY(GetRemoveConnectionQueue()->IsEmpty()); + Y_VERIFY(GetOnAcceptQueue()->IsEmpty()); } GetRemoveConnectionQueue()->DequeueAllLikelyEmpty(); @@ -488,7 +488,7 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) { } void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) { - Y_ASSERT(q == Queue); + Y_ASSERT(q == Queue); int actualPort = -1; for (const TBindResult& br : bindTo) { @@ -511,7 +511,7 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu } void TBusSessionImpl::SendSnapshotToStatusActor() { - //Y_ASSERT(ConnectionsLock.IsLocked()); + //Y_ASSERT(ConnectionsLock.IsLocked()); TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot); GetAcceptorsLockAquired(&snapshot->Acceptors); @@ -523,7 +523,7 @@ void TBusSessionImpl::SendSnapshotToStatusActor() { } void TBusSessionImpl::InsertConnectionLockAcquired(TRemoteConnection* connection) { - //Y_ASSERT(ConnectionsLock.IsLocked()); + //Y_ASSERT(ConnectionsLock.IsLocked()); Connections.insert(std::make_pair(connection->PeerAddrSocketAddr, connection)); // connection for given adds may already exist at this point @@ -531,13 +531,13 @@ void TBusSessionImpl::InsertConnectionLockAcquired(TRemoteConnection* connection // after reconnect, if previous connections wasn't shutdown yet bool inserted2 = ConnectionsById.insert(std::make_pair(connection->ConnectionId, connection)).second; - Y_VERIFY(inserted2, "state check: must be inserted (2)"); + Y_VERIFY(inserted2, "state check: must be inserted (2)"); SendSnapshotToStatusActor(); } void TBusSessionImpl::InsertAcceptorLockAcquired(TAcceptor* acceptor) { - //Y_ASSERT(ConnectionsLock.IsLocked()); + //Y_ASSERT(ConnectionsLock.IsLocked()); Acceptors.push_back(acceptor); @@ -555,22 +555,22 @@ void TBusSessionImpl::GetAcceptors(TVector<TAcceptorPtr>* r) { } void TBusSessionImpl::GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>* r) { - //Y_ASSERT(ConnectionsLock.IsLocked()); + //Y_ASSERT(ConnectionsLock.IsLocked()); r->reserve(Connections.size()); - for (auto& connection : Connections) { - r->push_back(connection.second); + for (auto& connection : Connections) { + r->push_back(connection.second); } } void TBusSessionImpl::GetAcceptorsLockAquired(TVector<TAcceptorPtr>* r) { - //Y_ASSERT(ConnectionsLock.IsLocked()); + //Y_ASSERT(ConnectionsLock.IsLocked()); r->reserve(Acceptors.size()); - for (auto& acceptor : Acceptors) { - r->push_back(acceptor); + for (auto& acceptor : Acceptors) { + r->push_back(acceptor); } } @@ -588,9 +588,9 @@ TRemoteConnectionPtr TBusSessionImpl::GetConnectionById(ui64 id) { TAcceptorPtr TBusSessionImpl::GetAcceptorById(ui64 id) { TGuard<TMutex> guard(ConnectionsLock); - for (const auto& Acceptor : Acceptors) { - if (Acceptor->AcceptorId == id) { - return Acceptor; + for (const auto& Acceptor : Acceptors) { + if (Acceptor->AcceptorId == id) { + return Acceptor; } } @@ -614,7 +614,7 @@ TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, return TRemoteConnectionPtr(); } - Y_VERIFY(IsSource_, "must be source"); + Y_VERIFY(IsSource_, "must be source"); TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr())); InsertConnectionLockAcquired(c.Get()); @@ -626,8 +626,8 @@ void TBusSessionImpl::Cron() { TVector<TRemoteConnectionPtr> connections; GetConnections(&connections); - for (const auto& it : connections) { - TRemoteConnection* connection = it.Get(); + for (const auto& it : connections) { + TRemoteConnection* connection = it.Get(); if (IsSource_) { VerifyDynamicCast<TRemoteClientConnection*>(connection)->ScheduleTimeoutMessages(); } else { |