aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test
diff options
context:
space:
mode:
authoralexcrush <alexcrush@yandex-team.ru>2022-02-10 16:50:33 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:33 +0300
commit34f0683c2d7731f2f9f59966c12e602f405fa0d6 (patch)
tree9af7dd74efcfa3d8aa7b40bd757ad8f2d89f6902 /library/cpp/messagebus/test
parenta1d67d6a31f789aa011250c3edce5751c0cadad2 (diff)
downloadydb-34f0683c2d7731f2f9f59966c12e602f405fa0d6.tar.gz
Restoring authorship annotation for <alexcrush@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test')
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp108
1 files changed, 54 insertions, 54 deletions
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..4fef8d5ba6 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -310,7 +310,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TExampleProtocol Proto;
TSystemEvent MessageReceivedEvent; // 1 wait for 1 message
TBusServerSessionPtr Session;
- TMutex Lock_;
+ TMutex Lock_;
TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages;
TDelayReplyServer()
@@ -328,52 +328,52 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
void OnMessage(TOnMessageContext& mess) override {
Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here");
- TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext);
- delayedMsg->Swap(mess);
- auto g(Guard(Lock_));
- DelayedMessages.push_back(delayedMsg);
+ TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext);
+ delayedMsg->Swap(mess);
+ auto g(Guard(Lock_));
+ DelayedMessages.push_back(delayedMsg);
MessageReceivedEvent.Signal();
- }
+ }
- bool CheckClientIsAlive() {
- auto g(Guard(Lock_));
+ bool CheckClientIsAlive() {
+ auto g(Guard(Lock_));
for (auto& delayedMessage : DelayedMessages) {
if (!delayedMessage->IsConnectionAlive()) {
- return false;
- }
- }
- return true;
- }
-
- bool CheckClientIsDead() const {
- auto g(Guard(Lock_));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool CheckClientIsDead() const {
+ auto g(Guard(Lock_));
for (const auto& delayedMessage : DelayedMessages) {
if (delayedMessage->IsConnectionAlive()) {
- return false;
- }
- }
- return true;
- }
-
- void ReplyToDelayedMessages() {
- while (true) {
- TOnMessageContext msg;
- {
- auto g(Guard(Lock_));
- if (DelayedMessages.empty()) {
- break;
- }
- DelayedMessages.front()->Swap(msg);
- DelayedMessages.pop_front();
- }
- TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount));
- msg.SendReplyMove(reply);
- }
- }
-
- size_t GetDelayedMessageCount() const {
- auto g(Guard(Lock_));
- return DelayedMessages.size();
+ return false;
+ }
+ }
+ return true;
+ }
+
+ void ReplyToDelayedMessages() {
+ while (true) {
+ TOnMessageContext msg;
+ {
+ auto g(Guard(Lock_));
+ if (DelayedMessages.empty()) {
+ break;
+ }
+ DelayedMessages.front()->Swap(msg);
+ DelayedMessages.pop_front();
+ }
+ TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount));
+ msg.SendReplyMove(reply);
+ }
+ }
+
+ size_t GetDelayedMessageCount() const {
+ auto g(Guard(Lock_));
+ return DelayedMessages.size();
}
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
@@ -397,10 +397,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.Destroy();
- UNIT_WAIT_FOR(server.CheckClientIsDead());
-
- server.ReplyToDelayedMessages();
+ UNIT_WAIT_FOR(server.CheckClientIsDead());
+ server.ReplyToDelayedMessages();
+
// wait until all server message are delivered
UNIT_WAIT_FOR(0 == server.Session->GetInFlight());
}
@@ -654,23 +654,23 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.SendMessages(2, &addr);
- for (size_t i = 0; i < 5; ++i) {
- // One MessageReceivedEvent indicates one message, we need to wait for two
- UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
- if (server.GetDelayedMessageCount() == 2) {
- break;
- }
- }
+ for (size_t i = 0; i < 5; ++i) {
+ // One MessageReceivedEvent indicates one message, we need to wait for two
+ UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
+ if (server.GetDelayedMessageCount() == 2) {
+ break;
+ }
+ }
UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2);
size_t inFlight = client.Session->GetInFlight(addr);
// 4 is for messagebus1 that adds inFlight counter twice for some reason
UNIT_ASSERT(inFlight == 2 || inFlight == 4);
- UNIT_ASSERT(server.CheckClientIsAlive());
-
- server.ReplyToDelayedMessages();
+ UNIT_ASSERT(server.CheckClientIsAlive());
+ server.ReplyToDelayedMessages();
+
client.WaitReplies();
}