diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_connection_status.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_connection_status.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_connection_status.cpp | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_connection_status.cpp b/library/cpp/messagebus/remote_connection_status.cpp new file mode 100644 index 0000000000..2c48b2a287 --- /dev/null +++ b/library/cpp/messagebus/remote_connection_status.cpp @@ -0,0 +1,265 @@ +#include "remote_connection_status.h" + +#include "key_value_printer.h" + +#include <library/cpp/messagebus/monitoring/mon_proto.pb.h> + +#include <util/stream/format.h> +#include <util/stream/output.h> +#include <util/system/yassert.h> + +using namespace NBus; +using namespace NBus::NPrivate; + +template <typename T> +static void Add(T& thiz, const T& that) { + thiz += that; +} + +template <typename T> +static void Max(T& thiz, const T& that) { + if (that > thiz) { + thiz = that; + } +} + +template <typename T> +static void AssertZero(T& thiz, const T& that) { + Y_ASSERT(thiz == T()); + Y_UNUSED(that); +} + +TDurationCounter::TDurationCounter() + : DURATION_COUNTER_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA) +{ +} + +TDuration TDurationCounter::AvgDuration() const { + if (Count == 0) { + return TDuration::Zero(); + } else { + return SumDuration / Count; + } +} + +TDurationCounter& TDurationCounter::operator+=(const TDurationCounter& that) { + DURATION_COUNTER_MAP(STRUCT_FIELD_ADD, ) + return *this; +} + +TString TDurationCounter::ToString() const { + if (Count == 0) { + return "0"; + } else { + TStringStream ss; + ss << "avg: " << AvgDuration() << ", max: " << MaxDuration << ", count: " << Count; + return ss.Str(); + } +} + +TRemoteConnectionStatusBase::TRemoteConnectionStatusBase() + : REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA) +{ +} + +TRemoteConnectionStatusBase& TRemoteConnectionStatusBase ::operator+=(const TRemoteConnectionStatusBase& that) { + REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_ADD, ) + return *this; +} + +TRemoteConnectionIncrementalStatusBase::TRemoteConnectionIncrementalStatusBase() + : REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA) +{ +} + +TRemoteConnectionIncrementalStatusBase& TRemoteConnectionIncrementalStatusBase::operator+=( + const TRemoteConnectionIncrementalStatusBase& that) { + REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_ADD, ) + return *this; +} + +TRemoteConnectionReaderIncrementalStatus::TRemoteConnectionReaderIncrementalStatus() + : REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA) +{ +} + +TRemoteConnectionReaderIncrementalStatus& TRemoteConnectionReaderIncrementalStatus::operator+=( + const TRemoteConnectionReaderIncrementalStatus& that) { + TRemoteConnectionIncrementalStatusBase::operator+=(that); + REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_ADD, ) + return *this; +} + +TRemoteConnectionReaderStatus::TRemoteConnectionReaderStatus() + : REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA) +{ +} + +TRemoteConnectionReaderStatus& TRemoteConnectionReaderStatus::operator+=(const TRemoteConnectionReaderStatus& that) { + TRemoteConnectionStatusBase::operator+=(that); + REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_ADD, ) + return *this; +} + +TRemoteConnectionWriterIncrementalStatus::TRemoteConnectionWriterIncrementalStatus() + : REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_INIT_DEFAULT, COMMA) +{ +} + +TRemoteConnectionWriterIncrementalStatus& TRemoteConnectionWriterIncrementalStatus::operator+=( + const TRemoteConnectionWriterIncrementalStatus& that) { + TRemoteConnectionIncrementalStatusBase::operator+=(that); + REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_ADD, ) + return *this; +} + +TRemoteConnectionWriterStatus::TRemoteConnectionWriterStatus() + : REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_INIT_DEFAULT, COMMA) +{ +} + +TRemoteConnectionWriterStatus& TRemoteConnectionWriterStatus::operator+=(const TRemoteConnectionWriterStatus& that) { + TRemoteConnectionStatusBase::operator+=(that); + REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_ADD, ) + return *this; +} + +size_t TRemoteConnectionWriterStatus::GetInFlight() const { + return SendQueueSize + AckMessagesSize; +} + +TConnectionStatusMonRecord TRemoteConnectionStatus::GetStatusProtobuf() const { + TConnectionStatusMonRecord status; + + // TODO: fill unfilled fields + status.SetSendQueueSize(WriterStatus.SendQueueSize); + status.SetAckMessagesSize(WriterStatus.AckMessagesSize); + // status.SetErrorCount(); + // status.SetWriteBytes(); + // status.SetWriteBytesCompressed(); + // status.SetWriteMessages(); + status.SetWriteSyscalls(WriterStatus.Incremental.NetworkOps); + status.SetWriteActs(WriterStatus.Acts); + // status.SetReadBytes(); + // status.SetReadBytesCompressed(); + // status.SetReadMessages(); + status.SetReadSyscalls(ReaderStatus.Incremental.NetworkOps); + status.SetReadActs(ReaderStatus.Acts); + + TMessageStatusCounter sumStatusCounter; + sumStatusCounter += WriterStatus.Incremental.StatusCounter; + sumStatusCounter += ReaderStatus.Incremental.StatusCounter; + sumStatusCounter.FillErrorsProtobuf(&status); + + return status; +} + +TString TRemoteConnectionStatus::PrintToString() const { + TStringStream ss; + + TKeyValuePrinter p; + + if (!Summary) { + // TODO: print MyAddr too, but only if it is set + ss << WriterStatus.PeerAddr << " (" << WriterStatus.ConnectionId << ")" + << ", writefd=" << WriterStatus.Fd + << ", readfd=" << ReaderStatus.Fd + << Endl; + if (WriterStatus.Connected) { + p.AddRow("connect time", WriterStatus.ConnectTime.ToString()); + p.AddRow("writer state", ToCString(WriterStatus.State)); + } else { + ss << "not connected"; + if (WriterStatus.ConnectError != 0) { + ss << ", last connect error: " << LastSystemErrorText(WriterStatus.ConnectError); + } + ss << Endl; + } + } + if (!Server) { + p.AddRow("connect syscalls", WriterStatus.ConnectSyscalls); + } + + p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6)); + + if (Server) { + p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6)); + p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6)); + p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6)); + p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6)); + } else { + p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6)); + } + + p.AddRow("written", WriterStatus.Incremental.MessageCounter.ToString(false)); + p.AddRow("read", ReaderStatus.Incremental.MessageCounter.ToString(true)); + + p.AddRow("write syscalls", LeftPad(WriterStatus.Incremental.NetworkOps, 12)); + p.AddRow("read syscalls", LeftPad(ReaderStatus.Incremental.NetworkOps, 12)); + + p.AddRow("write acts", LeftPad(WriterStatus.Acts, 12)); + p.AddRow("read acts", LeftPad(ReaderStatus.Acts, 12)); + + p.AddRow("write buffer cap", LeftPad(WriterStatus.BufferSize, 12)); + p.AddRow("read buffer cap", LeftPad(ReaderStatus.BufferSize, 12)); + + p.AddRow("write buffer drops", LeftPad(WriterStatus.Incremental.BufferDrops, 10)); + p.AddRow("read buffer drops", LeftPad(ReaderStatus.Incremental.BufferDrops, 10)); + + if (Server) { + p.AddRow("process dur", WriterStatus.DurationCounterPrev.ToString()); + } + + ss << p.PrintToString(); + + if (false && Server) { + ss << "time histogram:\n"; + ss << WriterStatus.Incremental.ProcessDurationHistogram.PrintToString(); + } + + TMessageStatusCounter sumStatusCounter; + sumStatusCounter += WriterStatus.Incremental.StatusCounter; + sumStatusCounter += ReaderStatus.Incremental.StatusCounter; + + ss << sumStatusCounter.PrintToString(); + + return ss.Str(); +} + +TRemoteConnectionStatus::TRemoteConnectionStatus() + : REMOTE_CONNECTION_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA) +{ +} + +TString TSessionDumpStatus::PrintToString() const { + if (Shutdown) { + return "shutdown"; + } + + TStringStream ss; + ss << Head; + if (ConnectionStatusSummary.Server) { + ss << "\n"; + ss << Acceptors; + } + ss << "\n"; + ss << "connections summary:" << Endl; + ss << ConnectionsSummary; + if (!!Connections) { + ss << "\n"; + ss << Connections; + } + ss << "\n"; + ss << Config.PrintToString(); + return ss.Str(); +} + +TString TBusMessageQueueStatus::PrintToString() const { + TStringStream ss; + ss << "work queue:\n"; + ss << ExecutorStatus.Status; + ss << "\n"; + ss << "queue config:\n"; + ss << Config.PrintToString(); + return ss.Str(); +} |