// path: root/library/cpp/messagebus/test/ut/module_server_ut.cpp
// blob: 38f3fcc4edea15fad89db794618b625006721eb4
#include <library/cpp/testing/unittest/registar.h>
 
#include "count_down_latch.h"
#include "moduletest.h" 
 
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/example_module.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>

#include <library/cpp/messagebus/oldmodule/module.h>

#include <util/generic/cast.h>

using namespace NBus; 
using namespace NBus::NTest; 
 
Y_UNIT_TEST_SUITE(ModuleServerTests) {
    // Brings up a TDupDetectModule on a fresh message queue, drives it with a
    // TDupDetectHandler, and waits until the handler's NumMessages and
    // NumReplies counters are equal before shutting both sides down.
    Y_UNIT_TEST(TestModule) {
        TObjectCountCheck countCheck;

        // The message queue: one instance is needed per application, and every
        // object created below is handed this same queue.
        TBusMessageQueuePtr queue(CreateMessageQueue());
        THostInfoHandler hostInfo(queue.Get());
        TDupDetectModule dupModule(hostInfo.GetActualListenAddr());

        // Step 1: attach the module to the queue.
        bool success = dupModule.Init(queue.Get());
        UNIT_ASSERT_C(success, "failed to initialize dupdetect module");

        // Step 2: let the module start accepting input.
        success = dupModule.StartInput();
        UNIT_ASSERT_C(success, "failed to start dupdetect module");

        // Step 3: run the handler against the module's listen address.
        TDupDetectHandler dupHandler(dupModule.ListenAddr, queue.Get());
        dupHandler.Work();

        // Step 4: poll until the two counters agree.
        UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies);

        // Step 5: tear down, module first and then the handler's DupDetect member.
        dupModule.Shutdown();
        dupHandler.DupDetect->Shutdown();
    }
 
    // Server module whose Start() handler counts a shared latch down and then
    // waits on that latch before replying. The latch starts at 2, so
    // (presumably, given the usual count-down-latch contract — confirm in
    // count_down_latch.h) a call only gets past the wait once a second call
    // has also entered Start().
    struct TParallelOnMessageModule: public TExampleServerModule {
        // Initialised to 2 by the constructor; counted down once per Start() call.
        TCountDownLatch WaitTwoRequestsLatch;

        TParallelOnMessageModule()
            : WaitTwoRequestsLatch(2)
        {
        }

        // Handles one incoming job: counts the latch down, waits on it, checks
        // the message type, sends a TExampleResponse and returns nullptr.
        TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
            WaitTwoRequestsLatch.CountDown();
            // Wait at most 5 seconds on the latch; Y_VERIFY fires if Await()
            // returns false.
            Y_VERIFY(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops");

            // Result is discarded: the call is made only for its type check.
            // presumably it fails hard when mess is not a TExampleRequest — confirm
            VerifyDynamicCast<TExampleRequest*>(mess);

            job->SendReply(new TExampleResponse(&Proto.ResponseCount));
            // NOTE(review): nullptr presumably signals "no follow-up handler" — confirm
            return nullptr;
        }
    };
 
    // Sends two requests to a TParallelOnMessageModule and waits for both
    // replies. That module's Start() handler counts down a latch initialised
    // to 2 and waits on it before replying, so the replies are only expected
    // to arrive if two Start() calls are in progress at the same time.
    //
    // NOTE(review): nothing in this test sets the number of worker threads;
    // the module is default-constructed, so the required concurrency has to
    // come from TExampleServerModule's own setup — confirm.
    Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) {
        TObjectCountCheck objectCountCheck;

        TParallelOnMessageModule module;
        module.StartModule();

        TExampleClient client;

        // Two messages: one per latch count in TParallelOnMessageModule.
        client.SendMessagesWaitReplies(2, module.ServerAddr);

        module.Shutdown();
    }
 
    // Server module whose Start() handler signals that a message arrived and
    // then holds its reply back until ClientDiedEvent is signalled from
    // outside (or the 5-second wait fails).
    struct TDelayReplyServer: public TExampleServerModule {
        // Signalled by Start() as soon as it is entered.
        TSystemEvent MessageReceivedEvent;
        // Waited on by Start() before replying; nothing in this struct signals it.
        TSystemEvent ClientDiedEvent;

        // Handles one incoming job: signals MessageReceivedEvent, waits on
        // ClientDiedEvent, then sends a TExampleResponse and returns nullptr.
        TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
            // The request payload itself is not inspected.
            Y_UNUSED(mess);

            MessageReceivedEvent.Signal();

            // Wait at most 5 seconds; Y_VERIFY fires if WaitT() returns false.
            Y_VERIFY(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops");

            job->SendReply(new TExampleResponse(&Proto.ResponseCount));
            // NOTE(review): nullptr presumably signals "no follow-up handler" — confirm
            return nullptr;
        }
    };
 
    // Checks that a server can still call SendReply() for a request whose
    // client has already been destroyed. The TDelayReplyServer handler is held
    // on ClientDiedEvent while the test destroys the client, then released.
    Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
        TObjectCountCheck objectCountCheck;

        TDelayReplyServer server;
        server.StartModule();

        // Held through THolder so the client can be destroyed explicitly
        // in the middle of the test.
        THolder<TExampleClient> client(new TExampleClient);

        client->SendMessages(1, server.ServerAddr);

        // The handler signals this event on entry, before it starts waiting.
        UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));

        // The handler has not replied yet, so exactly one message is in flight.
        UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight());

        // Destroy the client while its request is still being processed...
        client.Destroy();

        // ...and only then let the handler go on to send its reply.
        server.ClientDiedEvent.Signal();

        // wait until all server messages are delivered
        UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight());

        server.Shutdown();
    }
}