#include "remote_connection_status.h"
#include "key_value_printer.h"
#include <library/cpp/messagebus/monitoring/mon_proto.pb.h>
#include <util/stream/format.h>
#include <util/stream/output.h>
#include <util/system/yassert.h>
using namespace NBus;
using namespace NBus::NPrivate;
template <typename T>
static void Add(T& thiz, const T& that) {
thiz += that;
}
template <typename T>
static void Max(T& thiz, const T& that) {
if (that > thiz) {
thiz = that;
}
}
template <typename T>
static void AssertZero(T& thiz, const T& that) {
Y_ASSERT(thiz == T());
Y_UNUSED(that);
}
TDurationCounter::TDurationCounter()
: DURATION_COUNTER_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
{
}
TDuration TDurationCounter::AvgDuration() const {
if (Count == 0) {
return TDuration::Zero();
} else {
return SumDuration / Count;
}
}
TDurationCounter& TDurationCounter::operator+=(const TDurationCounter& that) {
DURATION_COUNTER_MAP(STRUCT_FIELD_ADD, )
return *this;
}
TString TDurationCounter::ToString() const {
if (Count == 0) {
return "0";
} else {
TStringStream ss;
ss << "avg: " << AvgDuration() << ", max: " << MaxDuration << ", count: " << Count;
return ss.Str();
}
}
TRemoteConnectionStatusBase::TRemoteConnectionStatusBase()
: REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
{
}
TRemoteConnectionStatusBase& TRemoteConnectionStatusBase ::operator+=(const TRemoteConnectionStatusBase& that) {
REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_ADD, )
return *this;
}
TRemoteConnectionIncrementalStatusBase::TRemoteConnectionIncrementalStatusBase()
: REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
{
}
TRemoteConnectionIncrementalStatusBase& TRemoteConnectionIncrementalStatusBase::operator+=(
const TRemoteConnectionIncrementalStatusBase& that) {
REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_ADD, )
return *this;
}
TRemoteConnectionReaderIncrementalStatus::TRemoteConnectionReaderIncrementalStatus()
: REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
{
}
TRemoteConnectionReaderIncrementalStatus& TRemoteConnectionReaderIncrementalStatus::operator+=(
const TRemoteConnectionReaderIncrementalStatus& that) {
TRemoteConnectionIncrementalStatusBase::operator+=(that);
REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_ADD, )
return *this;
}
TRemoteConnectionReaderStatus::TRemoteConnectionReaderStatus()
: REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
{
}
TRemoteConnectionReaderStatus& TRemoteConnectionReaderStatus::operator+=(const TRemoteConnectionReaderStatus& that) {
TRemoteConnectionStatusBase::operator+=(that);
REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_ADD, )
return *this;
}
TRemoteConnectionWriterIncrementalStatus::TRemoteConnectionWriterIncrementalStatus()
: REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_INIT_DEFAULT, COMMA)
{
}
TRemoteConnectionWriterIncrementalStatus& TRemoteConnectionWriterIncrementalStatus::operator+=(
const TRemoteConnectionWriterIncrementalStatus& that) {
TRemoteConnectionIncrementalStatusBase::operator+=(that);
REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_ADD, )
return *this;
}
TRemoteConnectionWriterStatus::TRemoteConnectionWriterStatus()
: REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_INIT_DEFAULT, COMMA)
{
}
TRemoteConnectionWriterStatus& TRemoteConnectionWriterStatus::operator+=(const TRemoteConnectionWriterStatus& that) {
TRemoteConnectionStatusBase::operator+=(that);
REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_ADD, )
return *this;
}
size_t TRemoteConnectionWriterStatus::GetInFlight() const {
return SendQueueSize + AckMessagesSize;
}
TConnectionStatusMonRecord TRemoteConnectionStatus::GetStatusProtobuf() const {
TConnectionStatusMonRecord status;
// TODO: fill unfilled fields
status.SetSendQueueSize(WriterStatus.SendQueueSize);
status.SetAckMessagesSize(WriterStatus.AckMessagesSize);
// status.SetErrorCount();
// status.SetWriteBytes();
// status.SetWriteBytesCompressed();
// status.SetWriteMessages();
status.SetWriteSyscalls(WriterStatus.Incremental.NetworkOps);
status.SetWriteActs(WriterStatus.Acts);
// status.SetReadBytes();
// status.SetReadBytesCompressed();
// status.SetReadMessages();
status.SetReadSyscalls(ReaderStatus.Incremental.NetworkOps);
status.SetReadActs(ReaderStatus.Acts);
TMessageStatusCounter sumStatusCounter;
sumStatusCounter += WriterStatus.Incremental.StatusCounter;
sumStatusCounter += ReaderStatus.Incremental.StatusCounter;
sumStatusCounter.FillErrorsProtobuf(&status);
return status;
}
TString TRemoteConnectionStatus::PrintToString() const {
TStringStream ss;
TKeyValuePrinter p;
if (!Summary) {
// TODO: print MyAddr too, but only if it is set
ss << WriterStatus.PeerAddr << " (" << WriterStatus.ConnectionId << ")"
<< ", writefd=" << WriterStatus.Fd
<< ", readfd=" << ReaderStatus.Fd
<< Endl;
if (WriterStatus.Connected) {
p.AddRow("connect time", WriterStatus.ConnectTime.ToString());
p.AddRow("writer state", ToCString(WriterStatus.State));
} else {
ss << "not connected";
if (WriterStatus.ConnectError != 0) {
ss << ", last connect error: " << LastSystemErrorText(WriterStatus.ConnectError);
}
ss << Endl;
}
}
if (!Server) {
p.AddRow("connect syscalls", WriterStatus.ConnectSyscalls);
}
p.AddRow("send queue", LeftPad(WriterStatus.SendQueueSize, 6));
if (Server) {
p.AddRow("quota msg", LeftPad(ReaderStatus.QuotaMsg, 6));
p.AddRow("quota bytes", LeftPad(ReaderStatus.QuotaBytes, 6));
p.AddRow("quota exhausted", LeftPad(ReaderStatus.QuotaExhausted, 6));
p.AddRow("reader wakeups", LeftPad(WriterStatus.ReaderWakeups, 6));
} else {
p.AddRow("ack messages", LeftPad(WriterStatus.AckMessagesSize, 6));
}
p.AddRow("written", WriterStatus.Incremental.MessageCounter.ToString(false));
p.AddRow("read", ReaderStatus.Incremental.MessageCounter.ToString(true));
p.AddRow("write syscalls", LeftPad(WriterStatus.Incremental.NetworkOps, 12));
p.AddRow("read syscalls", LeftPad(ReaderStatus.Incremental.NetworkOps, 12));
p.AddRow("write acts", LeftPad(WriterStatus.Acts, 12));
p.AddRow("read acts", LeftPad(ReaderStatus.Acts, 12));
p.AddRow("write buffer cap", LeftPad(WriterStatus.BufferSize, 12));
p.AddRow("read buffer cap", LeftPad(ReaderStatus.BufferSize, 12));
p.AddRow("write buffer drops", LeftPad(WriterStatus.Incremental.BufferDrops, 10));
p.AddRow("read buffer drops", LeftPad(ReaderStatus.Incremental.BufferDrops, 10));
if (Server) {
p.AddRow("process dur", WriterStatus.DurationCounterPrev.ToString());
}
ss << p.PrintToString();
if (false && Server) {
ss << "time histogram:\n";
ss << WriterStatus.Incremental.ProcessDurationHistogram.PrintToString();
}
TMessageStatusCounter sumStatusCounter;
sumStatusCounter += WriterStatus.Incremental.StatusCounter;
sumStatusCounter += ReaderStatus.Incremental.StatusCounter;
ss << sumStatusCounter.PrintToString();
return ss.Str();
}
TRemoteConnectionStatus::TRemoteConnectionStatus()
: REMOTE_CONNECTION_STATUS_MAP(STRUCT_FIELD_INIT_DEFAULT, COMMA)
{
}
TString TSessionDumpStatus::PrintToString() const {
if (Shutdown) {
return "shutdown";
}
TStringStream ss;
ss << Head;
if (ConnectionStatusSummary.Server) {
ss << "\n";
ss << Acceptors;
}
ss << "\n";
ss << "connections summary:" << Endl;
ss << ConnectionsSummary;
if (!!Connections) {
ss << "\n";
ss << Connections;
}
ss << "\n";
ss << Config.PrintToString();
return ss.Str();
}
TString TBusMessageQueueStatus::PrintToString() const {
TStringStream ss;
ss << "work queue:\n";
ss << ExecutorStatus.Status;
ss << "\n";
ss << "queue config:\n";
ss << Config.PrintToString();
return ss.Str();
}