diff options
author | alexcrush <alexcrush@yandex-team.ru> | 2022-02-10 16:50:33 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:33 +0300 |
commit | 34f0683c2d7731f2f9f59966c12e602f405fa0d6 (patch) | |
tree | 9af7dd74efcfa3d8aa7b40bd757ad8f2d89f6902 /library/cpp/messagebus | |
parent | a1d67d6a31f789aa011250c3edce5751c0cadad2 (diff) | |
download | ydb-34f0683c2d7731f2f9f59966c12e602f405fa0d6.tar.gz |
Restoring authorship annotation for <alexcrush@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r-- | library/cpp/messagebus/handler.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/handler.h | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/oldmodule/module.cpp | 14 | ||||
-rw-r--r-- | library/cpp/messagebus/remote_connection.h | 2 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 108 |
5 files changed, 66 insertions, 66 deletions
diff --git a/library/cpp/messagebus/handler.cpp b/library/cpp/messagebus/handler.cpp index 333bd52934..4f40db8776 100644 --- a/library/cpp/messagebus/handler.cpp +++ b/library/cpp/messagebus/handler.cpp @@ -30,7 +30,7 @@ void TOnMessageContext::ForgetRequest() { TNetAddr TOnMessageContext::GetPeerAddrNetAddr() const { return Ident.GetNetAddr(); } - -bool TOnMessageContext::IsConnectionAlive() const { + +bool TOnMessageContext::IsConnectionAlive() const { return !!Ident.Connection && Ident.Connection->IsAlive(); -} +} diff --git a/library/cpp/messagebus/handler.h b/library/cpp/messagebus/handler.h index 60002c68a6..21b39a20bc 100644 --- a/library/cpp/messagebus/handler.h +++ b/library/cpp/messagebus/handler.h @@ -113,7 +113,7 @@ namespace NBus { } TNetAddr GetPeerAddrNetAddr() const; - + bool IsConnectionAlive() const; }; diff --git a/library/cpp/messagebus/oldmodule/module.cpp b/library/cpp/messagebus/oldmodule/module.cpp index 24bd778799..6917b893c3 100644 --- a/library/cpp/messagebus/oldmodule/module.cpp +++ b/library/cpp/messagebus/oldmodule/module.cpp @@ -209,7 +209,7 @@ namespace NBus { { } }; - + struct TJobRunner: public TAtomicRefCount<TJobRunner>, public NActor::TActor<TJobRunner>, public NActor::TQueueInActor<TJobRunner, TJobResponseMessage>, @@ -759,8 +759,8 @@ TBusSession* TMyModule::CreateExtSession(TBusMessageQueue& queue) { return strReturn; } -} - +} + void TBusModuleImpl::AddJob(TJobRunner* jobRunner) { TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for AddJob"); Jobs.push_back(jobRunner); @@ -875,7 +875,7 @@ void TModuleClientHandler::OnError(TAutoPtr<TBusMessage> msg, EMessageStatus sta job->UnRef(); } } - -void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) { - Module->OnClientConnectionEvent(event); -} + +void TModuleClientHandler::OnClientConnectionEvent(const TClientConnectionEvent& event) { + Module->OnClientConnectionEvent(event); +} diff --git a/library/cpp/messagebus/remote_connection.h b/library/cpp/messagebus/remote_connection.h index 4538947368..719dbe5f3d 100644 --- a/library/cpp/messagebus/remote_connection.h +++ b/library/cpp/messagebus/remote_connection.h @@ -204,7 +204,7 @@ namespace NBus { struct TWriterData { TAtomic Down; - + NEventLoop::TChannelPtr Channel; ui32 SocketVersion; diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b7702..4fef8d5ba6 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -310,7 +310,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TExampleProtocol Proto; TSystemEvent MessageReceivedEvent; // 1 wait for 1 message TBusServerSessionPtr Session; - TMutex Lock_; + TMutex Lock_; TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages; TDelayReplyServer() @@ -328,52 +328,52 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { void OnMessage(TOnMessageContext& mess) override { Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here"); - TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext); - delayedMsg->Swap(mess); - auto g(Guard(Lock_)); - DelayedMessages.push_back(delayedMsg); + TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext); + delayedMsg->Swap(mess); + auto g(Guard(Lock_)); + DelayedMessages.push_back(delayedMsg); MessageReceivedEvent.Signal(); - } + } - bool CheckClientIsAlive() { - auto g(Guard(Lock_)); + bool CheckClientIsAlive() { + auto g(Guard(Lock_)); for (auto& delayedMessage : DelayedMessages) { if (!delayedMessage->IsConnectionAlive()) { - return false; - } - } - return true; - } - - bool CheckClientIsDead() const { - auto g(Guard(Lock_)); + return false; + } + } + return true; + } + + bool CheckClientIsDead() const { + auto g(Guard(Lock_)); for (const auto& delayedMessage : DelayedMessages) { if (delayedMessage->IsConnectionAlive()) { - return false; - } - } - return true; - } - - void ReplyToDelayedMessages() { - while (true) { - TOnMessageContext msg; - { - auto g(Guard(Lock_)); - if (DelayedMessages.empty()) { - break; - } - DelayedMessages.front()->Swap(msg); - DelayedMessages.pop_front(); - } - TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount)); - msg.SendReplyMove(reply); - } - } - - size_t GetDelayedMessageCount() const { - auto g(Guard(Lock_)); - return DelayedMessages.size(); + return false; + } + } + return true; + } + + void ReplyToDelayedMessages() { + while (true) { + TOnMessageContext msg; + { + auto g(Guard(Lock_)); + if (DelayedMessages.empty()) { + break; + } + DelayedMessages.front()->Swap(msg); + DelayedMessages.pop_front(); + } + TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount)); + msg.SendReplyMove(reply); + } + } + + size_t GetDelayedMessageCount() const { + auto g(Guard(Lock_)); + return DelayedMessages.size(); } void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { @@ -397,10 +397,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.Destroy(); - UNIT_WAIT_FOR(server.CheckClientIsDead()); - - server.ReplyToDelayedMessages(); + UNIT_WAIT_FOR(server.CheckClientIsDead()); + server.ReplyToDelayedMessages(); + // wait until all server message are delivered UNIT_WAIT_FOR(0 == server.Session->GetInFlight()); } @@ -654,23 +654,23 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.SendMessages(2, &addr); - for (size_t i = 0; i < 5; ++i) { - // One MessageReceivedEvent indicates one message, we need to wait for two - UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); - if (server.GetDelayedMessageCount() == 2) { - break; - } - } + for (size_t i = 0; i < 5; ++i) { + // One MessageReceivedEvent indicates one message, we need to wait for two + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + if (server.GetDelayedMessageCount() == 2) { + break; + } + } UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2); size_t inFlight = client.Session->GetInFlight(addr); // 4 is for messagebus1 that adds inFlight counter twice for some reason UNIT_ASSERT(inFlight == 2 || inFlight == 4); - UNIT_ASSERT(server.CheckClientIsAlive()); - - server.ReplyToDelayedMessages(); + UNIT_ASSERT(server.CheckClientIsAlive()); + server.ReplyToDelayedMessages(); + client.WaitReplies(); } |