summaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_connection_status.h
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_connection_status.h
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_connection_status.h')
-rw-r--r--library/cpp/messagebus/remote_connection_status.h214
1 files changed, 214 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_connection_status.h b/library/cpp/messagebus/remote_connection_status.h
new file mode 100644
index 00000000000..5db10e51eae
--- /dev/null
+++ b/library/cpp/messagebus/remote_connection_status.h
@@ -0,0 +1,214 @@
+#pragma once
+
+#include "codegen.h"
+#include "duration_histogram.h"
+#include "message_counter.h"
+#include "message_status_counter.h"
+#include "queue_config.h"
+#include "session_config.h"
+
+#include <library/cpp/messagebus/actor/executor.h>
+
+#include <library/cpp/deprecated/enum_codegen/enum_codegen.h>
+
+namespace NBus {
+ class TConnectionStatusMonRecord;
+}
+
+namespace NBus {
+ namespace NPrivate {
+#define WRITER_STATE_MAP(XX) \
+ XX(WRITER_UNKNOWN) \
+ XX(WRITER_FILLING) \
+ XX(WRITER_FLUSHING) \
+ /**/
+
+ // TODO: move elsewhere
+ enum EWriterState {
+ WRITER_STATE_MAP(ENUM_VALUE_GEN_NO_VALUE)
+ };
+
+ ENUM_TO_STRING(EWriterState, WRITER_STATE_MAP)
+
+#define STRUCT_FIELD_ADD(name, type, func) func(name, that.name);
+
+ template <typename T>
+ void Reset(T& t) {
+ t.~T();
+ new (&t) T();
+ }
+
+#define DURATION_COUNTER_MAP(XX, comma) \
+ XX(Count, unsigned, Add) \
+ comma \
+ XX(SumDuration, TDuration, Add) comma \
+ XX(MaxDuration, TDuration, Max) /**/
+
+ struct TDurationCounter {
+ DURATION_COUNTER_MAP(STRUCT_FIELD_GEN, )
+
+ TDuration AvgDuration() const;
+
+ TDurationCounter();
+
+ void AddDuration(TDuration d) {
+ Count += 1;
+ SumDuration += d;
+ if (d > MaxDuration) {
+ MaxDuration = d;
+ }
+ }
+
+ TDurationCounter& operator+=(const TDurationCounter&);
+
+ TString ToString() const;
+ };
+
+#define REMOTE_CONNECTION_STATUS_BASE_MAP(XX, comma) \
+ XX(ConnectionId, ui64, AssertZero) \
+ comma \
+ XX(Fd, SOCKET, AssertZero) comma \
+ XX(Acts, ui64, Add) comma \
+ XX(BufferSize, ui64, Add) /**/
+
+ struct TRemoteConnectionStatusBase {
+ REMOTE_CONNECTION_STATUS_BASE_MAP(STRUCT_FIELD_GEN, )
+
+ TRemoteConnectionStatusBase& operator+=(const TRemoteConnectionStatusBase&);
+
+ TRemoteConnectionStatusBase();
+ };
+
+#define REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(XX, comma) \
+ XX(BufferDrops, unsigned, Add) \
+ comma \
+ XX(NetworkOps, unsigned, Add) /**/
+
+ struct TRemoteConnectionIncrementalStatusBase {
+ REMOTE_CONNECTION_INCREMENTAL_STATUS_BASE_MAP(STRUCT_FIELD_GEN, )
+
+ TRemoteConnectionIncrementalStatusBase& operator+=(const TRemoteConnectionIncrementalStatusBase&);
+
+ TRemoteConnectionIncrementalStatusBase();
+ };
+
+#define REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(XX, comma) \
+ XX(MessageCounter, TMessageCounter, Add) \
+ comma \
+ XX(StatusCounter, TMessageStatusCounter, Add) /**/
+
+ struct TRemoteConnectionReaderIncrementalStatus: public TRemoteConnectionIncrementalStatusBase {
+ REMOTE_CONNECTION_READER_INCREMENTAL_STATUS_MAP(STRUCT_FIELD_GEN, )
+
+ TRemoteConnectionReaderIncrementalStatus& operator+=(const TRemoteConnectionReaderIncrementalStatus&);
+
+ TRemoteConnectionReaderIncrementalStatus();
+ };
+
+#define REMOTE_CONNECTION_READER_STATUS_MAP(XX, comma) \
+ XX(QuotaMsg, size_t, Add) \
+ comma \
+ XX(QuotaBytes, size_t, Add) comma \
+ XX(QuotaExhausted, size_t, Add) comma \
+ XX(Incremental, TRemoteConnectionReaderIncrementalStatus, Add) /**/
+
+ struct TRemoteConnectionReaderStatus: public TRemoteConnectionStatusBase {
+ REMOTE_CONNECTION_READER_STATUS_MAP(STRUCT_FIELD_GEN, )
+
+ TRemoteConnectionReaderStatus& operator+=(const TRemoteConnectionReaderStatus&);
+
+ TRemoteConnectionReaderStatus();
+ };
+
+#define REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(XX, comma) \
+ XX(MessageCounter, TMessageCounter, Add) \
+ comma \
+ XX(StatusCounter, TMessageStatusCounter, Add) comma \
+ XX(ProcessDurationHistogram, TDurationHistogram, Add) /**/
+
+ struct TRemoteConnectionWriterIncrementalStatus: public TRemoteConnectionIncrementalStatusBase {
+ REMOTE_CONNECTION_WRITER_INCREMENTAL_STATUS(STRUCT_FIELD_GEN, )
+
+ TRemoteConnectionWriterIncrementalStatus& operator+=(const TRemoteConnectionWriterIncrementalStatus&);
+
+ TRemoteConnectionWriterIncrementalStatus();
+ };
+
+#define REMOTE_CONNECTION_WRITER_STATUS(XX, comma) \
+ XX(Connected, bool, AssertZero) \
+ comma \
+ XX(ConnectTime, TInstant, AssertZero) comma /* either connect time on client or accept time on server */ \
+ XX(ConnectError, int, AssertZero) comma \
+ XX(ConnectSyscalls, unsigned, Add) comma \
+ XX(PeerAddr, TNetAddr, AssertZero) comma \
+ XX(MyAddr, TNetAddr, AssertZero) comma \
+ XX(State, EWriterState, AssertZero) comma \
+ XX(SendQueueSize, size_t, Add) comma \
+ XX(AckMessagesSize, size_t, Add) comma /* client only */ \
+ XX(DurationCounter, TDurationCounter, Add) comma /* server only */ \
+ XX(DurationCounterPrev, TDurationCounter, Add) comma /* server only */ \
+ XX(Incremental, TRemoteConnectionWriterIncrementalStatus, Add) comma \
+ XX(ReaderWakeups, size_t, Add) /**/
+
+ struct TRemoteConnectionWriterStatus: public TRemoteConnectionStatusBase {
+ REMOTE_CONNECTION_WRITER_STATUS(STRUCT_FIELD_GEN, )
+
+ TRemoteConnectionWriterStatus();
+
+ TRemoteConnectionWriterStatus& operator+=(const TRemoteConnectionWriterStatus&);
+
+ size_t GetInFlight() const;
+ };
+
+#define REMOTE_CONNECTION_STATUS_MAP(XX, comma) \
+ XX(Summary, bool) \
+ comma \
+ XX(Server, bool) /**/
+
+ struct TRemoteConnectionStatus {
+ REMOTE_CONNECTION_STATUS_MAP(STRUCT_FIELD_GEN, )
+
+ TRemoteConnectionReaderStatus ReaderStatus;
+ TRemoteConnectionWriterStatus WriterStatus;
+
+ TRemoteConnectionStatus();
+
+ TString PrintToString() const;
+ TConnectionStatusMonRecord GetStatusProtobuf() const;
+ };
+
+ struct TBusSessionStatus {
+ size_t InFlightCount;
+ size_t InFlightSize;
+ bool InputPaused;
+
+ TBusSessionStatus();
+ };
+
+ struct TSessionDumpStatus {
+ bool Shutdown;
+ TString Head;
+ TString Acceptors;
+ TString ConnectionsSummary;
+ TString Connections;
+ TBusSessionStatus Status;
+ TRemoteConnectionStatus ConnectionStatusSummary;
+ TBusSessionConfig Config;
+
+ TSessionDumpStatus()
+ : Shutdown(false)
+ {
+ }
+
+ TString PrintToString() const;
+ };
+
+ // without sessions
+ struct TBusMessageQueueStatus {
+ NActor::NPrivate::TExecutorStatus ExecutorStatus;
+ TBusQueueConfig Config;
+
+ TString PrintToString() const;
+ };
+ }
+}