aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/ut/module_server_ut.cpp
blob: b0e6ac38da42e7549b7835423dabbc0b94fbead0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include <library/cpp/testing/unittest/registar.h>

#include "count_down_latch.h"
#include "moduletest.h"

#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/example_module.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>

#include <library/cpp/messagebus/oldmodule/module.h>

#include <util/generic/cast.h>

using namespace NBus;
using namespace NBus::NTest;

Y_UNIT_TEST_SUITE(ModuleServerTests) {
    Y_UNIT_TEST(TestModule) {
        TObjectCountCheck objectCountCheck;

        /// create or get instance of message queue, need one per application
        TBusMessageQueuePtr bus(CreateMessageQueue());
        THostInfoHandler hostHandler(bus.Get());
        TDupDetectModule module(hostHandler.GetActualListenAddr());
        bool success;
        success = module.Init(bus.Get());
        UNIT_ASSERT_C(success, "failed to initialize dupdetect module");

        success = module.StartInput();
        UNIT_ASSERT_C(success, "failed to start dupdetect module");

        TDupDetectHandler dupHandler(module.ListenAddr, bus.Get());
        dupHandler.Work();

        UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies);

        module.Shutdown();
        dupHandler.DupDetect->Shutdown();
    }

    struct TParallelOnMessageModule: public TExampleServerModule {
        TCountDownLatch WaitTwoRequestsLatch;

        TParallelOnMessageModule()
            : WaitTwoRequestsLatch(2)
        {
        }

        TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
            WaitTwoRequestsLatch.CountDown();
            Y_ABORT_UNLESS(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops");

            VerifyDynamicCast<TExampleRequest*>(mess);

            job->SendReply(new TExampleResponse(&Proto.ResponseCount));
            return nullptr;
        }
    };

    Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) {
        TObjectCountCheck objectCountCheck;

        TBusQueueConfig config;
        config.NumWorkers = 5;

        TParallelOnMessageModule module;
        module.StartModule();

        TExampleClient client;

        client.SendMessagesWaitReplies(2, module.ServerAddr);

        module.Shutdown();
    }

    struct TDelayReplyServer: public TExampleServerModule {
        TSystemEvent MessageReceivedEvent;
        TSystemEvent ClientDiedEvent;

        TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
            Y_UNUSED(mess);

            MessageReceivedEvent.Signal();

            Y_ABORT_UNLESS(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops");

            job->SendReply(new TExampleResponse(&Proto.ResponseCount));
            return nullptr;
        }
    };

    Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
        TObjectCountCheck objectCountCheck;

        TBusQueueConfig config;
        config.NumWorkers = 5;

        TDelayReplyServer server;
        server.StartModule();

        THolder<TExampleClient> client(new TExampleClient);

        client->SendMessages(1, server.ServerAddr);

        UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));

        UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight());

        client.Destroy();

        server.ClientDiedEvent.Signal();

        // wait until all server message are delivered
        UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight());

        server.Shutdown();
    }
}