diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_connection_status.h |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_connection_status.h')
-rw-r--r-- | library/cpp/messagebus/remote_connection_status.h | 214 |
1 files changed, 214 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_connection_status.h b/library/cpp/messagebus/remote_connection_status.h new file mode 100644 index 00000000000..5db10e51eae --- /dev/null +++ b/library/cpp/messagebus/remote_connection_status.h @@ -0,0 +1,214 @@ +#pragma once + +#include "codegen.h" +#include "duration_histogram.h" +#include "message_counter.h" +#include "message_status_counter.h" +#include "queue_config.h" +#include "session_config.h" + +#include <library/cpp/messagebus/actor/executor.h> + +#include <library/cpp/deprecated/enum_codegen/enum_codegen.h> + +namespace NBus { + class TConnectionStatusMonRecord; +} + +namespace NBus { + namespace NPrivate { +#define WRITER_STATE_MAP(XX) \ + XX(WRITER_UNKNOWN) \ + XX(WRITER_FILLING) \ + XX(WRITER_FLUSHING) \ + /**/ + + // TODO: move elsewhere + enum EWriterState { + WRITER_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE) + }; + + ENUM_TO_STRING(EWriterState, WRITER_STATE_MAP) + +#define STRUCT_FIELD_ADD(name, type, func) func(name, that.name); + + template <typename T> + void Reset(T& t) { + t.~T(); + new (&t) T(); + } + +#define DURATION_COUNTER_MAP(XX, comma) \ + XX(Count, unsigned, Add) \ + comma \ + XX(SumDuration, TDuration, Add) comma \ + XX(MaxDuration, TDuration, Max) /**/ + + struct TDurationCounter { + DURATION_COUNTER_MAP(STRUCT_FIELD_GEN, ) + + TDuration AvgDuration() const; + + TDurationCounter(); + + void AddDuration(TDuration d) { + Count += 1; + SumDuration += d; + if (d > MaxDuration) { + MaxDuration = d; + } + } + + TDurationCounter& operator+=(const TDurationCounter&); + + TString ToString() const; + }; + +#define REMOTE_CONNECTION_STATUS_BASE_MAP(XX, comma) \ + XX(ConnectionId, ui64, AssertZero) \ + comma \ + XX(Fd, SOCKET, AssertZero) comma \ + XX(Acts, ui64, Add) comma \ + XX(BufferSize, ui64, Add) /**/ + + struct TRemoteConnectionStatusBase { + REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_GEN, ) + + TRemoteConnectionStatusBase& operator+=(const TRemoteConnectionStatusBase&); + + TRemoteConnectionStatusBase(); + }; + +#define REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(XX, comma) \ + XX(BufferDrops, unsigned, Add) \ + comma \ + XX(NetworkOps, unsigned, Add) /**/ + + struct TRemoteConnectionIncrementalStatusBase { + REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_GEN, ) + + TRemoteConnectionIncrementalStatusBase& operator+=(const TRemoteConnectionIncrementalStatusBase&); + + TRemoteConnectionIncrementalStatusBase(); + }; + +#define REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(XX, comma) \ + XX(MessageCounter, TMessageCounter, Add) \ + comma \ + XX(StatusCounter, TMessageStatusCounter, Add) /**/ + + struct TRemoteConnectionReaderIncrementalStatus: public TRemoteConnectionIncrementalStatusBase { + REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_GEN, ) + + TRemoteConnectionReaderIncrementalStatus& operator+=(const TRemoteConnectionReaderIncrementalStatus&); + + TRemoteConnectionReaderIncrementalStatus(); + }; + +#define REMOTE_CONNECTION_READER_STATUS_MAP(XX, comma) \ + XX(QuotaMsg, size_t, Add) \ + comma \ + XX(QuotaBytes, size_t, Add) comma \ + XX(QuotaExhausted, size_t, Add) comma \ + XX(Incremental, TRemoteConnectionReaderIncrementalStatus, Add) /**/ + + struct TRemoteConnectionReaderStatus: public TRemoteConnectionStatusBase { + REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_GEN, ) + + TRemoteConnectionReaderStatus& operator+=(const TRemoteConnectionReaderStatus&); + + TRemoteConnectionReaderStatus(); + }; + +#define REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(XX, comma) \ + XX(MessageCounter, TMessageCounter, Add) \ + comma \ + XX(StatusCounter, TMessageStatusCounter, Add) comma \ + XX(ProcessDurationHistogram, TDurationHistogram, Add) /**/ + + struct TRemoteConnectionWriterIncrementalStatus: public TRemoteConnectionIncrementalStatusBase { + REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_GEN, ) + + TRemoteConnectionWriterIncrementalStatus& operator+=(const TRemoteConnectionWriterIncrementalStatus&); + + TRemoteConnectionWriterIncrementalStatus(); + }; + +#define REMOTE_CONNECTION_WRITER_STATUS(XX, comma) \ + XX(Connected, bool, AssertZero) \ + comma \ + XX(ConnectTime, TInstant, AssertZero) comma /* either connect time on client or accept time on server */ \ + XX(ConnectError, int, AssertZero) comma \ + XX(ConnectSyscalls, unsigned, Add) comma \ + XX(PeerAddr, TNetAddr, AssertZero) comma \ + XX(MyAddr, TNetAddr, AssertZero) comma \ + XX(State, EWriterState, AssertZero) comma \ + XX(SendQueueSize, size_t, Add) comma \ + XX(AckMessagesSize, size_t, Add) comma /* client only */ \ + XX(DurationCounter, TDurationCounter, Add) comma /* server only */ \ + XX(DurationCounterPrev, TDurationCounter, Add) comma /* server only */ \ + XX(Incremental, TRemoteConnectionWriterIncrementalStatus, Add) comma \ + XX(ReaderWakeups, size_t, Add) /**/ + + struct TRemoteConnectionWriterStatus: public TRemoteConnectionStatusBase { + REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_GEN, ) + + TRemoteConnectionWriterStatus(); + + TRemoteConnectionWriterStatus& operator+=(const TRemoteConnectionWriterStatus&); + + size_t GetInFlight() const; + }; + +#define REMOTE_CONNECTION_STATUS_MAP(XX, comma) \ + XX(Summary, bool) \ + comma \ + XX(Server, bool) /**/ + + struct TRemoteConnectionStatus { + REMOTE_CONNECTION_STATUS_MAP(STRUCT_FIELD_GEN, ) + + TRemoteConnectionReaderStatus ReaderStatus; + TRemoteConnectionWriterStatus WriterStatus; + + TRemoteConnectionStatus(); + + TString PrintToString() const; + TConnectionStatusMonRecord GetStatusProtobuf() const; + }; + + struct TBusSessionStatus { + size_t InFlightCount; + size_t InFlightSize; + bool InputPaused; + + TBusSessionStatus(); + }; + + struct TSessionDumpStatus { + bool Shutdown; + TString Head; + TString Acceptors; + TString ConnectionsSummary; + TString Connections; + TBusSessionStatus Status; + TRemoteConnectionStatus ConnectionStatusSummary; + TBusSessionConfig Config; + + TSessionDumpStatus() + : Shutdown(false) + { + } + + TString PrintToString() const; + }; + + // without sessions + struct TBusMessageQueueStatus { + NActor::NPrivate::TExecutorStatus ExecutorStatus; + TBusQueueConfig Config; + + TString PrintToString() const; + }; + } +} |