aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authoralexcrush <alexcrush@yandex-team.ru>2022-02-10 16:50:33 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:33 +0300
commit34f0683c2d7731f2f9f59966c12e602f405fa0d6 (patch)
tree9af7dd74efcfa3d8aa7b40bd757ad8f2d89f6902 /library/cpp/messagebus
parenta1d67d6a31f789aa011250c3edce5751c0cadad2 (diff)
downloadydb-34f0683c2d7731f2f9f59966c12e602f405fa0d6.tar.gz
Restoring authorship annotation for <alexcrush@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/handler.cpp6
-rw-r--r--library/cpp/messagebus/handler.h2
-rw-r--r--library/cpp/messagebus/oldmodule/module.cpp14
-rw-r--r--library/cpp/messagebus/remote_connection.h2
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp108
5 files changed, 66 insertions, 66 deletions
diff --git a/library/cpp/messagebus/handler.cpp b/library/cpp/messagebus/handler.cpp
index 333bd52934..4f40db8776 100644
--- a/library/cpp/messagebus/handler.cpp
+++ b/library/cpp/messagebus/handler.cpp
@@ -30,7 +30,7 @@ void TOnMessageContext::ForgetRequest() {
TNetAddr TOnMessageContext::GetPeerAddrNetAddr() const {
return Ident.GetNetAddr();
}
-
-bool TOnMessageContext::IsConnectionAlive() const {
+
+bool TOnMessageContext::IsConnectionAlive() const {
return !!Ident.Connection && Ident.Connection->IsAlive();
-}
+}
diff --git a/library/cpp/messagebus/handler.h b/library/cpp/messagebus/handler.h
index 60002c68a6..21b39a20bc 100644
--- a/library/cpp/messagebus/handler.h
+++ b/library/cpp/messagebus/handler.h
@@ -113,7 +113,7 @@ namespace NBus {
}
TNetAddr GetPeerAddrNetAddr() const;
-
+
bool IsConnectionAlive() const;
};
diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp
index 24bd778799..6917b893c3 100644
--- a/library/cpp/messagebus/oldmodule/module.cpp
+++ b/library/cpp/messagebus/oldmodule/module.cpp
@@ -209,7 +209,7 @@ namespace NBus {
{
}
};
-
+
struct TJobRunner: public TAtomicRefCount<TJobRunner>,
public NActor::TActor<TJobRunner>,
public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>,
@@ -759,8 +759,8 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) {
return strReturn;
}
-}
-
+}
+
void TBusModuleImpl::AddJob(TJobRunner* jobRunner) {
TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob");
Jobs.push_back(jobRunner);
@@ -875,7 +875,7 @@ void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus sta
job->UnRef();
}
}
-
-void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) {
- Module->OnClientConnectionEvent(event);
-}
+
+void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) {
+ Module->OnClientConnectionEvent(event);
+}
diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h
index 4538947368..719dbe5f3d 100644
--- a/library/cpp/messagebus/remote_connection.h
+++ b/library/cpp/messagebus/remote_connection.h
@@ -204,7 +204,7 @@ namespace NBus {
struct TWriterData {
TAtomic Down;
-
+
NEventLoop::TChannelPtr Channel;
ui32 SocketVersion;
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..4fef8d5ba6 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -310,7 +310,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TExampleProtocol Proto;
TSystemEvent MessageReceivedEvent; // 1 wait for 1 message
TBusServerSessionPtr Session;
- TMutex Lock_;
+ TMutex Lock_;
TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages;
TDelayReplyServer()
@@ -328,52 +328,52 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
void OnMessage(TOnMessageContext& mess) override {
Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here");
- TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext);
- delayedMsg->Swap(mess);
- auto g(Guard(Lock_));
- DelayedMessages.push_back(delayedMsg);
+ TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext);
+ delayedMsg->Swap(mess);
+ auto g(Guard(Lock_));
+ DelayedMessages.push_back(delayedMsg);
MessageReceivedEvent.Signal();
- }
+ }
- bool CheckClientIsAlive() {
- auto g(Guard(Lock_));
+ bool CheckClientIsAlive() {
+ auto g(Guard(Lock_));
for (auto& delayedMessage : DelayedMessages) {
if (!delayedMessage->IsConnectionAlive()) {
- return false;
- }
- }
- return true;
- }
-
- bool CheckClientIsDead() const {
- auto g(Guard(Lock_));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool CheckClientIsDead() const {
+ auto g(Guard(Lock_));
for (const auto& delayedMessage : DelayedMessages) {
if (delayedMessage->IsConnectionAlive()) {
- return false;
- }
- }
- return true;
- }
-
- void ReplyToDelayedMessages() {
- while (true) {
- TOnMessageContext msg;
- {
- auto g(Guard(Lock_));
- if (DelayedMessages.empty()) {
- break;
- }
- DelayedMessages.front()->Swap(msg);
- DelayedMessages.pop_front();
- }
- TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount));
- msg.SendReplyMove(reply);
- }
- }
-
- size_t GetDelayedMessageCount() const {
- auto g(Guard(Lock_));
- return DelayedMessages.size();
+ return false;
+ }
+ }
+ return true;
+ }
+
+ void ReplyToDelayedMessages() {
+ while (true) {
+ TOnMessageContext msg;
+ {
+ auto g(Guard(Lock_));
+ if (DelayedMessages.empty()) {
+ break;
+ }
+ DelayedMessages.front()->Swap(msg);
+ DelayedMessages.pop_front();
+ }
+ TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount));
+ msg.SendReplyMove(reply);
+ }
+ }
+
+ size_t GetDelayedMessageCount() const {
+ auto g(Guard(Lock_));
+ return DelayedMessages.size();
}
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
@@ -397,10 +397,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.Destroy();
- UNIT_WAIT_FOR(server.CheckClientIsDead());
-
- server.ReplyToDelayedMessages();
+ UNIT_WAIT_FOR(server.CheckClientIsDead());
+ server.ReplyToDelayedMessages();
+
// wait until all server message are delivered
UNIT_WAIT_FOR(0 == server.Session->GetInFlight());
}
@@ -654,23 +654,23 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.SendMessages(2, &addr);
- for (size_t i = 0; i < 5; ++i) {
- // One MessageReceivedEvent indicates one message, we need to wait for two
- UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
- if (server.GetDelayedMessageCount() == 2) {
- break;
- }
- }
+ for (size_t i = 0; i < 5; ++i) {
+ // One MessageReceivedEvent indicates one message, we need to wait for two
+ UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
+ if (server.GetDelayedMessageCount() == 2) {
+ break;
+ }
+ }
UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2);
size_t inFlight = client.Session->GetInFlight(addr);
// 4 is for messagebus1 that adds inFlight counter twice for some reason
UNIT_ASSERT(inFlight == 2 || inFlight == 4);
- UNIT_ASSERT(server.CheckClientIsAlive());
-
- server.ReplyToDelayedMessages();
+ UNIT_ASSERT(server.CheckClientIsAlive());
+ server.ReplyToDelayedMessages();
+
client.WaitReplies();
}