aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/ut/module_server_ut.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/test/ut/module_server_ut.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/ut/module_server_ut.cpp')
-rw-r--r--library/cpp/messagebus/test/ut/module_server_ut.cpp182
1 files changed, 91 insertions, 91 deletions
diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp
index 88fe1dd9b6..38f3fcc4ed 100644
--- a/library/cpp/messagebus/test/ut/module_server_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp
@@ -1,8 +1,8 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "count_down_latch.h"
-#include "moduletest.h"
-
+#include "moduletest.h"
+
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/example_module.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
@@ -12,108 +12,108 @@
#include <util/generic/cast.h>
-using namespace NBus;
-using namespace NBus::NTest;
-
+using namespace NBus;
+using namespace NBus::NTest;
+
Y_UNIT_TEST_SUITE(ModuleServerTests) {
Y_UNIT_TEST(TestModule) {
- TObjectCountCheck objectCountCheck;
-
- /// create or get instance of message queue, need one per application
- TBusMessageQueuePtr bus(CreateMessageQueue());
+ TObjectCountCheck objectCountCheck;
+
+ /// create or get instance of message queue, need one per application
+ TBusMessageQueuePtr bus(CreateMessageQueue());
THostInfoHandler hostHandler(bus.Get());
- TDupDetectModule module(hostHandler.GetActualListenAddr());
- bool success;
- success = module.Init(bus.Get());
- UNIT_ASSERT_C(success, "failed to initialize dupdetect module");
-
- success = module.StartInput();
- UNIT_ASSERT_C(success, "failed to start dupdetect module");
-
- TDupDetectHandler dupHandler(module.ListenAddr, bus.Get());
- dupHandler.Work();
-
- UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies);
-
- module.Shutdown();
- dupHandler.DupDetect->Shutdown();
- }
-
+ TDupDetectModule module(hostHandler.GetActualListenAddr());
+ bool success;
+ success = module.Init(bus.Get());
+ UNIT_ASSERT_C(success, "failed to initialize dupdetect module");
+
+ success = module.StartInput();
+ UNIT_ASSERT_C(success, "failed to start dupdetect module");
+
+ TDupDetectHandler dupHandler(module.ListenAddr, bus.Get());
+ dupHandler.Work();
+
+ UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies);
+
+ module.Shutdown();
+ dupHandler.DupDetect->Shutdown();
+ }
+
struct TParallelOnMessageModule: public TExampleServerModule {
- TCountDownLatch WaitTwoRequestsLatch;
-
- TParallelOnMessageModule()
- : WaitTwoRequestsLatch(2)
+ TCountDownLatch WaitTwoRequestsLatch;
+
+ TParallelOnMessageModule()
+ : WaitTwoRequestsLatch(2)
{
}
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
- WaitTwoRequestsLatch.CountDown();
+ WaitTwoRequestsLatch.CountDown();
Y_VERIFY(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops");
-
- VerifyDynamicCast<TExampleRequest*>(mess);
-
- job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+
+ VerifyDynamicCast<TExampleRequest*>(mess);
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) {
- TObjectCountCheck objectCountCheck;
-
- TBusQueueConfig config;
- config.NumWorkers = 5;
-
- TParallelOnMessageModule module;
- module.StartModule();
-
- TExampleClient client;
-
- client.SendMessagesWaitReplies(2, module.ServerAddr);
-
- module.Shutdown();
- }
-
- struct TDelayReplyServer: public TExampleServerModule {
+ TObjectCountCheck objectCountCheck;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TParallelOnMessageModule module;
+ module.StartModule();
+
+ TExampleClient client;
+
+ client.SendMessagesWaitReplies(2, module.ServerAddr);
+
+ module.Shutdown();
+ }
+
+ struct TDelayReplyServer: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
TSystemEvent ClientDiedEvent;
-
+
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
-
- MessageReceivedEvent.Signal();
-
+
+ MessageReceivedEvent.Signal();
+
Y_VERIFY(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops");
-
- job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
return nullptr;
- }
- };
-
+ }
+ };
+
Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
- TObjectCountCheck objectCountCheck;
-
- TBusQueueConfig config;
- config.NumWorkers = 5;
-
- TDelayReplyServer server;
- server.StartModule();
-
- THolder<TExampleClient> client(new TExampleClient);
-
- client->SendMessages(1, server.ServerAddr);
-
- UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
-
- UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight());
-
- client.Destroy();
-
- server.ClientDiedEvent.Signal();
-
- // wait until all server message are delivered
- UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight());
-
- server.Shutdown();
- }
-}
+ TObjectCountCheck objectCountCheck;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TDelayReplyServer server;
+ server.StartModule();
+
+ THolder<TExampleClient> client(new TExampleClient);
+
+ client->SendMessages(1, server.ServerAddr);
+
+ UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight());
+
+ client.Destroy();
+
+ server.ClientDiedEvent.Signal();
+
+ // wait until all server message are delivered
+ UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight());
+
+ server.Shutdown();
+ }
+}