aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session_impl.cpp
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:49 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:49 +0300
commit7489e4682331202b9c7d863c0898eb83d7b12c2b (patch)
tree9142afc54d335ea52910662635b898e79e192e49 /library/cpp/messagebus/session_impl.cpp
parenta5950576e397b1909261050b8c7da16db58f10b1 (diff)
downloadydb-7489e4682331202b9c7d863c0898eb83d7b12c2b.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.cpp')
-rw-r--r--library/cpp/messagebus/session_impl.cpp72
1 files changed, 36 insertions, 36 deletions
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index 76790221ec..ddf9f360c4 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -14,7 +14,7 @@ using namespace NActor;
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NEventLoop;
-
+
namespace {
class TScheduleSession: public IScheduleItem {
public:
@@ -95,7 +95,7 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro
: TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get())
, TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get())
, Impl(new TImpl)
- , IsSource_(isSource)
+ , IsSource_(isSource)
, Queue(queue)
, Proto(proto)
, ProtoName(Proto->GetService())
@@ -106,16 +106,16 @@ TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusPro
, ReadEventLoop("rd-el")
, LastAcceptorId(0)
, LastConnectionId(0)
- , Down(0)
-{
+ , Down(0)
+{
Impl->DeadAcceptorStatusSummary.Summary = true;
ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop))));
WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop))));
Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
-}
-
+}
+
TBusSessionImpl::~TBusSessionImpl() {
Y_VERIFY(Down);
Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero()));
@@ -160,11 +160,11 @@ void TBusSessionImpl::Shutdown() {
TGuard<TMutex> guard(ConnectionsLock);
Acceptors.clear();
}
-
+
for (auto& acceptor : acceptors) {
acceptor->Shutdown();
- }
-
+ }
+
// shutdown connections
TVector<TRemoteConnectionPtr> cs;
GetConnections(&cs);
@@ -189,12 +189,12 @@ void TBusSessionImpl::Shutdown() {
HandlerUseCountHolder.Reset();
ShutdownCompleteEvent.Signal();
-}
-
+}
+
bool TBusSessionImpl::IsDown() {
- return static_cast<bool>(AtomicGet(Down));
-}
-
+ return static_cast<bool>(AtomicGet(Down));
+}
+
size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
if (!!conn) {
@@ -202,8 +202,8 @@ size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
} else {
return 0;
}
-}
-
+}
+
void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
for (size_t i = 0; i < addrs.size(); ++i) {
@@ -427,7 +427,7 @@ void TBusSessionImpl::StatusUpdateCachedDump() {
}
r.Config = Config;
-
+
TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
StatusData.StatusDumpCached = r;
}
@@ -490,7 +490,7 @@ void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
Y_ASSERT(q == Queue);
int actualPort = -1;
-
+
for (const TBindResult& br : bindTo) {
if (actualPort == -1) {
actualPort = br.Addr.GetPort();
@@ -502,14 +502,14 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu
}
TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));
-
+
TConnectionsGuard guard(ConnectionsLock);
InsertAcceptorLockAcquired(acceptor.Get());
- }
+ }
Config.ListenPort = actualPort;
-}
-
+}
+
void TBusSessionImpl::SendSnapshotToStatusActor() {
//Y_ASSERT(ConnectionsLock.IsLocked());
@@ -604,24 +604,24 @@ void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message,
TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) {
TConnectionsGuard guard(ConnectionsLock);
-
- TAddrRemoteConnections::const_iterator it = Connections.find(addr);
- if (it != Connections.end()) {
- return it->second;
- }
-
- if (!create) {
- return TRemoteConnectionPtr();
- }
-
+
+ TAddrRemoteConnections::const_iterator it = Connections.find(addr);
+ if (it != Connections.end()) {
+ return it->second;
+ }
+
+ if (!create) {
+ return TRemoteConnectionPtr();
+ }
+
Y_VERIFY(IsSource_, "must be source");
TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr()));
InsertConnectionLockAcquired(c.Get());
-
- return c;
-}
-
+
+ return c;
+}
+
void TBusSessionImpl::Cron() {
TVector<TRemoteConnectionPtr> connections;
GetConnections(&connections);