aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/ut/starter_ut.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/test/ut/starter_ut.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/test/ut/starter_ut.cpp')
-rw-r--r--library/cpp/messagebus/test/ut/starter_ut.cpp140
1 files changed, 140 insertions, 0 deletions
diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp
new file mode 100644
index 0000000000..dd4d3aaa5e
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/starter_ut.cpp
@@ -0,0 +1,140 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/messagebus/test/helper/example_module.h>
+#include <library/cpp/messagebus/test/helper/object_count_check.h>
+#include <library/cpp/messagebus/test/helper/wait_for.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+Y_UNIT_TEST_SUITE(TBusStarterTest) {
+ struct TStartJobTestModule: public TExampleModule {
+ using TBusModule::CreateDefaultStarter;
+
+ TAtomic StartCount;
+
+ TStartJobTestModule()
+ : StartCount(0)
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+ AtomicIncrement(StartCount);
+ job->Sleep(10);
+ return &TStartJobTestModule::End;
+ }
+
+ TJobHandler End(TBusJob* job, TBusMessage* mess) {
+ Y_UNUSED(mess);
+ AtomicIncrement(StartCount);
+ job->Cancel(MESSAGE_UNKNOWN);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(Test) {
+ TObjectCountCheck objectCountCheck;
+
+ TBusMessageQueuePtr bus(CreateMessageQueue());
+
+ TStartJobTestModule module;
+
+ //module.StartModule();
+ module.CreatePrivateSessions(bus.Get());
+ module.StartInput();
+
+ TBusSessionConfig config;
+ config.SendTimeout = 10;
+
+ module.CreateDefaultStarter(*bus, config);
+
+ UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3);
+
+ module.Shutdown();
+ bus->Stop();
+ }
+
+ Y_UNIT_TEST(TestModuleStartJob) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TStartJobTestModule module;
+
+ TBusModuleConfig moduleConfig;
+ moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
+ module.SetConfig(moduleConfig);
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.RequestCount));
+
+ UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2);
+
+ module.Shutdown();
+ }
+
+ struct TSleepModule: public TExampleServerModule {
+ TSystemEvent MessageReceivedEvent;
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+
+ MessageReceivedEvent.Signal();
+
+ job->Sleep(1000000000);
+
+ return TJobHandler(&TSleepModule::Never);
+ }
+
+ TJobHandler Never(TBusJob*, TBusMessage*) {
+ Y_FAIL("happens");
+ throw 1;
+ }
+ };
+
+ Y_UNIT_TEST(StartJobDestroyDuringSleep) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TSleepModule module;
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ module.MessageReceivedEvent.WaitI();
+
+ module.Shutdown();
+ }
+
+ struct TSendReplyModule: public TExampleServerModule {
+ TSystemEvent MessageReceivedEvent;
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+
+ MessageReceivedEvent.Signal();
+
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(AllowSendReplyInStarted) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TSendReplyModule module;
+ module.StartModule();
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ module.MessageReceivedEvent.WaitI();
+
+ module.Shutdown();
+ }
+}