aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authorAlexander Gololobov <davenger@yandex-team.com>2022-02-10 16:47:37 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:37 +0300
commit39608cdb86363c75ce55b2b9a69841c3b71f22cf (patch)
tree4ec132c1665bd4d68e3628aa18d937c70d32413b /library/cpp/messagebus
parent54295b9bd4dc45c54d804084fd846d945148a7f0 (diff)
downloadydb-39608cdb86363c75ce55b2b9a69841c3b71f22cf.tar.gz
Restoring authorship annotation for Alexander Gololobov <davenger@yandex-team.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/actor/executor.cpp20
-rw-r--r--library/cpp/messagebus/actor/thread_extra.h22
-rw-r--r--library/cpp/messagebus/actor/what_thread_does.cpp8
-rw-r--r--library/cpp/messagebus/config/defs.h2
-rw-r--r--library/cpp/messagebus/latch.h4
-rw-r--r--library/cpp/messagebus/local_tasks.h12
-rw-r--r--library/cpp/messagebus/message.h2
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp8
-rw-r--r--library/cpp/messagebus/protobuf/ybusbuf.cpp10
-rw-r--r--library/cpp/messagebus/remote_connection.cpp6
-rw-r--r--library/cpp/messagebus/remote_connection_status.cpp2
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp70
-rw-r--r--library/cpp/messagebus/test/ut/one_way_ut.cpp20
13 files changed, 93 insertions, 93 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp
index 7a2227a458..b58e07f4bd 100644
--- a/library/cpp/messagebus/actor/executor.cpp
+++ b/library/cpp/messagebus/actor/executor.cpp
@@ -18,7 +18,7 @@ using namespace NActor::NPrivate;
namespace {
struct THistoryInternal {
struct TRecord {
- TAtomic MaxQueueSize;
+ TAtomic MaxQueueSize;
TRecord()
: MaxQueueSize()
@@ -27,7 +27,7 @@ namespace {
TExecutorHistory::THistoryRecord Capture() {
TExecutorHistory::THistoryRecord r;
- r.MaxQueueSize = AtomicGet(MaxQueueSize);
+ r.MaxQueueSize = AtomicGet(MaxQueueSize);
return r;
}
};
@@ -237,14 +237,14 @@ size_t TExecutor::GetWorkQueueSize() const {
return WorkItems.Size();
}
-using namespace NTSAN;
-
+using namespace NTSAN;
+
ui32 TExecutor::GetMaxQueueSizeAndClear() const {
ui32 max = 0;
for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
- TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData);
- max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize));
- RelaxedStore<ui32>(&wtls->MaxQueueSize, 0);
+ TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData);
+ max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize));
+ RelaxedStore<ui32>(&wtls->MaxQueueSize, 0);
}
return max;
}
@@ -269,7 +269,7 @@ TExecutorStatus TExecutor::GetStatusRecordInternal() const {
ss << "work items: " << GetWorkQueueSize() << "\n";
ss << "workers:\n";
for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
- ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n";
+ ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n";
}
r.Status = ss.Str();
}
@@ -299,8 +299,8 @@ TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
auto& wtls = TlsRef(WorkerThreadLocalData);
- if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) {
- RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize);
+ if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) {
+ RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize);
}
return wi;
diff --git a/library/cpp/messagebus/actor/thread_extra.h b/library/cpp/messagebus/actor/thread_extra.h
index b5aa151618..e4f37a9760 100644
--- a/library/cpp/messagebus/actor/thread_extra.h
+++ b/library/cpp/messagebus/actor/thread_extra.h
@@ -2,28 +2,28 @@
#include <util/thread/singleton.h>
-namespace NTSAN {
+namespace NTSAN {
template <typename T>
inline void RelaxedStore(volatile T* a, T x) {
static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
-#ifdef _win_
+#ifdef _win_
*a = x;
-#else
+#else
__atomic_store_n(a, x, __ATOMIC_RELAXED);
-#endif
+#endif
}
-
+
template <typename T>
inline T RelaxedLoad(volatile T* a) {
-#ifdef _win_
+#ifdef _win_
return *a;
-#else
+#else
return __atomic_load_n(a, __ATOMIC_RELAXED);
-#endif
+#endif
}
-
-}
-
+
+}
+
void SetCurrentThreadName(const char* name);
namespace NThreadExtra {
diff --git a/library/cpp/messagebus/actor/what_thread_does.cpp b/library/cpp/messagebus/actor/what_thread_does.cpp
index bebb6a888c..bce5ccd15e 100644
--- a/library/cpp/messagebus/actor/what_thread_does.cpp
+++ b/library/cpp/messagebus/actor/what_thread_does.cpp
@@ -1,6 +1,6 @@
#include "what_thread_does.h"
-#include "thread_extra.h"
+#include "thread_extra.h"
#include <util/system/tls.h>
@@ -8,13 +8,13 @@ Y_POD_STATIC_THREAD(const char*)
WhatThreadDoes;
const char* PushWhatThreadDoes(const char* what) {
- const char* r = NTSAN::RelaxedLoad(&WhatThreadDoes);
- NTSAN::RelaxedStore(&WhatThreadDoes, what);
+ const char* r = NTSAN::RelaxedLoad(&WhatThreadDoes);
+ NTSAN::RelaxedStore(&WhatThreadDoes, what);
return r;
}
void PopWhatThreadDoes(const char* prev) {
- NTSAN::RelaxedStore(&WhatThreadDoes, prev);
+ NTSAN::RelaxedStore(&WhatThreadDoes, prev);
}
const char** WhatThreadDoesLocation() {
diff --git a/library/cpp/messagebus/config/defs.h b/library/cpp/messagebus/config/defs.h
index 92b1df9969..5db4ef4dae 100644
--- a/library/cpp/messagebus/config/defs.h
+++ b/library/cpp/messagebus/config/defs.h
@@ -70,7 +70,7 @@ namespace NBus {
inline bool IsBusKeyValid(TBusKey key) {
return key != YBUS_KEYINVALID && key != YBUS_KEYMAX && key > YBUS_KEYLOCAL;
}
-
+
#define YBUS_VERSION 0
#define YBUS_INFINITE (1u << 30u)
diff --git a/library/cpp/messagebus/latch.h b/library/cpp/messagebus/latch.h
index 373f4c0e13..3677ee7c29 100644
--- a/library/cpp/messagebus/latch.h
+++ b/library/cpp/messagebus/latch.h
@@ -23,7 +23,7 @@ public:
}
TGuard<TMutex> guard(Mutex);
- while (AtomicGet(Locked) == 1) {
+ while (AtomicGet(Locked) == 1) {
CondVar.WaitI(Mutex);
}
}
@@ -39,7 +39,7 @@ public:
}
TGuard<TMutex> guard(Mutex);
- AtomicSet(Locked, 0);
+ AtomicSet(Locked, 0);
CondVar.BroadCast();
}
diff --git a/library/cpp/messagebus/local_tasks.h b/library/cpp/messagebus/local_tasks.h
index d8e801a457..c92d197ca5 100644
--- a/library/cpp/messagebus/local_tasks.h
+++ b/library/cpp/messagebus/local_tasks.h
@@ -1,23 +1,23 @@
#pragma once
-#include <util/system/atomic.h>
-
+#include <util/system/atomic.h>
+
class TLocalTasks {
private:
- TAtomic GotTasks;
+ TAtomic GotTasks;
public:
TLocalTasks()
- : GotTasks(0)
+ : GotTasks(0)
{
}
void AddTask() {
- AtomicSet(GotTasks, 1);
+ AtomicSet(GotTasks, 1);
}
bool FetchTask() {
- bool gotTasks = AtomicCas(&GotTasks, 0, 1);
+ bool gotTasks = AtomicCas(&GotTasks, 0, 1);
return gotTasks;
}
};
diff --git a/library/cpp/messagebus/message.h b/library/cpp/messagebus/message.h
index 005ca10c65..bf57f13dde 100644
--- a/library/cpp/messagebus/message.h
+++ b/library/cpp/messagebus/message.h
@@ -155,7 +155,7 @@ namespace NBus {
inline bool IsVersionNegotiation(const NBus::TBusHeader& header) {
return header.Id == 0 && header.Size == sizeof(TBusHeader);
}
-
+
//////////////////////////////////////////////////////////
/// \brief Base class for all messages passed in the system
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 24bd778799..a322a2366f 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -777,7 +777,7 @@ void TBusModuleImpl::DestroyJob(TJobRunner* job) {
Y_VERIFY(jobCount >= 0, "decremented too much");
Jobs.erase(job->JobStorageIterator);
- if (AtomicGet(State) == STOPPED) {
+ if (AtomicGet(State) == STOPPED) {
if (jobCount == 0) {
ShutdownCondVar.BroadCast();
}
@@ -804,11 +804,11 @@ void TBusModuleImpl::OnMessageReceived(TAutoPtr<TBusMessage> msg0, TOnMessageCon
}
void TBusModuleImpl::Shutdown() {
- if (AtomicGet(State) != TBusModuleImpl::RUNNING) {
- AtomicSet(State, TBusModuleImpl::STOPPED);
+ if (AtomicGet(State) != TBusModuleImpl::RUNNING) {
+ AtomicSet(State, TBusModuleImpl::STOPPED);
return;
}
- AtomicSet(State, TBusModuleImpl::STOPPED);
+ AtomicSet(State, TBusModuleImpl::STOPPED);
for (auto& clientSession : ClientSessions) {
clientSession->Shutdown();
diff --git a/library/cpp/messagebus/protobuf/ybusbuf.cpp b/library/cpp/messagebus/protobuf/ybusbuf.cpp
index 63415b3737..90ff132942 100644
--- a/library/cpp/messagebus/protobuf/ybusbuf.cpp
+++ b/library/cpp/messagebus/protobuf/ybusbuf.cpp
@@ -75,12 +75,12 @@ TAutoPtr<TBusMessage> TBusBufferProtocol::Deserialize(ui16 messageType, TArrayRe
// clone the base
TAutoPtr<TBusBufferBase> bmess = messageTemplate->New();
- // Need to override protobuf message size limit
- // NOTE: the payload size has already been checked against session MaxMessageSize
- google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size());
+ // Need to override protobuf message size limit
+ // NOTE: the payload size has already been checked against session MaxMessageSize
+ google::protobuf::io::CodedInputStream input(reinterpret_cast<const ui8*>(payload.data()), payload.size());
input.SetTotalBytesLimit(payload.size());
-
- bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage();
+
+ bool ok = bmess->GetRecord()->ParseFromCodedStream(&input) && input.ConsumedEntireMessage();
if (!ok) {
return nullptr;
}
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 22932569db..2113b5622f 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -186,7 +186,7 @@ namespace NBus {
}
ReaderData.DropChannel();
-
+
ReaderData.Status.Fd = readSocket.Socket;
ReaderData.SocketVersion = readSocket.SocketVersion;
@@ -393,8 +393,8 @@ namespace NBus {
}
return true;
- }
-
+ }
+
bool TRemoteConnection::ReaderFillBuffer() {
if (!ReaderData.BufferMore())
return true;
diff --git a/library/cpp/messagebus/remote_connection_status.cpp b/library/cpp/messagebus/remote_connection_status.cpp
index 2c48b2a287..c34c875536 100644
--- a/library/cpp/messagebus/remote_connection_status.cpp
+++ b/library/cpp/messagebus/remote_connection_status.cpp
@@ -202,7 +202,7 @@ TString TRemoteConnectionStatus::PrintToString() const {
p.AddRow("write buffer cap", LeftPad(WriterStatus.BufferSize, 12));
p.AddRow("read buffer cap", LeftPad(ReaderStatus.BufferSize, 12));
-
+
p.AddRow("write buffer drops", LeftPad(WriterStatus.Incremental.BufferDrops, 10));
p.AddRow("read buffer drops", LeftPad(ReaderStatus.Incremental.BufferDrops, 10));
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..e771a933ca 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -313,9 +313,9 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TMutex Lock_;
TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages;
- TDelayReplyServer()
- : MessageReceivedEvent(TEventResetType::rAuto)
- {
+ TDelayReplyServer()
+ : MessageReceivedEvent(TEventResetType::rAuto)
+ {
Bus = CreateMessageQueue("TDelayReplyServer");
TBusServerSessionConfig sessionConfig;
sessionConfig.SendTimeout = 1000;
@@ -617,30 +617,30 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
Y_UNIT_TEST(ServerMessageReservedIds) {
- TObjectCountCheck objectCountCheck;
-
- TExampleServer server;
- TNetAddr serverAddr = server.GetActualListenAddr();
-
- TExampleClient client;
-
- client.SendMessagesWaitReplies(2, serverAddr);
-
- // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side
-
- TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
- req->GetHeader()->Id = 2;
- client.Session->SendMessageAutoPtr(req, &serverAddr);
- client.MessageCount = 1;
- client.WaitForError(MESSAGE_DELIVERY_FAILED);
-
- req.Reset(new TExampleRequest(&client.Proto.RequestCount));
- req->GetHeader()->Id = YBUS_KEYLOCAL;
- client.Session->SendMessageAutoPtr(req, &serverAddr);
- client.MessageCount = 1;
- client.WaitForError(MESSAGE_DELIVERY_FAILED);
- }
-
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TNetAddr serverAddr = server.GetActualListenAddr();
+
+ TExampleClient client;
+
+ client.SendMessagesWaitReplies(2, serverAddr);
+
+ // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side
+
+ TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
+ req->GetHeader()->Id = 2;
+ client.Session->SendMessageAutoPtr(req, &serverAddr);
+ client.MessageCount = 1;
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+
+ req.Reset(new TExampleRequest(&client.Proto.RequestCount));
+ req->GetHeader()->Id = YBUS_KEYLOCAL;
+ client.Session->SendMessageAutoPtr(req, &serverAddr);
+ client.MessageCount = 1;
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+ }
+
Y_UNIT_TEST(TestGetInFlightForDestination) {
TObjectCountCheck objectCountCheck;
@@ -661,7 +661,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
break;
}
}
- UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2);
size_t inFlight = client.Session->GetInFlight(addr);
// 4 is for messagebus1 that adds inFlight counter twice for some reason
@@ -731,10 +731,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
// check reset is possible here
message->Reset();
- // intentionally don't destroy the message
- // we will try to resend it
+ // intentionally don't destroy the message
+ // we will try to resend it
Y_UNUSED(message.Release());
-
+
TestSync.CheckAndIncrement(1);
}
};
@@ -760,8 +760,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
// check reset is possible here
message->Reset();
client.TestSync.CheckAndIncrement(3);
-
- delete message;
+
+ delete message;
}
Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) {
@@ -865,8 +865,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
request.SetVersionInternal(0xF); // max
output.Write(&request, sizeof(request));
- UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true);
-
+ UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true);
+
TStreamSocketInput input(&socket);
TBusHeader response;
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp
index 9c21227e2b..bc78c5238a 100644
--- a/library/cpp/messagebus/test/ut/one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp
@@ -93,7 +93,7 @@ public:
TExampleProtocol Proto;
public:
- TAtomic NumMessages;
+ TAtomic NumMessages;
NullServer() {
NumMessages = 0;
@@ -119,7 +119,7 @@ public:
/// tell session to forget this message and never expect any reply
mess.ForgetRequest();
- AtomicIncrement(NumMessages);
+ AtomicIncrement(NumMessages);
}
/// this handler should not be called because this server does not send replies
@@ -139,10 +139,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
client.Work();
// wait until all client message are delivered
- UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);
+ UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);
// assert correct number of messages
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
}
@@ -196,7 +196,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
TBusClientSessionConfig sessionConfig;
sessionConfig.SendTimeout = 1;
sessionConfig.ConnectTimeout = 1;
- sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
return sessionConfig;
}
@@ -245,11 +245,11 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
first = false;
}
- // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
- // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get
- // serailized and written to the socket buffer, so the write queue gets drained and there are
- // no messages to timeout when periodic timeout check happens.
-
+ // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
+ // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get
+ // serailized and written to the socket buffer, so the write queue gets drained and there are
+ // no messages to timeout when periodic timeout check happens.
+
client.GotError.WaitI();
}
}