diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/session_impl.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.cpp')
-rw-r--r-- | library/cpp/messagebus/session_impl.cpp | 54 |
1 files changed, 27 insertions, 27 deletions
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index ddf9f360c4..cff202c5b8 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -40,10 +40,10 @@ namespace { } TConnectionsAcceptorsSnapshot::TConnectionsAcceptorsSnapshot() - : LastConnectionId(0) - , LastAcceptorId(0) -{ -} + : LastConnectionId(0) + , LastAcceptorId(0) +{ +} struct TBusSessionImpl::TImpl { TRemoteConnectionWriterIncrementalStatus DeadConnectionWriterStatusSummary; @@ -62,7 +62,7 @@ namespace { copy.TotalTimeout = config.SendTimeout + TDuration::MilliSeconds(10).MilliSeconds(); } else if (copy.SendTimeout == 0) { Y_ASSERT(copy.TotalTimeout != 0); - if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) { + if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) { copy.SendTimeout = copy.TotalTimeout - TDuration::MilliSeconds(10).MilliSeconds(); } else { copy.SendTimeout = copy.TotalTimeout; @@ -90,8 +90,8 @@ namespace { } TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto, - IBusErrorHandler* handler, - const TBusSessionConfig& config, const TString& name) + IBusErrorHandler* handler, + const TBusSessionConfig& config, const TString& name) : TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get()) , TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get()) , Impl(new TImpl) @@ -111,7 +111,7 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro Impl->DeadAcceptorStatusSummary.Summary = true; ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop)))); - WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop)))); + WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop)))); Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod))); } @@ -127,8 +127,8 @@ TBusSessionStatus::TBusSessionStatus() : InFlightCount(0) , InFlightSize(0) , InputPaused(false) -{ -} +{ +} void TBusSessionImpl::Shutdown() { if (!AtomicCas(&Down, 1, 0)) { @@ -233,7 +233,7 @@ void TBusSessionImpl::FillStatus() { TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() { // Probably useless, because it returns cached info now Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(), - "GetStatus must not be called from executor thread"); + "GetStatus must not be called from executor thread"); TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex); // TODO: returns zeros for a second after start @@ -247,9 +247,9 @@ TString TBusSessionImpl::GetStatus(ui16 flags) { return GetStatusRecordInternal().PrintToString(); } -TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() { +TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() { Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(), - "GetStatus must not be called from executor thread"); + "GetStatus must not be called from executor thread"); TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex); @@ -289,8 +289,8 @@ void TBusSessionImpl::ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const T } //if (Connections.find(addr) != Connections.end()) { - // TODO: it is possible - // won't be a problem after socket address replaced with id + // TODO: it is possible + // won't be a problem after socket address replaced with id //} TRemoteConnectionPtr c(new TRemoteServerConnection(VerifyDynamicCast<TRemoteServerSession*>(this), ++LastConnectionId, onAccept.addr)); @@ -316,14 +316,14 @@ void TBusSessionImpl::ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionP SendSnapshotToStatusActor(); } -void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) { +void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) { for (TVector<TRemoteConnectionPtr>::const_iterator connection = snapshot->Connections.begin(); - connection != snapshot->Connections.end(); ++connection) { + connection != snapshot->Connections.end(); ++connection) { Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId); } for (TVector<TAcceptorPtr>::const_iterator acceptor = snapshot->Acceptors.begin(); - acceptor != snapshot->Acceptors.end(); ++acceptor) { + acceptor != snapshot->Acceptors.end(); ++acceptor) { Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId); } @@ -388,7 +388,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() { TStringStream ss; for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin(); - acceptor != acceptors.end(); ++acceptor) { + acceptor != acceptors.end(); ++acceptor) { const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get(); acceptorStatusSummary += status; @@ -406,7 +406,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() { TStringStream ss; for (TVector<TRemoteConnectionPtr>::const_iterator connection = connections.begin(); - connection != connections.end(); ++connection) { + connection != connections.end(); ++connection) { if (connection != connections.begin()) { ss << "\n"; } @@ -434,8 +434,8 @@ void TBusSessionImpl::StatusUpdateCachedDump() { TBusSessionImpl::TStatusData::TStatusData() : ConnectionsAcceptorsSnapshot(new TConnectionsAcceptorsSnapshot) -{ -} +{ +} void TBusSessionImpl::Act(TStatusTag) { TInstant now = TInstant::Now(); @@ -463,8 +463,8 @@ void TBusSessionImpl::Act(TStatusTag) { StatusUpdateCachedDumpIfNecessary(now); } -TBusSessionImpl::TConnectionsData::TConnectionsData() { -} +TBusSessionImpl::TConnectionsData::TConnectionsData() { +} void TBusSessionImpl::Act(TConnectionTag) { TConnectionsGuard guard(ConnectionsLock); @@ -487,11 +487,11 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) { Listen(BindOnPort(port, Config.ReusePort).second, q); } -void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) { +void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) { Y_ASSERT(q == Queue); int actualPort = -1; - for (const TBindResult& br : bindTo) { + for (const TBindResult& br : bindTo) { if (actualPort == -1) { actualPort = br.Addr.GetPort(); } else { @@ -513,7 +513,7 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu void TBusSessionImpl::SendSnapshotToStatusActor() { //Y_ASSERT(ConnectionsLock.IsLocked()); - TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot); + TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot); GetAcceptorsLockAquired(&snapshot->Acceptors); GetConnectionsLockAquired(&snapshot->Connections); snapshot->LastAcceptorId = LastAcceptorId; |