diff options
| author | Sergey Polovko <[email protected]> | 2022-02-10 16:47:02 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:47:02 +0300 | 
| commit | 3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch) | |
| tree | c2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/messagebus | |
| parent | ab3783171cc30e262243a0227c86118f7080c896 (diff) | |
Restoring authorship annotation for Sergey Polovko <[email protected]>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
| -rw-r--r-- | library/cpp/messagebus/messqueue.cpp | 4 | ||||
| -rw-r--r-- | library/cpp/messagebus/monitoring/mon_proto.proto | 22 | ||||
| -rw-r--r-- | library/cpp/messagebus/monitoring/ya.make | 2 | ||||
| -rw-r--r-- | library/cpp/messagebus/network.h | 4 | ||||
| -rw-r--r-- | library/cpp/messagebus/session.cpp | 6 | ||||
| -rw-r--r-- | library/cpp/messagebus/session.h | 6 | ||||
| -rw-r--r-- | library/cpp/messagebus/session_impl.cpp | 16 | ||||
| -rw-r--r-- | library/cpp/messagebus/www/www.cpp | 42 | ||||
| -rw-r--r-- | library/cpp/messagebus/www/ya.make | 2 | 
9 files changed, 52 insertions, 52 deletions
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp index 3474d62705f..6197b6bbf3f 100644 --- a/library/cpp/messagebus/messqueue.cpp +++ b/library/cpp/messagebus/messqueue.cpp @@ -159,8 +159,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB      } catch (...) {          Y_FAIL("create destination failure: %s", CurrentExceptionMessage().c_str());      } -} - +}  +   void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) {      TGuard<TMutex> scope(Lock);      Sessions.push_back(session); diff --git a/library/cpp/messagebus/monitoring/mon_proto.proto b/library/cpp/messagebus/monitoring/mon_proto.proto index 73b6614481e..eda77de7a52 100644 --- a/library/cpp/messagebus/monitoring/mon_proto.proto +++ b/library/cpp/messagebus/monitoring/mon_proto.proto @@ -1,4 +1,4 @@ -import "library/cpp/monlib/encode/legacy_protobuf/protos/metric_meta.proto"; +import "library/cpp/monlib/encode/legacy_protobuf/protos/metric_meta.proto";   package NBus; @@ -29,19 +29,19 @@ message TMessageStatusRecord {  }  message TConnectionStatusMonRecord { -    optional uint32 SendQueueSize = 1          [ (NMonProto.Metric).Type = GAUGE ]; +    optional uint32 SendQueueSize = 1          [ (NMonProto.Metric).Type = GAUGE ];       // client only -    optional uint32 AckMessagesSize = 2        [ (NMonProto.Metric).Type = GAUGE ]; -    optional uint32 ErrorCount = 3             [ (NMonProto.Metric).Type = RATE ]; +    optional uint32 AckMessagesSize = 2        [ (NMonProto.Metric).Type = GAUGE ];  +    optional uint32 ErrorCount = 3             [ (NMonProto.Metric).Type = RATE ];  -    optional uint64 WriteBytes = 10            [ (NMonProto.Metric).Type = RATE ]; +    optional uint64 WriteBytes = 10            [ (NMonProto.Metric).Type = RATE ];       optional uint64 WriteBytesCompressed = 11; -    optional uint64 WriteMessages = 12         [ (NMonProto.Metric).Type = RATE ]; +    optional uint64 WriteMessages = 12         [ (NMonProto.Metric).Type = RATE ];       optional uint64 WriteSyscalls = 13;      optional uint64 WriteActs = 14; -    optional uint64 ReadBytes = 20             [ (NMonProto.Metric).Type = RATE ]; +    optional uint64 ReadBytes = 20             [ (NMonProto.Metric).Type = RATE ];       optional uint64 ReadBytesCompressed = 21; -    optional uint64 ReadMessages = 22          [ (NMonProto.Metric).Type = RATE ]; +    optional uint64 ReadMessages = 22          [ (NMonProto.Metric).Type = RATE ];       optional uint64 ReadSyscalls = 23;      optional uint64 ReadActs = 24; @@ -49,7 +49,7 @@ message TConnectionStatusMonRecord {  }  message TSessionStatusMonRecord { -    optional uint32 InFlight = 1         [ (NMonProto.Metric).Type = GAUGE ]; -    optional uint32 ConnectionCount = 2  [ (NMonProto.Metric).Type = GAUGE ]; -    optional uint32 ConnectCount = 3     [ (NMonProto.Metric).Type = RATE ]; +    optional uint32 InFlight = 1         [ (NMonProto.Metric).Type = GAUGE ];  +    optional uint32 ConnectionCount = 2  [ (NMonProto.Metric).Type = GAUGE ];  +    optional uint32 ConnectCount = 3     [ (NMonProto.Metric).Type = RATE ];   } diff --git a/library/cpp/messagebus/monitoring/ya.make b/library/cpp/messagebus/monitoring/ya.make index 25782492b11..7eec26150fb 100644 --- a/library/cpp/messagebus/monitoring/ya.make +++ b/library/cpp/messagebus/monitoring/ya.make @@ -3,7 +3,7 @@ PROTO_LIBRARY()  OWNER(g:messagebus)  PEERDIR( -    library/cpp/monlib/encode/legacy_protobuf/protos +    library/cpp/monlib/encode/legacy_protobuf/protos   )  SRCS( diff --git a/library/cpp/messagebus/network.h b/library/cpp/messagebus/network.h index cc4bd76ea31..6f589bd8d81 100644 --- a/library/cpp/messagebus/network.h +++ b/library/cpp/messagebus/network.h @@ -8,7 +8,7 @@  #include <utility> -namespace NBus { +namespace NBus {       namespace NPrivate {          void SetSockOptTcpCork(SOCKET s, bool value); @@ -24,5 +24,5 @@ namespace NBus {      };      std::pair<unsigned, TVector<TBindResult>> BindOnPort(int port, bool reusePort); - +   } diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp index 46a7ece6a8f..7da9a931c13 100644 --- a/library/cpp/messagebus/session.cpp +++ b/library/cpp/messagebus/session.cpp @@ -124,7 +124,7 @@ TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHa  TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue) {      return queue->CreateDestination(proto, handler, config);  } - +   TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo) { -    return queue->CreateDestination(proto, handler, config, bindTo); -} +    return queue->CreateDestination(proto, handler, config, bindTo);  +}  diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h index fb12ab7c229..b1734a89e82 100644 --- a/library/cpp/messagebus/session.h +++ b/library/cpp/messagebus/session.h @@ -124,10 +124,10 @@ namespace NBus {          static TBusServerSessionPtr Create(              TBusProtocol* proto,              IBusServerHandler* handler, -            const TBusServerSessionConfig& config, -            TBusMessageQueuePtr queue, +            const TBusServerSessionConfig& config,  +            TBusMessageQueuePtr queue,               const TVector<TBindResult>& bindTo); - +           // TODO: make parameter non-const          virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0; diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp index ddf9f360c43..5e11c70d891 100644 --- a/library/cpp/messagebus/session_impl.cpp +++ b/library/cpp/messagebus/session_impl.cpp @@ -485,23 +485,23 @@ void TBusSessionImpl::Act(TConnectionTag) {  void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {      Listen(BindOnPort(port, Config.ReusePort).second, q); -} - +}  +   void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {      Y_ASSERT(q == Queue); -    int actualPort = -1; +    int actualPort = -1;       for (const TBindResult& br : bindTo) { -        if (actualPort == -1) { -            actualPort = br.Addr.GetPort(); +        if (actualPort == -1) {  +            actualPort = br.Addr.GetPort();           } else { -            Y_VERIFY(actualPort == br.Addr.GetPort(), "state check"); +            Y_VERIFY(actualPort == br.Addr.GetPort(), "state check");           }          if (Config.SocketToS >= 0) { -            SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS); +            SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS);           } -        TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr)); +        TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));           TConnectionsGuard guard(ConnectionsLock);          InsertAcceptorLockAcquired(acceptor.Get()); diff --git a/library/cpp/messagebus/www/www.cpp b/library/cpp/messagebus/www/www.cpp index 62ec241d852..222195e70bb 100644 --- a/library/cpp/messagebus/www/www.cpp +++ b/library/cpp/messagebus/www/www.cpp @@ -4,7 +4,7 @@  #include "html_output.h"  #include <library/cpp/messagebus/remote_connection_status.h> -#include <library/cpp/monlib/deprecated/json/writer.h> +#include <library/cpp/monlib/deprecated/json/writer.h>   #include <library/cpp/archive/yarchive.h>  #include <library/cpp/http/fetch/httpfsm.h> @@ -621,57 +621,57 @@ struct TBusWww::TImpl {              }          } -        void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) { +        void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) {               auto status = queue->GetStatusRecordInternal(); -            sj.OpenMetric(); +            sj.OpenMetric();               sj.WriteLabels("mb_queue", queueName, "sensor", "WorkQueueSize");              sj.WriteValue(status.ExecutorStatus.WorkQueueSize); -            sj.CloseMetric(); +            sj.CloseMetric();           } -        void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj, +        void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj,                                           TStringBuf labelName, TStringBuf sessionName, bool read, const TMessageCounter& counter) {              TStringBuf readOrWrite = read ? "read" : "write"; -            sj.OpenMetric(); +            sj.OpenMetric();               sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageBytes");              sj.WriteValue(counter.BytesData);              sj.WriteModeDeriv(); -            sj.CloseMetric(); +            sj.CloseMetric();  -            sj.OpenMetric(); +            sj.OpenMetric();               sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageCount");              sj.WriteValue(counter.Count);              sj.WriteModeDeriv(); -            sj.CloseMetric(); +            sj.CloseMetric();           } -        void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client, +        void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client,                                   TBusSession* session) {              TStringBuf labelName = client ? "mb_client_session" : "mb_server_session";              auto status = session->GetStatusRecordInternal(); -            sj.OpenMetric(); +            sj.OpenMetric();               sj.WriteLabels(labelName, sessionName, "sensor", "InFlightCount");              sj.WriteValue(status.Status.InFlightCount); -            sj.CloseMetric(); +            sj.CloseMetric();  -            sj.OpenMetric(); +            sj.OpenMetric();               sj.WriteLabels(labelName, sessionName, "sensor", "InFlightSize");              sj.WriteValue(status.Status.InFlightSize); -            sj.CloseMetric(); +            sj.CloseMetric();  -            sj.OpenMetric(); +            sj.OpenMetric();               sj.WriteLabels(labelName, sessionName, "sensor", "SendQueueSize");              sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.SendQueueSize); -            sj.CloseMetric(); +            sj.CloseMetric();               if (client) { -                sj.OpenMetric(); +                sj.OpenMetric();                   sj.WriteLabels(labelName, sessionName, "sensor", "AckMessagesSize");                  sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.AckMessagesSize); -                sj.CloseMetric(); +                sj.CloseMetric();               }              WriteMessageCounterSensors(sj, labelName, sessionName, false, @@ -686,10 +686,10 @@ struct TBusWww::TImpl {              Y_UNUSED(ss);              bool all = q == "" && cs == "" && ss == ""; -            NMonitoring::TDeprecatedJsonWriter sj(&Os); +            NMonitoring::TDeprecatedJsonWriter sj(&Os);               sj.OpenDocument(); -            sj.OpenMetrics(); +            sj.OpenMetrics();               for (unsigned i = 0; i < Outer->Queues.size(); ++i) {                  TString queueName = Outer->Queues.Entries[i].first; @@ -716,7 +716,7 @@ struct TBusWww::TImpl {                  }              } -            sj.CloseMetrics(); +            sj.CloseMetrics();               sj.CloseDocument();          } diff --git a/library/cpp/messagebus/www/ya.make b/library/cpp/messagebus/www/ya.make index 972390cea3f..de2fc0becad 100644 --- a/library/cpp/messagebus/www/ya.make +++ b/library/cpp/messagebus/www/ya.make @@ -22,7 +22,7 @@ PEERDIR(      library/cpp/json/writer      library/cpp/messagebus      library/cpp/messagebus/oldmodule -    library/cpp/monlib/deprecated/json +    library/cpp/monlib/deprecated/json       library/cpp/uri  )  | 
