blob: 548e83ae5aba64edcba71d463bf6b3d0b0d2d01c (
plain) (
tree)
|
|
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/messagebus/test/helper/example_module.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>
using namespace NBus;
using namespace NBus::NTest;
Y_UNIT_TEST_SUITE(TBusStarterTest) {
struct TStartJobTestModule: public TExampleModule {
using TBusModule::CreateDefaultStarter;
TAtomic StartCount;
TStartJobTestModule()
: StartCount(0)
{
}
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
AtomicIncrement(StartCount);
job->Sleep(10);
return &TStartJobTestModule::End;
}
TJobHandler End(TBusJob* job, TBusMessage* mess) {
Y_UNUSED(mess);
AtomicIncrement(StartCount);
job->Cancel(MESSAGE_UNKNOWN);
return nullptr;
}
};
Y_UNIT_TEST(Test) {
TObjectCountCheck objectCountCheck;
TBusMessageQueuePtr bus(CreateMessageQueue());
TStartJobTestModule module;
//module.StartModule();
module.CreatePrivateSessions(bus.Get());
module.StartInput();
TBusSessionConfig config;
config.SendTimeout = 10;
module.CreateDefaultStarter(*bus, config);
UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3);
module.Shutdown();
bus->Stop();
}
Y_UNIT_TEST(TestModuleStartJob) {
TObjectCountCheck objectCountCheck;
TExampleProtocol proto;
TStartJobTestModule module;
TBusModuleConfig moduleConfig;
moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
module.SetConfig(moduleConfig);
module.StartModule();
module.StartJob(new TExampleRequest(&proto.RequestCount));
UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2);
module.Shutdown();
}
struct TSleepModule: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
MessageReceivedEvent.Signal();
job->Sleep(1000000000);
return TJobHandler(&TSleepModule::Never);
}
TJobHandler Never(TBusJob*, TBusMessage*) {
Y_ABORT("happens");
throw 1;
}
};
Y_UNIT_TEST(StartJobDestroyDuringSleep) {
TObjectCountCheck objectCountCheck;
TExampleProtocol proto;
TSleepModule module;
module.StartModule();
module.StartJob(new TExampleRequest(&proto.StartCount));
module.MessageReceivedEvent.WaitI();
module.Shutdown();
}
struct TSendReplyModule: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
job->SendReply(new TExampleResponse(&Proto.ResponseCount));
MessageReceivedEvent.Signal();
return nullptr;
}
};
Y_UNIT_TEST(AllowSendReplyInStarted) {
TObjectCountCheck objectCountCheck;
TExampleProtocol proto;
TSendReplyModule module;
module.StartModule();
module.StartJob(new TExampleRequest(&proto.StartCount));
module.MessageReceivedEvent.WaitI();
module.Shutdown();
}
}
|