aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/session_impl.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/session_impl.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/session_impl.cpp')
-rw-r--r--library/cpp/messagebus/session_impl.cpp936
1 files changed, 468 insertions, 468 deletions
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index ddf9f360c4..fd123f7dd5 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -1,216 +1,216 @@
#include "session_impl.h"
-
+
#include "acceptor.h"
-#include "network.h"
+#include "network.h"
#include "remote_client_connection.h"
-#include "remote_client_session.h"
-#include "remote_server_connection.h"
+#include "remote_client_session.h"
+#include "remote_server_connection.h"
#include "remote_server_session.h"
#include "misc/weak_ptr.h"
-
+
#include <util/generic/cast.h>
-using namespace NActor;
-using namespace NBus;
-using namespace NBus::NPrivate;
-using namespace NEventLoop;
-
-namespace {
- class TScheduleSession: public IScheduleItem {
- public:
- TScheduleSession(TBusSessionImpl* session, TInstant deadline)
- : IScheduleItem(deadline)
- , Session(session)
- , SessionImpl(session)
- {
- }
-
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+using namespace NEventLoop;
+
+namespace {
+ class TScheduleSession: public IScheduleItem {
+ public:
+ TScheduleSession(TBusSessionImpl* session, TInstant deadline)
+ : IScheduleItem(deadline)
+ , Session(session)
+ , SessionImpl(session)
+ {
+ }
+
void Do() override {
- TIntrusivePtr<TBusSession> session = Session.Get();
- if (!!session) {
- SessionImpl->Cron();
- }
- }
-
- private:
- TWeakPtr<TBusSession> Session;
- // Work around TWeakPtr limitation
- TBusSessionImpl* SessionImpl;
- };
-}
-
-TConnectionsAcceptorsSnapshot::TConnectionsAcceptorsSnapshot()
+ TIntrusivePtr<TBusSession> session = Session.Get();
+ if (!!session) {
+ SessionImpl->Cron();
+ }
+ }
+
+ private:
+ TWeakPtr<TBusSession> Session;
+ // Work around TWeakPtr limitation
+ TBusSessionImpl* SessionImpl;
+ };
+}
+
+TConnectionsAcceptorsSnapshot::TConnectionsAcceptorsSnapshot()
: LastConnectionId(0)
, LastAcceptorId(0)
{
}
-
-struct TBusSessionImpl::TImpl {
- TRemoteConnectionWriterIncrementalStatus DeadConnectionWriterStatusSummary;
- TRemoteConnectionReaderIncrementalStatus DeadConnectionReaderStatusSummary;
- TAcceptorStatus DeadAcceptorStatusSummary;
-};
-
-namespace {
+
+struct TBusSessionImpl::TImpl {
+ TRemoteConnectionWriterIncrementalStatus DeadConnectionWriterStatusSummary;
+ TRemoteConnectionReaderIncrementalStatus DeadConnectionReaderStatusSummary;
+ TAcceptorStatus DeadAcceptorStatusSummary;
+};
+
+namespace {
TBusSessionConfig SessionConfigFillDefaults(const TBusSessionConfig& config, const TString& name) {
- TBusSessionConfig copy = config;
- if (copy.TotalTimeout == 0 && copy.SendTimeout == 0) {
- copy.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
- copy.SendTimeout = TDuration::Seconds(15).MilliSeconds();
- } else if (copy.TotalTimeout == 0) {
+ TBusSessionConfig copy = config;
+ if (copy.TotalTimeout == 0 && copy.SendTimeout == 0) {
+ copy.TotalTimeout = TDuration::Seconds(60).MilliSeconds();
+ copy.SendTimeout = TDuration::Seconds(15).MilliSeconds();
+ } else if (copy.TotalTimeout == 0) {
Y_ASSERT(copy.SendTimeout != 0);
- copy.TotalTimeout = config.SendTimeout + TDuration::MilliSeconds(10).MilliSeconds();
- } else if (copy.SendTimeout == 0) {
+ copy.TotalTimeout = config.SendTimeout + TDuration::MilliSeconds(10).MilliSeconds();
+ } else if (copy.SendTimeout == 0) {
Y_ASSERT(copy.TotalTimeout != 0);
if ((ui64)copy.TotalTimeout > (ui64)TDuration::MilliSeconds(10).MilliSeconds()) {
- copy.SendTimeout = copy.TotalTimeout - TDuration::MilliSeconds(10).MilliSeconds();
- } else {
- copy.SendTimeout = copy.TotalTimeout;
- }
- } else {
+ copy.SendTimeout = copy.TotalTimeout - TDuration::MilliSeconds(10).MilliSeconds();
+ } else {
+ copy.SendTimeout = copy.TotalTimeout;
+ }
+ } else {
Y_ASSERT(copy.TotalTimeout != 0);
Y_ASSERT(copy.SendTimeout != 0);
- }
-
- if (copy.ConnectTimeout == 0) {
- copy.ConnectTimeout = copy.SendTimeout;
- }
-
+ }
+
+ if (copy.ConnectTimeout == 0) {
+ copy.ConnectTimeout = copy.SendTimeout;
+ }
+
Y_VERIFY(copy.SendTimeout > 0, "SendTimeout must be > 0");
Y_VERIFY(copy.TotalTimeout > 0, "TotalTimeout must be > 0");
Y_VERIFY(copy.ConnectTimeout > 0, "ConnectTimeout must be > 0");
Y_VERIFY(copy.TotalTimeout >= copy.SendTimeout, "TotalTimeout must be >= SendTimeout");
-
- if (!copy.Name) {
- copy.Name = name;
- }
-
- return copy;
- }
-}
-
-TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
+
+ if (!copy.Name) {
+ copy.Name = name;
+ }
+
+ return copy;
+ }
+}
+
+TBusSessionImpl::TBusSessionImpl(bool isSource, TBusMessageQueue* queue, TBusProtocol* proto,
IBusErrorHandler* handler,
const TBusSessionConfig& config, const TString& name)
- : TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get())
- , TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get())
- , Impl(new TImpl)
+ : TActor<TBusSessionImpl, TStatusTag>(queue->WorkQueue.Get())
+ , TActor<TBusSessionImpl, TConnectionTag>(queue->WorkQueue.Get())
+ , Impl(new TImpl)
, IsSource_(isSource)
- , Queue(queue)
- , Proto(proto)
- , ProtoName(Proto->GetService())
- , ErrorHandler(handler)
- , HandlerUseCountHolder(&handler->UseCountChecker)
- , Config(SessionConfigFillDefaults(config, name))
- , WriteEventLoop("wr-el")
- , ReadEventLoop("rd-el")
- , LastAcceptorId(0)
- , LastConnectionId(0)
+ , Queue(queue)
+ , Proto(proto)
+ , ProtoName(Proto->GetService())
+ , ErrorHandler(handler)
+ , HandlerUseCountHolder(&handler->UseCountChecker)
+ , Config(SessionConfigFillDefaults(config, name))
+ , WriteEventLoop("wr-el")
+ , ReadEventLoop("rd-el")
+ , LastAcceptorId(0)
+ , LastConnectionId(0)
, Down(0)
{
- Impl->DeadAcceptorStatusSummary.Summary = true;
-
+ Impl->DeadAcceptorStatusSummary.Summary = true;
+
ReadEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(ReadEventLoop))));
WriteEventLoopThread.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TEventLoop::Run, std::ref(WriteEventLoop))));
-
- Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
+
+ Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
}
-TBusSessionImpl::~TBusSessionImpl() {
+TBusSessionImpl::~TBusSessionImpl() {
Y_VERIFY(Down);
Y_VERIFY(ShutdownCompleteEvent.WaitT(TDuration::Zero()));
Y_VERIFY(!WriteEventLoop.IsRunning());
Y_VERIFY(!ReadEventLoop.IsRunning());
-}
-
-TBusSessionStatus::TBusSessionStatus()
- : InFlightCount(0)
- , InFlightSize(0)
- , InputPaused(false)
+}
+
+TBusSessionStatus::TBusSessionStatus()
+ : InFlightCount(0)
+ , InFlightSize(0)
+ , InputPaused(false)
{
}
-
-void TBusSessionImpl::Shutdown() {
- if (!AtomicCas(&Down, 1, 0)) {
- ShutdownCompleteEvent.WaitI();
- return;
- }
-
+
+void TBusSessionImpl::Shutdown() {
+ if (!AtomicCas(&Down, 1, 0)) {
+ ShutdownCompleteEvent.WaitI();
+ return;
+ }
+
Y_VERIFY(Queue->IsRunning(), "Session must be shut down prior to queue shutdown");
-
- TUseAfterFreeCheckerGuard handlerAliveCheckedGuard(ErrorHandler->UseAfterFreeChecker);
-
- // For legacy clients that don't use smart pointers
- TIntrusivePtr<TBusSessionImpl> thiz(this);
-
- Queue->Remove(this);
-
- // shutdown event loops first, so they won't send more events
- // to acceptors and connections
- ReadEventLoop.Stop();
- WriteEventLoop.Stop();
- ReadEventLoopThread->Get();
- WriteEventLoopThread->Get();
-
- // shutdown acceptors before connections
- // so they won't create more connections
+
+ TUseAfterFreeCheckerGuard handlerAliveCheckedGuard(ErrorHandler->UseAfterFreeChecker);
+
+ // For legacy clients that don't use smart pointers
+ TIntrusivePtr<TBusSessionImpl> thiz(this);
+
+ Queue->Remove(this);
+
+ // shutdown event loops first, so they won't send more events
+ // to acceptors and connections
+ ReadEventLoop.Stop();
+ WriteEventLoop.Stop();
+ ReadEventLoopThread->Get();
+ WriteEventLoopThread->Get();
+
+ // shutdown acceptors before connections
+ // so they won't create more connections
TVector<TAcceptorPtr> acceptors;
- GetAcceptors(&acceptors);
- {
- TGuard<TMutex> guard(ConnectionsLock);
- Acceptors.clear();
- }
+ GetAcceptors(&acceptors);
+ {
+ TGuard<TMutex> guard(ConnectionsLock);
+ Acceptors.clear();
+ }
for (auto& acceptor : acceptors) {
acceptor->Shutdown();
}
- // shutdown connections
+ // shutdown connections
TVector<TRemoteConnectionPtr> cs;
- GetConnections(&cs);
-
+ GetConnections(&cs);
+
for (auto& c : cs) {
c->Shutdown(MESSAGE_SHUTDOWN);
- }
-
- // shutdown connections actor
- // must shutdown after connections destroyed
- ConnectionsData.ShutdownState.ShutdownCommand();
- GetConnectionsActor()->Schedule();
- ConnectionsData.ShutdownState.ShutdownComplete.WaitI();
-
- // finally shutdown status actor
- StatusData.ShutdownState.ShutdownCommand();
- GetStatusActor()->Schedule();
- StatusData.ShutdownState.ShutdownComplete.WaitI();
-
- // Make sure no one references IMessageHandler after Shutdown()
- JobCount.WaitForZero();
- HandlerUseCountHolder.Reset();
-
- ShutdownCompleteEvent.Signal();
-}
-
-bool TBusSessionImpl::IsDown() {
+ }
+
+ // shutdown connections actor
+ // must shutdown after connections destroyed
+ ConnectionsData.ShutdownState.ShutdownCommand();
+ GetConnectionsActor()->Schedule();
+ ConnectionsData.ShutdownState.ShutdownComplete.WaitI();
+
+ // finally shutdown status actor
+ StatusData.ShutdownState.ShutdownCommand();
+ GetStatusActor()->Schedule();
+ StatusData.ShutdownState.ShutdownComplete.WaitI();
+
+ // Make sure no one references IMessageHandler after Shutdown()
+ JobCount.WaitForZero();
+ HandlerUseCountHolder.Reset();
+
+ ShutdownCompleteEvent.Signal();
+}
+
+bool TBusSessionImpl::IsDown() {
return static_cast<bool>(AtomicGet(Down));
}
-size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
- TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
- if (!!conn) {
- return conn->GetInFlight();
- } else {
- return 0;
- }
+size_t TBusSessionImpl::GetInFlightImpl(const TNetAddr& addr) const {
+ TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
+ if (!!conn) {
+ return conn->GetInFlight();
+ } else {
+ return 0;
+ }
}
void TBusSessionImpl::GetInFlightBulk(TArrayRef<const TNetAddr> addrs, TArrayRef<size_t> results) const {
Y_VERIFY(addrs.size() == results.size(), "input.size != output.size");
- for (size_t i = 0; i < addrs.size(); ++i) {
- results[i] = GetInFlightImpl(addrs[i]);
- }
-}
-
+ for (size_t i = 0; i < addrs.size(); ++i) {
+ results[i] = GetInFlightImpl(addrs[i]);
+ }
+}
+
size_t TBusSessionImpl::GetConnectSyscallsNumForTestImpl(const TNetAddr& addr) const {
TRemoteConnectionPtr conn = const_cast<TBusSessionImpl*>(this)->GetConnection(addr, false);
if (!!conn) {
@@ -227,26 +227,26 @@ void TBusSessionImpl::GetConnectSyscallsNumBulkForTest(TArrayRef<const TNetAddr>
}
}
-void TBusSessionImpl::FillStatus() {
-}
-
-TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() {
- // Probably useless, because it returns cached info now
+void TBusSessionImpl::FillStatus() {
+}
+
+TSessionDumpStatus TBusSessionImpl::GetStatusRecordInternal() {
+ // Probably useless, because it returns cached info now
Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
"GetStatus must not be called from executor thread");
-
- TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
- // TODO: returns zeros for a second after start
- // (until first cron)
- return StatusData.StatusDumpCached;
-}
-
+
+ TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
+ // TODO: returns zeros for a second after start
+ // (until first cron)
+ return StatusData.StatusDumpCached;
+}
+
TString TBusSessionImpl::GetStatus(ui16 flags) {
Y_UNUSED(flags);
-
- return GetStatusRecordInternal().PrintToString();
-}
-
+
+ return GetStatusRecordInternal().PrintToString();
+}
+
TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() {
Y_VERIFY(!Queue->GetExecutor()->IsInExecutorThread(),
"GetStatus must not be called from executor thread");
@@ -257,233 +257,233 @@ TConnectionStatusMonRecord TBusSessionImpl::GetStatusProtobuf() {
}
TString TBusSessionImpl::GetStatusSingleLine() {
- TSessionDumpStatus status = GetStatusRecordInternal();
-
- TStringStream ss;
- ss << "in-flight: " << status.Status.InFlightCount;
- if (IsSource_) {
- ss << " ack: " << status.ConnectionStatusSummary.WriterStatus.AckMessagesSize;
- }
- ss << " send-q: " << status.ConnectionStatusSummary.WriterStatus.SendQueueSize;
- return ss.Str();
-}
-
-void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus& connectionStatus) {
- Impl->DeadConnectionWriterStatusSummary += connectionStatus;
-}
-
-void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus& connectionStatus) {
- Impl->DeadConnectionReaderStatusSummary += connectionStatus;
-}
-
-void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus& acceptorStatus) {
- Impl->DeadAcceptorStatusSummary += acceptorStatus;
-}
-
-void TBusSessionImpl::ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept& onAccept) {
- TSocketHolder socket(onAccept.s);
-
- if (AtomicGet(Down)) {
- // do not create connections after shutdown initiated
- return;
- }
-
- //if (Connections.find(addr) != Connections.end()) {
+ TSessionDumpStatus status = GetStatusRecordInternal();
+
+ TStringStream ss;
+ ss << "in-flight: " << status.Status.InFlightCount;
+ if (IsSource_) {
+ ss << " ack: " << status.ConnectionStatusSummary.WriterStatus.AckMessagesSize;
+ }
+ ss << " send-q: " << status.ConnectionStatusSummary.WriterStatus.SendQueueSize;
+ return ss.Str();
+}
+
+void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionWriterIncrementalStatus& connectionStatus) {
+ Impl->DeadConnectionWriterStatusSummary += connectionStatus;
+}
+
+void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TRemoteConnectionReaderIncrementalStatus& connectionStatus) {
+ Impl->DeadConnectionReaderStatusSummary += connectionStatus;
+}
+
+void TBusSessionImpl::ProcessItem(TStatusTag, TDeadConnectionTag, const TAcceptorStatus& acceptorStatus) {
+ Impl->DeadAcceptorStatusSummary += acceptorStatus;
+}
+
+void TBusSessionImpl::ProcessItem(TConnectionTag, ::NActor::TDefaultTag, const TOnAccept& onAccept) {
+ TSocketHolder socket(onAccept.s);
+
+ if (AtomicGet(Down)) {
+ // do not create connections after shutdown initiated
+ return;
+ }
+
+ //if (Connections.find(addr) != Connections.end()) {
// TODO: it is possible
// won't be a problem after socket address replaced with id
- //}
-
- TRemoteConnectionPtr c(new TRemoteServerConnection(VerifyDynamicCast<TRemoteServerSession*>(this), ++LastConnectionId, onAccept.addr));
-
- VerifyDynamicCast<TRemoteServerConnection*>(c.Get())->Init(socket.Release(), onAccept.now);
-
- InsertConnectionLockAcquired(c.Get());
-}
-
-void TBusSessionImpl::ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr c) {
- TAddrRemoteConnections::iterator it1 = Connections.find(c->PeerAddrSocketAddr);
- if (it1 != Connections.end()) {
- if (it1->second.Get() == c.Get()) {
- Connections.erase(it1);
- }
- }
-
+ //}
+
+ TRemoteConnectionPtr c(new TRemoteServerConnection(VerifyDynamicCast<TRemoteServerSession*>(this), ++LastConnectionId, onAccept.addr));
+
+ VerifyDynamicCast<TRemoteServerConnection*>(c.Get())->Init(socket.Release(), onAccept.now);
+
+ InsertConnectionLockAcquired(c.Get());
+}
+
+void TBusSessionImpl::ProcessItem(TConnectionTag, TRemoveTag, TRemoteConnectionPtr c) {
+ TAddrRemoteConnections::iterator it1 = Connections.find(c->PeerAddrSocketAddr);
+ if (it1 != Connections.end()) {
+ if (it1->second.Get() == c.Get()) {
+ Connections.erase(it1);
+ }
+ }
+
THashMap<ui64, TRemoteConnectionPtr>::iterator it2 = ConnectionsById.find(c->ConnectionId);
- if (it2 != ConnectionsById.end()) {
- ConnectionsById.erase(it2);
- }
-
- SendSnapshotToStatusActor();
-}
-
+ if (it2 != ConnectionsById.end()) {
+ ConnectionsById.erase(it2);
+ }
+
+ SendSnapshotToStatusActor();
+}
+
void TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem(TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot) {
for (TVector<TRemoteConnectionPtr>::const_iterator connection = snapshot->Connections.begin();
connection != snapshot->Connections.end(); ++connection) {
Y_ASSERT((*connection)->ConnectionId <= snapshot->LastConnectionId);
- }
-
+ }
+
for (TVector<TAcceptorPtr>::const_iterator acceptor = snapshot->Acceptors.begin();
acceptor != snapshot->Acceptors.end(); ++acceptor) {
Y_ASSERT((*acceptor)->AcceptorId <= snapshot->LastAcceptorId);
- }
-
- StatusData.ConnectionsAcceptorsSnapshot = snapshot;
-}
-
-void TBusSessionImpl::StatusUpdateCachedDumpIfNecessary(TInstant now) {
- if (now - StatusData.StatusDumpCachedLastUpdate > Config.Secret.StatusFlushPeriod) {
- StatusUpdateCachedDump();
- StatusData.StatusDumpCachedLastUpdate = now;
- }
-}
-
-void TBusSessionImpl::StatusUpdateCachedDump() {
- TSessionDumpStatus r;
-
- if (AtomicGet(Down)) {
- r.Shutdown = true;
- TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
- StatusData.StatusDumpCached = r;
- return;
- }
-
- // TODO: make thread-safe
- FillStatus();
-
- r.Status = StatusData.Status;
-
- {
- TStringStream ss;
-
+ }
+
+ StatusData.ConnectionsAcceptorsSnapshot = snapshot;
+}
+
+void TBusSessionImpl::StatusUpdateCachedDumpIfNecessary(TInstant now) {
+ if (now - StatusData.StatusDumpCachedLastUpdate > Config.Secret.StatusFlushPeriod) {
+ StatusUpdateCachedDump();
+ StatusData.StatusDumpCachedLastUpdate = now;
+ }
+}
+
+void TBusSessionImpl::StatusUpdateCachedDump() {
+ TSessionDumpStatus r;
+
+ if (AtomicGet(Down)) {
+ r.Shutdown = true;
+ TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
+ StatusData.StatusDumpCached = r;
+ return;
+ }
+
+ // TODO: make thread-safe
+ FillStatus();
+
+ r.Status = StatusData.Status;
+
+ {
+ TStringStream ss;
+
TString name = Config.Name;
- if (!name) {
- name = "unnamed";
- }
-
- ss << (IsSource_ ? "client" : "server") << " session " << name << ", proto " << Proto->GetService() << Endl;
- ss << "in flight: " << r.Status.InFlightCount;
- if (!IsSource_) {
- ss << ", " << r.Status.InFlightSize << "b";
- }
- if (r.Status.InputPaused) {
- ss << " (input paused)";
- }
- ss << "\n";
-
- r.Head = ss.Str();
- }
-
+ if (!name) {
+ name = "unnamed";
+ }
+
+ ss << (IsSource_ ? "client" : "server") << " session " << name << ", proto " << Proto->GetService() << Endl;
+ ss << "in flight: " << r.Status.InFlightCount;
+ if (!IsSource_) {
+ ss << ", " << r.Status.InFlightSize << "b";
+ }
+ if (r.Status.InputPaused) {
+ ss << " (input paused)";
+ }
+ ss << "\n";
+
+ r.Head = ss.Str();
+ }
+
TVector<TRemoteConnectionPtr>& connections = StatusData.ConnectionsAcceptorsSnapshot->Connections;
TVector<TAcceptorPtr>& acceptors = StatusData.ConnectionsAcceptorsSnapshot->Acceptors;
-
- r.ConnectionStatusSummary = TRemoteConnectionStatus();
- r.ConnectionStatusSummary.Summary = true;
- r.ConnectionStatusSummary.Server = !IsSource_;
- r.ConnectionStatusSummary.WriterStatus.Incremental = Impl->DeadConnectionWriterStatusSummary;
- r.ConnectionStatusSummary.ReaderStatus.Incremental = Impl->DeadConnectionReaderStatusSummary;
-
- TAcceptorStatus acceptorStatusSummary = Impl->DeadAcceptorStatusSummary;
-
- {
- TStringStream ss;
-
+
+ r.ConnectionStatusSummary = TRemoteConnectionStatus();
+ r.ConnectionStatusSummary.Summary = true;
+ r.ConnectionStatusSummary.Server = !IsSource_;
+ r.ConnectionStatusSummary.WriterStatus.Incremental = Impl->DeadConnectionWriterStatusSummary;
+ r.ConnectionStatusSummary.ReaderStatus.Incremental = Impl->DeadConnectionReaderStatusSummary;
+
+ TAcceptorStatus acceptorStatusSummary = Impl->DeadAcceptorStatusSummary;
+
+ {
+ TStringStream ss;
+
for (TVector<TAcceptorPtr>::const_iterator acceptor = acceptors.begin();
acceptor != acceptors.end(); ++acceptor) {
const TAcceptorStatus status = (*acceptor)->GranStatus.Listen.Get();
acceptorStatusSummary += status;
- if (acceptor != acceptors.begin()) {
- ss << "\n";
- }
+ if (acceptor != acceptors.begin()) {
+ ss << "\n";
+ }
ss << status.PrintToString();
- }
-
- r.Acceptors = ss.Str();
- }
-
- {
- TStringStream ss;
-
+ }
+
+ r.Acceptors = ss.Str();
+ }
+
+ {
+ TStringStream ss;
+
for (TVector<TRemoteConnectionPtr>::const_iterator connection = connections.begin();
connection != connections.end(); ++connection) {
- if (connection != connections.begin()) {
- ss << "\n";
- }
+ if (connection != connections.begin()) {
+ ss << "\n";
+ }
- TRemoteConnectionStatus status;
- status.Server = !IsSource_;
+ TRemoteConnectionStatus status;
+ status.Server = !IsSource_;
status.ReaderStatus = (*connection)->GranStatus.Reader.Get();
status.WriterStatus = (*connection)->GranStatus.Writer.Get();
- ss << status.PrintToString();
+ ss << status.PrintToString();
r.ConnectionStatusSummary.ReaderStatus += status.ReaderStatus;
r.ConnectionStatusSummary.WriterStatus += status.WriterStatus;
- }
-
+ }
+
r.ConnectionsSummary = r.ConnectionStatusSummary.PrintToString();
- r.Connections = ss.Str();
- }
-
- r.Config = Config;
-
- TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
- StatusData.StatusDumpCached = r;
-}
-
-TBusSessionImpl::TStatusData::TStatusData()
- : ConnectionsAcceptorsSnapshot(new TConnectionsAcceptorsSnapshot)
+ r.Connections = ss.Str();
+ }
+
+ r.Config = Config;
+
+ TGuard<TMutex> guard(StatusData.StatusDumpCachedMutex);
+ StatusData.StatusDumpCached = r;
+}
+
+TBusSessionImpl::TStatusData::TStatusData()
+ : ConnectionsAcceptorsSnapshot(new TConnectionsAcceptorsSnapshot)
{
}
-
-void TBusSessionImpl::Act(TStatusTag) {
- TInstant now = TInstant::Now();
-
- EShutdownState shutdownState = StatusData.ShutdownState.State.Get();
-
+
+void TBusSessionImpl::Act(TStatusTag) {
+ TInstant now = TInstant::Now();
+
+ EShutdownState shutdownState = StatusData.ShutdownState.State.Get();
+
StatusData.ConnectionsAcceptorsSnapshotsQueue.DequeueAllLikelyEmpty(std::bind(&TBusSessionImpl::ProcessConnectionsAcceptorsShapshotQueueItem, this, std::placeholders::_1));
-
- GetDeadConnectionWriterStatusQueue()->DequeueAllLikelyEmpty();
- GetDeadConnectionReaderStatusQueue()->DequeueAllLikelyEmpty();
- GetDeadAcceptorStatusQueue()->DequeueAllLikelyEmpty();
-
- // TODO: check queues are empty if already stopped
-
- if (shutdownState != SS_RUNNING) {
- // important to beak cyclic link session -> connection -> session
- StatusData.ConnectionsAcceptorsSnapshot->Connections.clear();
- StatusData.ConnectionsAcceptorsSnapshot->Acceptors.clear();
- }
-
- if (shutdownState == SS_SHUTDOWN_COMMAND) {
- StatusData.ShutdownState.CompleteShutdown();
- }
-
- StatusUpdateCachedDumpIfNecessary(now);
-}
-
+
+ GetDeadConnectionWriterStatusQueue()->DequeueAllLikelyEmpty();
+ GetDeadConnectionReaderStatusQueue()->DequeueAllLikelyEmpty();
+ GetDeadAcceptorStatusQueue()->DequeueAllLikelyEmpty();
+
+ // TODO: check queues are empty if already stopped
+
+ if (shutdownState != SS_RUNNING) {
+ // important to beak cyclic link session -> connection -> session
+ StatusData.ConnectionsAcceptorsSnapshot->Connections.clear();
+ StatusData.ConnectionsAcceptorsSnapshot->Acceptors.clear();
+ }
+
+ if (shutdownState == SS_SHUTDOWN_COMMAND) {
+ StatusData.ShutdownState.CompleteShutdown();
+ }
+
+ StatusUpdateCachedDumpIfNecessary(now);
+}
+
TBusSessionImpl::TConnectionsData::TConnectionsData() {
}
-
-void TBusSessionImpl::Act(TConnectionTag) {
- TConnectionsGuard guard(ConnectionsLock);
-
- EShutdownState shutdownState = ConnectionsData.ShutdownState.State.Get();
- if (shutdownState == SS_SHUTDOWN_COMPLETE) {
+
+void TBusSessionImpl::Act(TConnectionTag) {
+ TConnectionsGuard guard(ConnectionsLock);
+
+ EShutdownState shutdownState = ConnectionsData.ShutdownState.State.Get();
+ if (shutdownState == SS_SHUTDOWN_COMPLETE) {
Y_VERIFY(GetRemoveConnectionQueue()->IsEmpty());
Y_VERIFY(GetOnAcceptQueue()->IsEmpty());
- }
-
- GetRemoveConnectionQueue()->DequeueAllLikelyEmpty();
- GetOnAcceptQueue()->DequeueAllLikelyEmpty();
-
- if (shutdownState == SS_SHUTDOWN_COMMAND) {
- ConnectionsData.ShutdownState.CompleteShutdown();
- }
-}
-
-void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
+ }
+
+ GetRemoveConnectionQueue()->DequeueAllLikelyEmpty();
+ GetOnAcceptQueue()->DequeueAllLikelyEmpty();
+
+ if (shutdownState == SS_SHUTDOWN_COMMAND) {
+ ConnectionsData.ShutdownState.CompleteShutdown();
+ }
+}
+
+void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
Listen(BindOnPort(port, Config.ReusePort).second, q);
}
@@ -494,116 +494,116 @@ void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueu
for (const TBindResult& br : bindTo) {
if (actualPort == -1) {
actualPort = br.Addr.GetPort();
- } else {
+ } else {
Y_VERIFY(actualPort == br.Addr.GetPort(), "state check");
- }
+ }
if (Config.SocketToS >= 0) {
SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS);
}
-
+
TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));
- TConnectionsGuard guard(ConnectionsLock);
- InsertAcceptorLockAcquired(acceptor.Get());
+ TConnectionsGuard guard(ConnectionsLock);
+ InsertAcceptorLockAcquired(acceptor.Get());
}
-
- Config.ListenPort = actualPort;
+
+ Config.ListenPort = actualPort;
}
-void TBusSessionImpl::SendSnapshotToStatusActor() {
+void TBusSessionImpl::SendSnapshotToStatusActor() {
//Y_ASSERT(ConnectionsLock.IsLocked());
-
+
TAtomicSharedPtr<TConnectionsAcceptorsSnapshot> snapshot(new TConnectionsAcceptorsSnapshot);
- GetAcceptorsLockAquired(&snapshot->Acceptors);
- GetConnectionsLockAquired(&snapshot->Connections);
- snapshot->LastAcceptorId = LastAcceptorId;
- snapshot->LastConnectionId = LastConnectionId;
- StatusData.ConnectionsAcceptorsSnapshotsQueue.Enqueue(snapshot);
- GetStatusActor()->Schedule();
-}
-
-void TBusSessionImpl::InsertConnectionLockAcquired(TRemoteConnection* connection) {
+ GetAcceptorsLockAquired(&snapshot->Acceptors);
+ GetConnectionsLockAquired(&snapshot->Connections);
+ snapshot->LastAcceptorId = LastAcceptorId;
+ snapshot->LastConnectionId = LastConnectionId;
+ StatusData.ConnectionsAcceptorsSnapshotsQueue.Enqueue(snapshot);
+ GetStatusActor()->Schedule();
+}
+
+void TBusSessionImpl::InsertConnectionLockAcquired(TRemoteConnection* connection) {
//Y_ASSERT(ConnectionsLock.IsLocked());
-
+
Connections.insert(std::make_pair(connection->PeerAddrSocketAddr, connection));
- // connection for given adds may already exist at this point
- // (so we overwrite old connection)
- // after reconnect, if previous connections wasn't shutdown yet
-
+ // connection for given adds may already exist at this point
+ // (so we overwrite old connection)
+ // after reconnect, if previous connections wasn't shutdown yet
+
bool inserted2 = ConnectionsById.insert(std::make_pair(connection->ConnectionId, connection)).second;
Y_VERIFY(inserted2, "state check: must be inserted (2)");
-
- SendSnapshotToStatusActor();
-}
-
-void TBusSessionImpl::InsertAcceptorLockAcquired(TAcceptor* acceptor) {
+
+ SendSnapshotToStatusActor();
+}
+
+void TBusSessionImpl::InsertAcceptorLockAcquired(TAcceptor* acceptor) {
//Y_ASSERT(ConnectionsLock.IsLocked());
-
- Acceptors.push_back(acceptor);
-
- SendSnapshotToStatusActor();
-}
-
+
+ Acceptors.push_back(acceptor);
+
+ SendSnapshotToStatusActor();
+}
+
void TBusSessionImpl::GetConnections(TVector<TRemoteConnectionPtr>* r) {
- TConnectionsGuard guard(ConnectionsLock);
- GetConnectionsLockAquired(r);
-}
-
+ TConnectionsGuard guard(ConnectionsLock);
+ GetConnectionsLockAquired(r);
+}
+
void TBusSessionImpl::GetAcceptors(TVector<TAcceptorPtr>* r) {
- TConnectionsGuard guard(ConnectionsLock);
- GetAcceptorsLockAquired(r);
-}
-
+ TConnectionsGuard guard(ConnectionsLock);
+ GetAcceptorsLockAquired(r);
+}
+
void TBusSessionImpl::GetConnectionsLockAquired(TVector<TRemoteConnectionPtr>* r) {
//Y_ASSERT(ConnectionsLock.IsLocked());
-
- r->reserve(Connections.size());
-
+
+ r->reserve(Connections.size());
+
for (auto& connection : Connections) {
r->push_back(connection.second);
- }
-}
-
+ }
+}
+
void TBusSessionImpl::GetAcceptorsLockAquired(TVector<TAcceptorPtr>* r) {
//Y_ASSERT(ConnectionsLock.IsLocked());
-
- r->reserve(Acceptors.size());
-
+
+ r->reserve(Acceptors.size());
+
for (auto& acceptor : Acceptors) {
r->push_back(acceptor);
- }
-}
-
-TRemoteConnectionPtr TBusSessionImpl::GetConnectionById(ui64 id) {
- TConnectionsGuard guard(ConnectionsLock);
-
+ }
+}
+
+TRemoteConnectionPtr TBusSessionImpl::GetConnectionById(ui64 id) {
+ TConnectionsGuard guard(ConnectionsLock);
+
THashMap<ui64, TRemoteConnectionPtr>::const_iterator it = ConnectionsById.find(id);
- if (it == ConnectionsById.end()) {
+ if (it == ConnectionsById.end()) {
return nullptr;
- } else {
- return it->second;
- }
-}
-
-TAcceptorPtr TBusSessionImpl::GetAcceptorById(ui64 id) {
- TGuard<TMutex> guard(ConnectionsLock);
-
+ } else {
+ return it->second;
+ }
+}
+
+TAcceptorPtr TBusSessionImpl::GetAcceptorById(ui64 id) {
+ TGuard<TMutex> guard(ConnectionsLock);
+
for (const auto& Acceptor : Acceptors) {
if (Acceptor->AcceptorId == id) {
return Acceptor;
- }
- }
-
+ }
+ }
+
return nullptr;
-}
-
-void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message, EMessageStatus status) {
- message->CheckClean();
- ErrorHandler->OnError(message, status);
-}
-
-TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) {
- TConnectionsGuard guard(ConnectionsLock);
+}
+
+void TBusSessionImpl::InvokeOnError(TNonDestroyingAutoPtr<TBusMessage> message, EMessageStatus status) {
+ message->CheckClean();
+ ErrorHandler->OnError(message, status);
+}
+
+TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr, bool create) {
+ TConnectionsGuard guard(ConnectionsLock);
TAddrRemoteConnections::const_iterator it = Connections.find(addr);
if (it != Connections.end()) {
@@ -615,36 +615,36 @@ TRemoteConnectionPtr TBusSessionImpl::GetConnection(const TBusSocketAddr& addr,
}
Y_VERIFY(IsSource_, "must be source");
-
- TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr()));
- InsertConnectionLockAcquired(c.Get());
+
+ TRemoteConnectionPtr c(new TRemoteClientConnection(VerifyDynamicCast<TRemoteClientSession*>(this), ++LastConnectionId, addr.ToNetAddr()));
+ InsertConnectionLockAcquired(c.Get());
return c;
}
-void TBusSessionImpl::Cron() {
+void TBusSessionImpl::Cron() {
TVector<TRemoteConnectionPtr> connections;
- GetConnections(&connections);
-
+ GetConnections(&connections);
+
for (const auto& it : connections) {
TRemoteConnection* connection = it.Get();
- if (IsSource_) {
- VerifyDynamicCast<TRemoteClientConnection*>(connection)->ScheduleTimeoutMessages();
- } else {
- VerifyDynamicCast<TRemoteServerConnection*>(connection)->WriterData.TimeToRotateCounters.AddTask();
- // no schedule: do not rotate if there's no traffic
- }
- }
-
- // status updates are sent without scheduling
- GetStatusActor()->Schedule();
-
- Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
-}
-
+ if (IsSource_) {
+ VerifyDynamicCast<TRemoteClientConnection*>(connection)->ScheduleTimeoutMessages();
+ } else {
+ VerifyDynamicCast<TRemoteServerConnection*>(connection)->WriterData.TimeToRotateCounters.AddTask();
+ // no schedule: do not rotate if there's no traffic
+ }
+ }
+
+ // status updates are sent without scheduling
+ GetStatusActor()->Schedule();
+
+ Queue->Schedule(IScheduleItemAutoPtr(new TScheduleSession(this, TInstant::Now() + Config.Secret.TimeoutPeriod)));
+}
+
TString TBusSessionImpl::GetNameInternal() {
- if (!!Config.Name) {
- return Config.Name;
- }
- return ProtoName;
-}
+ if (!!Config.Name) {
+ return Config.Name;
+ }
+ return ProtoName;
+}