aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session_impl.cpp
diff options
context:
space:
mode:
authoryazevnul <yazevnul@yandex-team.ru>2022-02-10 16:46:46 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:46 +0300
commit8cbc307de0221f84c80c42dcbe07d40727537e2c (patch)
tree625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/messagebus/session_impl.cpp
parent30d1ef3941e0dc835be7609de5ebee66958f215a (diff)
downloadydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.cpp')
-rw-r--r--library/cpp/messagebus/session_impl.cpp86
1 files changed, 43 insertions, 43 deletions
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index ddf9f360c4..1552ee775f 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -58,28 +58,28 @@ namespace {
copy.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
copy.SendTimeout = TDuration::Seconds(15).MilliSeconds();
} else if (copy.TotalTimeout == 0) {
- Y_ASSERT(copy.SendTimeout != 0);
+ Y_ASSERT(copy.SendTimeout != 0);
copy.TotalTimeout = config.SendTimeout + TDuration::MilliSeconds(10).MilliSeconds();
} else if (copy.SendTimeout == 0) {
- Y_ASSERT(copy.TotalTimeout != 0);
+ Y_ASSERT(copy.TotalTimeout != 0);
if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) {
copy.SendTimeout = copy.TotalTimeout - TDuration::MilliSeconds(10).MilliSeconds();
} else {
copy.SendTimeout = copy.TotalTimeout;
}
} else {
- Y_ASSERT(copy.TotalTimeout != 0);
- Y_ASSERT(copy.SendTimeout != 0);
+ Y_ASSERT(copy.TotalTimeout != 0);
+ Y_ASSERT(copy.SendTimeout != 0);
}
if (copy.ConnectTimeout == 0) {
copy.ConnectTimeout = copy.SendTimeout;
}
- Y_VERIFY(copy.SendTimeout > 0, "SendTimeout must be > 0");
- Y_VERIFY(copy.TotalTimeout > 0, "TotalTimeout must be > 0");
- Y_VERIFY(copy.ConnectTimeout > 0, "ConnectTimeout must be > 0");
- Y_VERIFY(copy.TotalTimeout >= copy.SendTimeout, "TotalTimeout must be >= SendTimeout");
+ Y_VERIFY(copy.SendTimeout > 0, "SendTimeout must be > 0");
+ Y_VERIFY(copy.TotalTimeout > 0, "TotalTimeout must be > 0");
+ Y_VERIFY(copy.ConnectTimeout > 0, "ConnectTimeout must be > 0");
+ Y_VERIFY(copy.TotalTimeout >= copy.SendTimeout, "TotalTimeout must be >= SendTimeout");
if (!copy.Name) {
copy.Name = name;
@@ -117,10 +117,10 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro
}
TBusSessionImpl::~TBusSessionImpl() {
- Y_VERIFY(Down);
- Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero()));
- Y_VERIFY(!WriteEventLoop.IsRunning());
- Y_VERIFY(!ReadEventLoop.IsRunning());
+ Y_VERIFY(Down);
+ Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero()));
+ Y_VERIFY(!WriteEventLoop.IsRunning());
+ Y_VERIFY(!ReadEventLoop.IsRunning());
}
TBusSessionStatus::TBusSessionStatus()
@@ -136,7 +136,7 @@ void TBusSessionImpl::Shutdown() {
return;
}
- Y_VERIFY(Queue->IsRunning(), "Session must be shut down prior to queue shutdown");
+ Y_VERIFY(Queue->IsRunning(), "Session must be shut down prior to queue shutdown");
TUseAfterFreeCheckerGuard handlerAliveCheckedGuard(ErrorHandler->UseAfterFreeChecker);
@@ -161,16 +161,16 @@ void TBusSessionImpl::Shutdown() {
Acceptors.clear();
}
- for (auto& acceptor : acceptors) {
- acceptor->Shutdown();
+ for (auto& acceptor : acceptors) {
+ acceptor->Shutdown();
}
// shutdown connections
TVector<TRemoteConnectionPtr> cs;
GetConnections(&cs);
- for (auto& c : cs) {
- c->Shutdown(MESSAGE_SHUTDOWN);
+ for (auto& c : cs) {
+ c->Shutdown(MESSAGE_SHUTDOWN);
}
// shutdown connections actor
@@ -205,7 +205,7 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
}
void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
- Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
+ Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
for (size_t i = 0; i < addrs.size(); ++i) {
results[i] = GetInFlightImpl(addrs[i]);
}
@@ -221,7 +221,7 @@ size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) c
}
void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
- Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
+ Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
for (size_t i = 0; i < addrs.size(); ++i) {
results[i] = GetConnectSyscallsNumForTestImpl(addrs[i]);
}
@@ -232,7 +232,7 @@ void TBusSessionImpl::FillStatus() {
TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() {
// Probably useless, because it returns cached info now
- Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
+ Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
"GetStatus must not be called from executor thread");
TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
@@ -242,13 +242,13 @@ TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() {
}
TString TBusSessionImpl::GetStatus(ui16 flags) {
- Y_UNUSED(flags);
+ Y_UNUSED(flags);
return GetStatusRecordInternal().PrintToString();
}
TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() {
- Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
+ Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
"GetStatus must not be called from executor thread");
TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
@@ -319,12 +319,12 @@ void TBusSessionImpl::ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionP
void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) {
for (TVector<TRemoteConnectionPtr>::const_iterator connection = snapshot->Connections.begin();
connection != snapshot->Connections.end(); ++connection) {
- Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId);
+ Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId);
}
for (TVector<TAcceptorPtr>::const_iterator acceptor = snapshot->Acceptors.begin();
acceptor != snapshot->Acceptors.end(); ++acceptor) {
- Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId);
+ Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId);
}
StatusData.ConnectionsAcceptorsSnapshot = snapshot;
@@ -471,8 +471,8 @@ void TBusSessionImpl::Act(TConnectionTag) {
EShutdownState shutdownState = ConnectionsData.ShutdownState.State.Get();
if (shutdownState == SS_SHUTDOWN_COMPLETE) {
- Y_VERIFY(GetRemoveConnectionQueue()->IsEmpty());
- Y_VERIFY(GetOnAcceptQueue()->IsEmpty());
+ Y_VERIFY(GetRemoveConnectionQueue()->IsEmpty());
+ Y_VERIFY(GetOnAcceptQueue()->IsEmpty());
}
GetRemoveConnectionQueue()->DequeueAllLikelyEmpty();
@@ -488,7 +488,7 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
}
void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
- Y_ASSERT(q == Queue);
+ Y_ASSERT(q == Queue);
int actualPort = -1;
for (const TBindResult& br : bindTo) {
@@ -511,7 +511,7 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu
}
void TBusSessionImpl::SendSnapshotToStatusActor() {
- //Y_ASSERT(ConnectionsLock.IsLocked());
+ //Y_ASSERT(ConnectionsLock.IsLocked());
TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot);
GetAcceptorsLockAquired(&snapshot->Acceptors);
@@ -523,7 +523,7 @@ void TBusSessionImpl::SendSnapshotToStatusActor() {
}
void TBusSessionImpl::InsertConnectionLockAcquired(TRemoteConnection* connection) {
- //Y_ASSERT(ConnectionsLock.IsLocked());
+ //Y_ASSERT(ConnectionsLock.IsLocked());
Connections.insert(std::make_pair(connection->PeerAddrSocketAddr, connection));
// connection for given adds may already exist at this point
@@ -531,13 +531,13 @@ void TBusSessionImpl::InsertConnectionLockAcquired(TRemoteConnection* connection
// after reconnect, if previous connections wasn't shutdown yet
bool inserted2 = ConnectionsById.insert(std::make_pair(connection->ConnectionId, connection)).second;
- Y_VERIFY(inserted2, "state check: must be inserted (2)");
+ Y_VERIFY(inserted2, "state check: must be inserted (2)");
SendSnapshotToStatusActor();
}
void TBusSessionImpl::InsertAcceptorLockAcquired(TAcceptor* acceptor) {
- //Y_ASSERT(ConnectionsLock.IsLocked());
+ //Y_ASSERT(ConnectionsLock.IsLocked());
Acceptors.push_back(acceptor);
@@ -555,22 +555,22 @@ void TBusSessionImpl::GetAcceptors(TVector<TAcceptorPtr>* r) {
}
void TBusSessionImpl::GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>* r) {
- //Y_ASSERT(ConnectionsLock.IsLocked());
+ //Y_ASSERT(ConnectionsLock.IsLocked());
r->reserve(Connections.size());
- for (auto& connection : Connections) {
- r->push_back(connection.second);
+ for (auto& connection : Connections) {
+ r->push_back(connection.second);
}
}
void TBusSessionImpl::GetAcceptorsLockAquired(TVector<TAcceptorPtr>* r) {
- //Y_ASSERT(ConnectionsLock.IsLocked());
+ //Y_ASSERT(ConnectionsLock.IsLocked());
r->reserve(Acceptors.size());
- for (auto& acceptor : Acceptors) {
- r->push_back(acceptor);
+ for (auto& acceptor : Acceptors) {
+ r->push_back(acceptor);
}
}
@@ -588,9 +588,9 @@ TRemoteConnectionPtr TBusSessionImpl::GetConnectionById(ui64 id) {
TAcceptorPtr TBusSessionImpl::GetAcceptorById(ui64 id) {
TGuard<TMutex> guard(ConnectionsLock);
- for (const auto& Acceptor : Acceptors) {
- if (Acceptor->AcceptorId == id) {
- return Acceptor;
+ for (const auto& Acceptor : Acceptors) {
+ if (Acceptor->AcceptorId == id) {
+ return Acceptor;
}
}
@@ -614,7 +614,7 @@ TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr,
return TRemoteConnectionPtr();
}
- Y_VERIFY(IsSource_, "must be source");
+ Y_VERIFY(IsSource_, "must be source");
TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr()));
InsertConnectionLockAcquired(c.Get());
@@ -626,8 +626,8 @@ void TBusSessionImpl::Cron() {
TVector<TRemoteConnectionPtr> connections;
GetConnections(&connections);
- for (const auto& it : connections) {
- TRemoteConnection* connection = it.Get();
+ for (const auto& it : connections) {
+ TRemoteConnection* connection = it.Get();
if (IsSource_) {
VerifyDynamicCast<TRemoteClientConnection*>(connection)->ScheduleTimeoutMessages();
} else {