aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authorSergey Polovko <sergey@polovko.me>2022-02-10 16:47:02 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:02 +0300
commit3e0b762a82514bac89c1dd6ea7211e381d8aa248 (patch)
treec2d1b379ecaf05ca8f11ed0b5da9d1a950e6e554 /library/cpp/messagebus
parentab3783171cc30e262243a0227c86118f7080c896 (diff)
downloadydb-3e0b762a82514bac89c1dd6ea7211e381d8aa248.tar.gz
Restoring authorship annotation for Sergey Polovko <sergey@polovko.me>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/messqueue.cpp4
-rw-r--r--library/cpp/messagebus/monitoring/mon_proto.proto22
-rw-r--r--library/cpp/messagebus/monitoring/ya.make2
-rw-r--r--library/cpp/messagebus/network.h4
-rw-r--r--library/cpp/messagebus/session.cpp6
-rw-r--r--library/cpp/messagebus/session.h6
-rw-r--r--library/cpp/messagebus/session_impl.cpp16
-rw-r--r--library/cpp/messagebus/www/www.cpp42
-rw-r--r--library/cpp/messagebus/www/ya.make2
9 files changed, 52 insertions, 52 deletions
diff --git a/library/cpp/messagebus/messqueue.cpp b/library/cpp/messagebus/messqueue.cpp
index 3474d62705..6197b6bbf3 100644
--- a/library/cpp/messagebus/messqueue.cpp
+++ b/library/cpp/messagebus/messqueue.cpp
@@ -159,8 +159,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
} catch (...) {
Y_FAIL("create destination failure: %s", CurrentExceptionMessage().c_str());
}
-}
-
+}
+
void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) {
TGuard<TMutex> scope(Lock);
Sessions.push_back(session);
diff --git a/library/cpp/messagebus/monitoring/mon_proto.proto b/library/cpp/messagebus/monitoring/mon_proto.proto
index 73b6614481..eda77de7a5 100644
--- a/library/cpp/messagebus/monitoring/mon_proto.proto
+++ b/library/cpp/messagebus/monitoring/mon_proto.proto
@@ -1,4 +1,4 @@
-import "library/cpp/monlib/encode/legacy_protobuf/protos/metric_meta.proto";
+import "library/cpp/monlib/encode/legacy_protobuf/protos/metric_meta.proto";
package NBus;
@@ -29,19 +29,19 @@ message TMessageStatusRecord {
}
message TConnectionStatusMonRecord {
- optional uint32 SendQueueSize = 1 [ (NMonProto.Metric).Type = GAUGE ];
+ optional uint32 SendQueueSize = 1 [ (NMonProto.Metric).Type = GAUGE ];
// client only
- optional uint32 AckMessagesSize = 2 [ (NMonProto.Metric).Type = GAUGE ];
- optional uint32 ErrorCount = 3 [ (NMonProto.Metric).Type = RATE ];
+ optional uint32 AckMessagesSize = 2 [ (NMonProto.Metric).Type = GAUGE ];
+ optional uint32 ErrorCount = 3 [ (NMonProto.Metric).Type = RATE ];
- optional uint64 WriteBytes = 10 [ (NMonProto.Metric).Type = RATE ];
+ optional uint64 WriteBytes = 10 [ (NMonProto.Metric).Type = RATE ];
optional uint64 WriteBytesCompressed = 11;
- optional uint64 WriteMessages = 12 [ (NMonProto.Metric).Type = RATE ];
+ optional uint64 WriteMessages = 12 [ (NMonProto.Metric).Type = RATE ];
optional uint64 WriteSyscalls = 13;
optional uint64 WriteActs = 14;
- optional uint64 ReadBytes = 20 [ (NMonProto.Metric).Type = RATE ];
+ optional uint64 ReadBytes = 20 [ (NMonProto.Metric).Type = RATE ];
optional uint64 ReadBytesCompressed = 21;
- optional uint64 ReadMessages = 22 [ (NMonProto.Metric).Type = RATE ];
+ optional uint64 ReadMessages = 22 [ (NMonProto.Metric).Type = RATE ];
optional uint64 ReadSyscalls = 23;
optional uint64 ReadActs = 24;
@@ -49,7 +49,7 @@ message TConnectionStatusMonRecord {
}
message TSessionStatusMonRecord {
- optional uint32 InFlight = 1 [ (NMonProto.Metric).Type = GAUGE ];
- optional uint32 ConnectionCount = 2 [ (NMonProto.Metric).Type = GAUGE ];
- optional uint32 ConnectCount = 3 [ (NMonProto.Metric).Type = RATE ];
+ optional uint32 InFlight = 1 [ (NMonProto.Metric).Type = GAUGE ];
+ optional uint32 ConnectionCount = 2 [ (NMonProto.Metric).Type = GAUGE ];
+ optional uint32 ConnectCount = 3 [ (NMonProto.Metric).Type = RATE ];
}
diff --git a/library/cpp/messagebus/monitoring/ya.make b/library/cpp/messagebus/monitoring/ya.make
index 25782492b1..7eec26150f 100644
--- a/library/cpp/messagebus/monitoring/ya.make
+++ b/library/cpp/messagebus/monitoring/ya.make
@@ -3,7 +3,7 @@ PROTO_LIBRARY()
OWNER(g:messagebus)
PEERDIR(
- library/cpp/monlib/encode/legacy_protobuf/protos
+ library/cpp/monlib/encode/legacy_protobuf/protos
)
SRCS(
diff --git a/library/cpp/messagebus/network.h b/library/cpp/messagebus/network.h
index cc4bd76ea3..6f589bd8d8 100644
--- a/library/cpp/messagebus/network.h
+++ b/library/cpp/messagebus/network.h
@@ -8,7 +8,7 @@
#include <utility>
-namespace NBus {
+namespace NBus {
namespace NPrivate {
void SetSockOptTcpCork(SOCKET s, bool value);
@@ -24,5 +24,5 @@ namespace NBus {
};
std::pair<unsigned, TVector<TBindResult>> BindOnPort(int port, bool reusePort);
-
+
}
diff --git a/library/cpp/messagebus/session.cpp b/library/cpp/messagebus/session.cpp
index 46a7ece6a8..7da9a931c1 100644
--- a/library/cpp/messagebus/session.cpp
+++ b/library/cpp/messagebus/session.cpp
@@ -124,7 +124,7 @@ TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHa
TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue) {
return queue->CreateDestination(proto, handler, config);
}
-
+
TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo) {
- return queue->CreateDestination(proto, handler, config, bindTo);
-}
+ return queue->CreateDestination(proto, handler, config, bindTo);
+}
diff --git a/library/cpp/messagebus/session.h b/library/cpp/messagebus/session.h
index fb12ab7c22..b1734a89e8 100644
--- a/library/cpp/messagebus/session.h
+++ b/library/cpp/messagebus/session.h
@@ -124,10 +124,10 @@ namespace NBus {
static TBusServerSessionPtr Create(
TBusProtocol* proto,
IBusServerHandler* handler,
- const TBusServerSessionConfig& config,
- TBusMessageQueuePtr queue,
+ const TBusServerSessionConfig& config,
+ TBusMessageQueuePtr queue,
const TVector<TBindResult>& bindTo);
-
+
// TODO: make parameter non-const
virtual EMessageStatus SendReply(const TBusIdentity& ident, TBusMessage* pRep) = 0;
diff --git a/library/cpp/messagebus/session_impl.cpp b/library/cpp/messagebus/session_impl.cpp
index ddf9f360c4..5e11c70d89 100644
--- a/library/cpp/messagebus/session_impl.cpp
+++ b/library/cpp/messagebus/session_impl.cpp
@@ -485,23 +485,23 @@ void TBusSessionImpl::Act(TConnectionTag) {
void TBusSessionImpl::Listen(int port, TBusMessageQueue* q) {
Listen(BindOnPort(port, Config.ReusePort).second, q);
-}
-
+}
+
void TBusSessionImpl::Listen(const TVector<TBindResult>& bindTo, TBusMessageQueue* q) {
Y_ASSERT(q == Queue);
- int actualPort = -1;
+ int actualPort = -1;
for (const TBindResult& br : bindTo) {
- if (actualPort == -1) {
- actualPort = br.Addr.GetPort();
+ if (actualPort == -1) {
+ actualPort = br.Addr.GetPort();
} else {
- Y_VERIFY(actualPort == br.Addr.GetPort(), "state check");
+ Y_VERIFY(actualPort == br.Addr.GetPort(), "state check");
}
if (Config.SocketToS >= 0) {
- SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS);
+ SetSocketToS(*br.Socket, &(br.Addr), Config.SocketToS);
}
- TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));
+ TAcceptorPtr acceptor(new TAcceptor(this, ++LastAcceptorId, br.Socket->Release(), br.Addr));
TConnectionsGuard guard(ConnectionsLock);
InsertAcceptorLockAcquired(acceptor.Get());
diff --git a/library/cpp/messagebus/www/www.cpp b/library/cpp/messagebus/www/www.cpp
index 62ec241d85..222195e70b 100644
--- a/library/cpp/messagebus/www/www.cpp
+++ b/library/cpp/messagebus/www/www.cpp
@@ -4,7 +4,7 @@
#include "html_output.h"
#include <library/cpp/messagebus/remote_connection_status.h>
-#include <library/cpp/monlib/deprecated/json/writer.h>
+#include <library/cpp/monlib/deprecated/json/writer.h>
#include <library/cpp/archive/yarchive.h>
#include <library/cpp/http/fetch/httpfsm.h>
@@ -621,57 +621,57 @@ struct TBusWww::TImpl {
}
}
- void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) {
+ void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) {
auto status = queue->GetStatusRecordInternal();
- sj.OpenMetric();
+ sj.OpenMetric();
sj.WriteLabels("mb_queue", queueName, "sensor", "WorkQueueSize");
sj.WriteValue(status.ExecutorStatus.WorkQueueSize);
- sj.CloseMetric();
+ sj.CloseMetric();
}
- void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj,
+ void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj,
TStringBuf labelName, TStringBuf sessionName, bool read, const TMessageCounter& counter) {
TStringBuf readOrWrite = read ? "read" : "write";
- sj.OpenMetric();
+ sj.OpenMetric();
sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageBytes");
sj.WriteValue(counter.BytesData);
sj.WriteModeDeriv();
- sj.CloseMetric();
+ sj.CloseMetric();
- sj.OpenMetric();
+ sj.OpenMetric();
sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageCount");
sj.WriteValue(counter.Count);
sj.WriteModeDeriv();
- sj.CloseMetric();
+ sj.CloseMetric();
}
- void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client,
+ void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client,
TBusSession* session) {
TStringBuf labelName = client ? "mb_client_session" : "mb_server_session";
auto status = session->GetStatusRecordInternal();
- sj.OpenMetric();
+ sj.OpenMetric();
sj.WriteLabels(labelName, sessionName, "sensor", "InFlightCount");
sj.WriteValue(status.Status.InFlightCount);
- sj.CloseMetric();
+ sj.CloseMetric();
- sj.OpenMetric();
+ sj.OpenMetric();
sj.WriteLabels(labelName, sessionName, "sensor", "InFlightSize");
sj.WriteValue(status.Status.InFlightSize);
- sj.CloseMetric();
+ sj.CloseMetric();
- sj.OpenMetric();
+ sj.OpenMetric();
sj.WriteLabels(labelName, sessionName, "sensor", "SendQueueSize");
sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.SendQueueSize);
- sj.CloseMetric();
+ sj.CloseMetric();
if (client) {
- sj.OpenMetric();
+ sj.OpenMetric();
sj.WriteLabels(labelName, sessionName, "sensor", "AckMessagesSize");
sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.AckMessagesSize);
- sj.CloseMetric();
+ sj.CloseMetric();
}
WriteMessageCounterSensors(sj, labelName, sessionName, false,
@@ -686,10 +686,10 @@ struct TBusWww::TImpl {
Y_UNUSED(ss);
bool all = q == "" && cs == "" && ss == "";
- NMonitoring::TDeprecatedJsonWriter sj(&Os);
+ NMonitoring::TDeprecatedJsonWriter sj(&Os);
sj.OpenDocument();
- sj.OpenMetrics();
+ sj.OpenMetrics();
for (unsigned i = 0; i < Outer->Queues.size(); ++i) {
TString queueName = Outer->Queues.Entries[i].first;
@@ -716,7 +716,7 @@ struct TBusWww::TImpl {
}
}
- sj.CloseMetrics();
+ sj.CloseMetrics();
sj.CloseDocument();
}
diff --git a/library/cpp/messagebus/www/ya.make b/library/cpp/messagebus/www/ya.make
index 972390cea3..de2fc0beca 100644
--- a/library/cpp/messagebus/www/ya.make
+++ b/library/cpp/messagebus/www/ya.make
@@ -22,7 +22,7 @@ PEERDIR(
library/cpp/json/writer
library/cpp/messagebus
library/cpp/messagebus/oldmodule
- library/cpp/monlib/deprecated/json
+ library/cpp/monlib/deprecated/json
library/cpp/uri
)