diff options
author | Sergey Polovko <sergey@polovko.me> | 2022-02-10 16:47:02 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:02 +0300 |
commit | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch) | |
tree | c2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/messagebus | |
parent | ab3783171cc30e262243a0227c86118f7080c896 (diff) | |
download | ydb-3e0b762a82514bac89c1dd6ea7211e381d8aa248.tar.gz |
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r-- | library/cpp/messagebus/messqueue.cpp | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/monitoring/mon_proto.proto | 22 | ||||
-rw-r--r-- | library/cpp/messagebus/monitoring/ya.make | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/network.h | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/session.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/session.h | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/session_impl.cpp | 16 | ||||
-rw-r--r-- | library/cpp/messagebus/www/www.cpp | 42 | ||||
-rw-r--r-- | library/cpp/messagebus/www/ya.make | 2 |
9 files changed, 52 insertions, 52 deletions
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp index 3474d62705..6197b6bbf3 100644 --- a/library/cpp/messagebus/messqueue.cpp +++ b/library/cpp/messagebus/messqueue.cpp @@ -159,8 +159,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB } catch (...) { Y_FAIL("create destination failure: %s", CurrentExceptionMessage().c_str()); } -} - +} + void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) { TGuard<TMutex> scope(Lock); Sessions.push_back(session); diff --git a/library/cpp/messagebus/monitoring/mon_proto.proto b/library/cpp/messagebus/monitoring/mon_proto.proto index 73b6614481..eda77de7a5 100644 --- a/library/cpp/messagebus/monitoring/mon_proto.proto +++ b/library/cpp/messagebus/monitoring/mon_proto.proto @@ -1,4 +1,4 @@ -import "library/cpp/monlib/encode/legacy_protobuf/protos/metric_meta.proto"; +import "library/cpp/monlib/encode/legacy_protobuf/protos/metric_meta.proto"; package NBus; @@ -29,19 +29,19 @@ message TMessageStatusRecord { } message TConnectionStatusMonRecord { - optional uint32 SendQueueSize = 1 [ (NMonProto.Metric).Type = GAUGE ]; + optional uint32 SendQueueSize = 1 [ (NMonProto.Metric).Type = GAUGE ]; // client only - optional uint32 AckMessagesSize = 2 [ (NMonProto.Metric).Type = GAUGE ]; - optional uint32 ErrorCount = 3 [ (NMonProto.Metric).Type = RATE ]; + optional uint32 AckMessagesSize = 2 [ (NMonProto.Metric).Type = GAUGE ]; + optional uint32 ErrorCount = 3 [ (NMonProto.Metric).Type = RATE ]; - optional uint64 WriteBytes = 10 [ (NMonProto.Metric).Type = RATE ]; + optional uint64 WriteBytes = 10 [ (NMonProto.Metric).Type = RATE ]; optional uint64 WriteBytesCompressed = 11; - optional uint64 WriteMessages = 12 [ (NMonProto.Metric).Type = RATE ]; + optional uint64 WriteMessages = 12 [ (NMonProto.Metric).Type = RATE ]; optional uint64 WriteSyscalls = 13; optional uint64 WriteActs = 14; - optional uint64 ReadBytes = 20 [ (NMonProto.Metric).Type = RATE ]; + optional uint64 ReadBytes = 20 [ (NMonProto.Metric).Type = RATE ]; optional uint64 ReadBytesCompressed = 21; - optional uint64 ReadMessages = 22 [ (NMonProto.Metric).Type = RATE ]; + optional uint64 ReadMessages = 22 [ (NMonProto.Metric).Type = RATE ]; optional uint64 ReadSyscalls = 23; optional uint64 ReadActs = 24; @@ -49,7 +49,7 @@ message TConnectionStatusMonRecord { } message TSessionStatusMonRecord { - optional uint32 InFlight = 1 [ (NMonProto.Metric).Type = GAUGE ]; - optional uint32 ConnectionCount = 2 [ (NMonProto.Metric).Type = GAUGE ]; - optional uint32 ConnectCount = 3 [ (NMonProto.Metric).Type = RATE ]; + optional uint32 InFlight = 1 [ (NMonProto.Metric).Type = GAUGE ]; + optional uint32 ConnectionCount = 2 [ (NMonProto.Metric).Type = GAUGE ]; + optional uint32 ConnectCount = 3 [ (NMonProto.Metric).Type = RATE ]; } diff --git a/library/cpp/messagebus/monitoring/ya.make b/library/cpp/messagebus/monitoring/ya.make index 25782492b1..7eec26150f 100644 --- a/library/cpp/messagebus/monitoring/ya.make +++ b/library/cpp/messagebus/monitoring/ya.make @@ -3,7 +3,7 @@ PROTO_LIBRARY() OWNER(g:messagebus) PEERDIR( - library/cpp/monlib/encode/legacy_protobuf/protos + library/cpp/monlib/encode/legacy_protobuf/protos ) SRCS( diff --git a/library/cpp/messagebus/network.h b/library/cpp/messagebus/network.h index cc4bd76ea3..6f589bd8d8 100644 --- a/library/cpp/messagebus/network.h +++ b/library/cpp/messagebus/network.h @@ -8,7 +8,7 @@ #include <utility> -namespace NBus { +namespace NBus { namespace NPrivate { void SetSockOptTcpCork(SOCKET s, bool value); @@ -24,5 +24,5 @@ namespace NBus { }; std::pair<unsigned, TVector<TBindResult>> BindOnPort(int port, bool reusePort); - + } diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp index 46a7ece6a8..7da9a931c1 100644 --- a/library/cpp/messagebus/session.cpp +++ b/library/cpp/messagebus/session.cpp @@ -124,7 +124,7 @@ TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHa TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue) { return queue->CreateDestination(proto, handler, config); } - + TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo) { - return queue->CreateDestination(proto, handler, config, bindTo); -} + return queue->CreateDestination(proto, handler, config, bindTo); +} diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h index fb12ab7c22..b1734a89e8 100644 --- a/library/cpp/messagebus/session.h +++ b/library/cpp/messagebus/session.h @@ -124,10 +124,10 @@ namespace NBus { static TBusServerSessionPtr Create( TBusProtocol* proto, IBusServerHandler* handler, - const TBusServerSessionConfig& config, - TBusMessageQueuePtr queue, + const TBusServerSessionConfig& config, + TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo); - + // TODO: make parameter non-const virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index ddf9f360c4..5e11c70d89 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -485,23 +485,23 @@ void TBusSessionImpl::Act(TConnectionTag) { void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) { Listen(BindOnPort(port, Config.ReusePort).second, q); -} - +} + void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) { Y_ASSERT(q == Queue); - int actualPort = -1; + int actualPort = -1; for (const TBindResult& br : bindTo) { - if (actualPort == -1) { - actualPort = br.Addr.GetPort(); + if (actualPort == -1) { + actualPort = br.Addr.GetPort(); } else { - Y_VERIFY(actualPort == br.Addr.GetPort(), "state check"); + Y_VERIFY(actualPort == br.Addr.GetPort(), "state check"); } if (Config.SocketToS >= 0) { - SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS); + SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS); } - TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr)); + TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr)); TConnectionsGuard guard(ConnectionsLock); InsertAcceptorLockAcquired(acceptor.Get()); diff --git a/library/cpp/messagebus/www/www.cpp b/library/cpp/messagebus/www/www.cpp index 62ec241d85..222195e70b 100644 --- a/library/cpp/messagebus/www/www.cpp +++ b/library/cpp/messagebus/www/www.cpp @@ -4,7 +4,7 @@ #include "html_output.h" #include <library/cpp/messagebus/remote_connection_status.h> -#include <library/cpp/monlib/deprecated/json/writer.h> +#include <library/cpp/monlib/deprecated/json/writer.h> #include <library/cpp/archive/yarchive.h> #include <library/cpp/http/fetch/httpfsm.h> @@ -621,57 +621,57 @@ struct TBusWww::TImpl { } } - void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) { + void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) { auto status = queue->GetStatusRecordInternal(); - sj.OpenMetric(); + sj.OpenMetric(); sj.WriteLabels("mb_queue", queueName, "sensor", "WorkQueueSize"); sj.WriteValue(status.ExecutorStatus.WorkQueueSize); - sj.CloseMetric(); + sj.CloseMetric(); } - void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj, + void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf labelName, TStringBuf sessionName, bool read, const TMessageCounter& counter) { TStringBuf readOrWrite = read ? "read" : "write"; - sj.OpenMetric(); + sj.OpenMetric(); sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageBytes"); sj.WriteValue(counter.BytesData); sj.WriteModeDeriv(); - sj.CloseMetric(); + sj.CloseMetric(); - sj.OpenMetric(); + sj.OpenMetric(); sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageCount"); sj.WriteValue(counter.Count); sj.WriteModeDeriv(); - sj.CloseMetric(); + sj.CloseMetric(); } - void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client, + void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client, TBusSession* session) { TStringBuf labelName = client ? "mb_client_session" : "mb_server_session"; auto status = session->GetStatusRecordInternal(); - sj.OpenMetric(); + sj.OpenMetric(); sj.WriteLabels(labelName, sessionName, "sensor", "InFlightCount"); sj.WriteValue(status.Status.InFlightCount); - sj.CloseMetric(); + sj.CloseMetric(); - sj.OpenMetric(); + sj.OpenMetric(); sj.WriteLabels(labelName, sessionName, "sensor", "InFlightSize"); sj.WriteValue(status.Status.InFlightSize); - sj.CloseMetric(); + sj.CloseMetric(); - sj.OpenMetric(); + sj.OpenMetric(); sj.WriteLabels(labelName, sessionName, "sensor", "SendQueueSize"); sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.SendQueueSize); - sj.CloseMetric(); + sj.CloseMetric(); if (client) { - sj.OpenMetric(); + sj.OpenMetric(); sj.WriteLabels(labelName, sessionName, "sensor", "AckMessagesSize"); sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.AckMessagesSize); - sj.CloseMetric(); + sj.CloseMetric(); } WriteMessageCounterSensors(sj, labelName, sessionName, false, @@ -686,10 +686,10 @@ struct TBusWww::TImpl { Y_UNUSED(ss); bool all = q == "" && cs == "" && ss == ""; - NMonitoring::TDeprecatedJsonWriter sj(&Os); + NMonitoring::TDeprecatedJsonWriter sj(&Os); sj.OpenDocument(); - sj.OpenMetrics(); + sj.OpenMetrics(); for (unsigned i = 0; i < Outer->Queues.size(); ++i) { TString queueName = Outer->Queues.Entries[i].first; @@ -716,7 +716,7 @@ struct TBusWww::TImpl { } } - sj.CloseMetrics(); + sj.CloseMetrics(); sj.CloseDocument(); } diff --git a/library/cpp/messagebus/www/ya.make b/library/cpp/messagebus/www/ya.make index 972390cea3..de2fc0beca 100644 --- a/library/cpp/messagebus/www/ya.make +++ b/library/cpp/messagebus/www/ya.make @@ -22,7 +22,7 @@ PEERDIR( library/cpp/json/writer library/cpp/messagebus library/cpp/messagebus/oldmodule - library/cpp/monlib/deprecated/json + library/cpp/monlib/deprecated/json library/cpp/uri ) |