aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session_impl.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/session_impl.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.cpp')
-rw-r--r--library/cpp/messagebus/session_impl.cpp54
1 files changed, 27 insertions, 27 deletions
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index cff202c5b8..ddf9f360c4 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -40,10 +40,10 @@ namespace {
}
TConnectionsAcceptorsSnapshot::TConnectionsAcceptorsSnapshot()
- : LastConnectionId(0)
- , LastAcceptorId(0)
-{
-}
+ : LastConnectionId(0)
+ , LastAcceptorId(0)
+{
+}
struct TBusSessionImpl::TImpl {
TRemoteConnectionWriterIncrementalStatus DeadConnectionWriterStatusSummary;
@@ -62,7 +62,7 @@ namespace {
copy.TotalTimeout = config.SendTimeout + TDuration::MilliSeconds(10).MilliSeconds();
} else if (copy.SendTimeout == 0) {
Y_ASSERT(copy.TotalTimeout != 0);
- if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) {
+ if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) {
copy.SendTimeout = copy.TotalTimeout - TDuration::MilliSeconds(10).MilliSeconds();
} else {
copy.SendTimeout = copy.TotalTimeout;
@@ -90,8 +90,8 @@ namespace {
}
TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
- IBusErrorHandler* handler,
- const TBusSessionConfig& config, const TString& name)
+ IBusErrorHandler* handler,
+ const TBusSessionConfig& config, const TString& name)
: TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get())
, TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get())
, Impl(new TImpl)
@@ -111,7 +111,7 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro
Impl->DeadAcceptorStatusSummary.Summary = true;
ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop))));
- WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop))));
+ WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop))));
Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
}
@@ -127,8 +127,8 @@ TBusSessionStatus::TBusSessionStatus()
: InFlightCount(0)
, InFlightSize(0)
, InputPaused(false)
-{
-}
+{
+}
void TBusSessionImpl::Shutdown() {
if (!AtomicCas(&Down, 1, 0)) {
@@ -233,7 +233,7 @@ void TBusSessionImpl::FillStatus() {
TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() {
// Probably useless, because it returns cached info now
Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
- "GetStatus must not be called from executor thread");
+ "GetStatus must not be called from executor thread");
TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
// TODO: returns zeros for a second after start
@@ -247,9 +247,9 @@ TString TBusSessionImpl::GetStatus(ui16 flags) {
return GetStatusRecordInternal().PrintToString();
}
-TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() {
+TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() {
Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
- "GetStatus must not be called from executor thread");
+ "GetStatus must not be called from executor thread");
TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
@@ -289,8 +289,8 @@ void TBusSessionImpl::ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const T
}
//if (Connections.find(addr) != Connections.end()) {
- // TODO: it is possible
- // won't be a problem after socket address replaced with id
+ // TODO: it is possible
+ // won't be a problem after socket address replaced with id
//}
TRemoteConnectionPtr c(new TRemoteServerConnection(VerifyDynamicCast<TRemoteServerSession*>(this), ++LastConnectionId, onAccept.addr));
@@ -316,14 +316,14 @@ void TBusSessionImpl::ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionP
SendSnapshotToStatusActor();
}
-void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) {
+void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) {
for (TVector<TRemoteConnectionPtr>::const_iterator connection = snapshot->Connections.begin();
- connection != snapshot->Connections.end(); ++connection) {
+ connection != snapshot->Connections.end(); ++connection) {
Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId);
}
for (TVector<TAcceptorPtr>::const_iterator acceptor = snapshot->Acceptors.begin();
- acceptor != snapshot->Acceptors.end(); ++acceptor) {
+ acceptor != snapshot->Acceptors.end(); ++acceptor) {
Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId);
}
@@ -388,7 +388,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
TStringStream ss;
for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin();
- acceptor != acceptors.end(); ++acceptor) {
+ acceptor != acceptors.end(); ++acceptor) {
const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get();
acceptorStatusSummary += status;
@@ -406,7 +406,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
TStringStream ss;
for (TVector<TRemoteConnectionPtr>::const_iterator connection = connections.begin();
- connection != connections.end(); ++connection) {
+ connection != connections.end(); ++connection) {
if (connection != connections.begin()) {
ss << "\n";
}
@@ -434,8 +434,8 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
TBusSessionImpl::TStatusData::TStatusData()
: ConnectionsAcceptorsSnapshot(new TConnectionsAcceptorsSnapshot)
-{
-}
+{
+}
void TBusSessionImpl::Act(TStatusTag) {
TInstant now = TInstant::Now();
@@ -463,8 +463,8 @@ void TBusSessionImpl::Act(TStatusTag) {
StatusUpdateCachedDumpIfNecessary(now);
}
-TBusSessionImpl::TConnectionsData::TConnectionsData() {
-}
+TBusSessionImpl::TConnectionsData::TConnectionsData() {
+}
void TBusSessionImpl::Act(TConnectionTag) {
TConnectionsGuard guard(ConnectionsLock);
@@ -487,11 +487,11 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
Listen(BindOnPort(port, Config.ReusePort).second, q);
}
-void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
+void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
Y_ASSERT(q == Queue);
int actualPort = -1;
- for (const TBindResult& br : bindTo) {
+ for (const TBindResult& br : bindTo) {
if (actualPort == -1) {
actualPort = br.Addr.GetPort();
} else {
@@ -513,7 +513,7 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu
void TBusSessionImpl::SendSnapshotToStatusActor() {
//Y_ASSERT(ConnectionsLock.IsLocked());
- TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot);
+ TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot);
GetAcceptorsLockAquired(&snapshot->Acceptors);
GetConnectionsLockAquired(&snapshot->Connections);
snapshot->LastAcceptorId = LastAcceptorId;