aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_connection_status.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_connection_status.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_connection_status.cpp')
-rw-r--r--library/cpp/messagebus/remote_connection_status.cpp265
1 files changed, 265 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_connection_status.cpp b/library/cpp/messagebus/remote_connection_status.cpp
new file mode 100644
index 0000000000..2c48b2a287
--- /dev/null
+++ b/library/cpp/messagebus/remote_connection_status.cpp
@@ -0,0 +1,265 @@
+#include "remote_connection_status.h"
+
+#include "key_value_printer.h"
+
+#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
+
+#include <util/stream/format.h>
+#include <util/stream/output.h>
+#include <util/system/yassert.h>
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+template <typename T>
+static void Add(T& thiz, const T& that) {
+ thiz += that;
+}
+
+template <typename T>
+static void Max(T& thiz, const T& that) {
+ if (that > thiz) {
+ thiz = that;
+ }
+}
+
+template <typename T>
+static void AssertZero(T& thiz, const T& that) {
+ Y_ASSERT(thiz == T());
+ Y_UNUSED(that);
+}
+
+TDurationCounter::TDurationCounter()
+ : DURATION_COUNTER_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
+{
+}
+
+TDuration TDurationCounter::AvgDuration() const {
+ if (Count == 0) {
+ return TDuration::Zero();
+ } else {
+ return SumDuration / Count;
+ }
+}
+
+TDurationCounter& TDurationCounter::operator+=(const TDurationCounter& that) {
+ DURATION_COUNTER_MAP(STRUCT_FIELD_ADD, )
+ return *this;
+}
+
+TString TDurationCounter::ToString() const {
+ if (Count == 0) {
+ return "0";
+ } else {
+ TStringStream ss;
+ ss << "avg: " << AvgDuration() << ", max: " << MaxDuration << ", count: " << Count;
+ return ss.Str();
+ }
+}
+
+TRemoteConnectionStatusBase::TRemoteConnectionStatusBase()
+ : REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
+{
+}
+
+TRemoteConnectionStatusBase& TRemoteConnectionStatusBase ::operator+=(const TRemoteConnectionStatusBase& that) {
+ REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_ADD, )
+ return *this;
+}
+
+TRemoteConnectionIncrementalStatusBase::TRemoteConnectionIncrementalStatusBase()
+ : REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
+{
+}
+
+TRemoteConnectionIncrementalStatusBase& TRemoteConnectionIncrementalStatusBase::operator+=(
+ const TRemoteConnectionIncrementalStatusBase& that) {
+ REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_ADD, )
+ return *this;
+}
+
+TRemoteConnectionReaderIncrementalStatus::TRemoteConnectionReaderIncrementalStatus()
+ : REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
+{
+}
+
+TRemoteConnectionReaderIncrementalStatus& TRemoteConnectionReaderIncrementalStatus::operator+=(
+ const TRemoteConnectionReaderIncrementalStatus& that) {
+ TRemoteConnectionIncrementalStatusBase::operator+=(that);
+ REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_ADD, )
+ return *this;
+}
+
+TRemoteConnectionReaderStatus::TRemoteConnectionReaderStatus()
+ : REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
+{
+}
+
+TRemoteConnectionReaderStatus& TRemoteConnectionReaderStatus::operator+=(const TRemoteConnectionReaderStatus& that) {
+ TRemoteConnectionStatusBase::operator+=(that);
+ REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_ADD, )
+ return *this;
+}
+
+TRemoteConnectionWriterIncrementalStatus::TRemoteConnectionWriterIncrementalStatus()
+ : REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_INIT_DEFAULT, COMMA)
+{
+}
+
+TRemoteConnectionWriterIncrementalStatus& TRemoteConnectionWriterIncrementalStatus::operator+=(
+ const TRemoteConnectionWriterIncrementalStatus& that) {
+ TRemoteConnectionIncrementalStatusBase::operator+=(that);
+ REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_ADD, )
+ return *this;
+}
+
+TRemoteConnectionWriterStatus::TRemoteConnectionWriterStatus()
+ : REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_INIT_DEFAULT, COMMA)
+{
+}
+
+TRemoteConnectionWriterStatus& TRemoteConnectionWriterStatus::operator+=(const TRemoteConnectionWriterStatus& that) {
+ TRemoteConnectionStatusBase::operator+=(that);
+ REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_ADD, )
+ return *this;
+}
+
+size_t TRemoteConnectionWriterStatus::GetInFlight() const {
+ return SendQueueSize + AckMessagesSize;
+}
+
+TConnectionStatusMonRecord TRemoteConnectionStatus::GetStatusProtobuf() const {
+ TConnectionStatusMonRecord status;
+
+ // TODO: fill unfilled fields
+ status.SetSendQueueSize(WriterStatus.SendQueueSize);
+ status.SetAckMessagesSize(WriterStatus.AckMessagesSize);
+ // status.SetErrorCount();
+ // status.SetWriteBytes();
+ // status.SetWriteBytesCompressed();
+ // status.SetWriteMessages();
+ status.SetWriteSyscalls(WriterStatus.Incremental.NetworkOps);
+ status.SetWriteActs(WriterStatus.Acts);
+ // status.SetReadBytes();
+ // status.SetReadBytesCompressed();
+ // status.SetReadMessages();
+ status.SetReadSyscalls(ReaderStatus.Incremental.NetworkOps);
+ status.SetReadActs(ReaderStatus.Acts);
+
+ TMessageStatusCounter sumStatusCounter;
+ sumStatusCounter += WriterStatus.Incremental.StatusCounter;
+ sumStatusCounter += ReaderStatus.Incremental.StatusCounter;
+ sumStatusCounter.FillErrorsProtobuf(&status);
+
+ return status;
+}
+
+TString TRemoteConnectionStatus::PrintToString() const {
+ TStringStream ss;
+
+ TKeyValuePrinter p;
+
+ if (!Summary) {
+ // TODO: print MyAddr too, but only if it is set
+ ss << WriterStatus.PeerAddr << " (" << WriterStatus.ConnectionId << ")"
+ << ", writefd=" << WriterStatus.Fd
+ << ", readfd=" << ReaderStatus.Fd
+ << Endl;
+ if (WriterStatus.Connected) {
+ p.AddRow("connect time", WriterStatus.ConnectTime.ToString());
+ p.AddRow("writer state", ToCString(WriterStatus.State));
+ } else {
+ ss << "not connected";
+ if (WriterStatus.ConnectError != 0) {
+ ss << ", last connect error: " << LastSystemErrorText(WriterStatus.ConnectError);
+ }
+ ss << Endl;
+ }
+ }
+ if (!Server) {
+ p.AddRow("connect syscalls", WriterStatus.ConnectSyscalls);
+ }
+
+ p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6));
+
+ if (Server) {
+ p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6));
+ p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6));
+ p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6));
+ p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6));
+ } else {
+ p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6));
+ }
+
+ p.AddRow("written", WriterStatus.Incremental.MessageCounter.ToString(false));
+ p.AddRow("read", ReaderStatus.Incremental.MessageCounter.ToString(true));
+
+ p.AddRow("write syscalls", LeftPad(WriterStatus.Incremental.NetworkOps, 12));
+ p.AddRow("read syscalls", LeftPad(ReaderStatus.Incremental.NetworkOps, 12));
+
+ p.AddRow("write acts", LeftPad(WriterStatus.Acts, 12));
+ p.AddRow("read acts", LeftPad(ReaderStatus.Acts, 12));
+
+ p.AddRow("write buffer cap", LeftPad(WriterStatus.BufferSize, 12));
+ p.AddRow("read buffer cap", LeftPad(ReaderStatus.BufferSize, 12));
+
+ p.AddRow("write buffer drops", LeftPad(WriterStatus.Incremental.BufferDrops, 10));
+ p.AddRow("read buffer drops", LeftPad(ReaderStatus.Incremental.BufferDrops, 10));
+
+ if (Server) {
+ p.AddRow("process dur", WriterStatus.DurationCounterPrev.ToString());
+ }
+
+ ss << p.PrintToString();
+
+ if (false && Server) {
+ ss << "time histogram:\n";
+ ss << WriterStatus.Incremental.ProcessDurationHistogram.PrintToString();
+ }
+
+ TMessageStatusCounter sumStatusCounter;
+ sumStatusCounter += WriterStatus.Incremental.StatusCounter;
+ sumStatusCounter += ReaderStatus.Incremental.StatusCounter;
+
+ ss << sumStatusCounter.PrintToString();
+
+ return ss.Str();
+}
+
+TRemoteConnectionStatus::TRemoteConnectionStatus()
+ : REMOTE_CONNECTION_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
+{
+}
+
+TString TSessionDumpStatus::PrintToString() const {
+ if (Shutdown) {
+ return "shutdown";
+ }
+
+ TStringStream ss;
+ ss << Head;
+ if (ConnectionStatusSummary.Server) {
+ ss << "\n";
+ ss << Acceptors;
+ }
+ ss << "\n";
+ ss << "connections summary:" << Endl;
+ ss << ConnectionsSummary;
+ if (!!Connections) {
+ ss << "\n";
+ ss << Connections;
+ }
+ ss << "\n";
+ ss << Config.PrintToString();
+ return ss.Str();
+}
+
+TString TBusMessageQueueStatus::PrintToString() const {
+ TStringStream ss;
+ ss << "work queue:\n";
+ ss << ExecutorStatus.Status;
+ ss << "\n";
+ ss << "queue config:\n";
+ ss << Config.PrintToString();
+ return ss.Str();
+}