aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/ut/one_way_ut.cpp
blob: a8e0cb960bd22850412bcd923137b837ae2b1f76 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
///////////////////////////////////////////////////////////////////
/// \file
/// \brief Example of reply-less communication 
 
/// This example demonstrates how the asynchronous message passing library
/// can be used to send a message without waiting for a reply.
/// The usage of reply-less communication should be restricted to
/// low-throughput clients and a high-throughput server to provide reasonable
/// utility. Removing replies from the communication removes any restriction
/// on how many messages can be sent to the server, and rogue clients may
/// overwhelm the server without throughput control.
 
/// 1) To implement reply-less client \n
 
/// Call NBus::TBusSession::AckMessage()
/// from within NBus::IMessageHandler::OnSent() handler when message has
/// gone into wire on client end. See example in NullClient below, which sends
/// with SendMessageOneWay() and overrides OnMessageSentOneWay().
/// Discard identity for reply message.
 
/// 2) To implement reply-less server \n
 
/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
/// handler when message has been received on server end.
/// See example in NullServer::OnMessage() below, which calls ForgetRequest()
/// on the incoming message context.
/// Discard identity for reply message.

#include <library/cpp/messagebus/test/helper/alloc_counter.h>
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/hanging_server.h>
#include <library/cpp/messagebus/test/helper/message_handler_error.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>

#include <library/cpp/messagebus/ybus.h>

using namespace std;
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NBus::NTest;

////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
/// \brief Reply-less client and handler
struct NullClient : TBusClientHandlerError { 
    TNetAddr ServerAddr;

    TBusMessageQueuePtr Queue;
    TBusClientSessionPtr Session;
    TExampleProtocol Proto;

    /// constructor creates instances of protocol and session
    NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
        : ServerAddr(serverAddr)
    {
        UNIT_ASSERT(serverAddr.GetPort() > 0);

        /// create or get instance of message queue, need one per application
        Queue = CreateMessageQueue();

        /// register source/client session
        Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);

        /// register service, announce to clients via LocatorService
        Session->RegisterService("localhost");
    }

    ~NullClient() override {
        Session->Shutdown();
    }

    /// dispatch of requests is done here
    void Work() { 
        int batch = 10;

        for (int i = 0; i < batch; i++) { 
            TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); 
            mess->Data = "TADA";
            Session->SendMessageOneWay(mess, &ServerAddr);
        }
    }

    void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
    }
};

/////////////////////////////////////////////////////////////////////
/// \brief Reply-less server and handler
class NullServer: public TBusServerHandlerError {
public:
    /// session object to maintian
    TBusMessageQueuePtr Queue;
    TBusServerSessionPtr Session;
    TExampleProtocol Proto;

public:
    TAtomic NumMessages;

    NullServer() {
        NumMessages = 0;

        /// create or get instance of single message queue, need one for application
        Queue = CreateMessageQueue();

        /// register destination session
        TBusServerSessionConfig sessionConfig;
        Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
    }

    ~NullServer() override {
        Session->Shutdown();
    }

    /// when message comes do not send reply, just acknowledge
    void OnMessage(TOnMessageContext& mess) override {
        TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); 

        Y_ASSERT(fmess->Data == "TADA");

        /// tell session to forget this message and never expect any reply
        mess.ForgetRequest();

        AtomicIncrement(NumMessages);
    }

    /// this handler should not be called because this server does not send replies
    void OnSent(TAutoPtr<TBusMessage> mess) override {
        Y_UNUSED(mess);
        Y_FAIL("This server does not sent replies");
    } 
};

Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
    // One NullClient sends its batch of one-way requests to one NullServer;
    // the test checks that all of them arrive and nothing stays in flight.
    Y_UNIT_TEST(Simple) {
        TObjectCountCheck objectCountCheck;

        NullServer server;

        // the client talks to whatever port the server session actually listens on
        const TNetAddr serverAddr("localhost", server.Session->GetActualListenPort());
        NullClient client(serverAddr);

        client.Work();

        // wait until all client messages are delivered
        UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);

        // exactly the batch sent by NullClient::Work() must have been counted,
        // and neither session may still be tracking a message
        UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
        UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
        UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
    }

    /// \brief One-way client whose session is configured with MaxMessageSize = 1.
    ///
    /// OnError() verifies that the reported status is MESSAGE_MESSAGE_TOO_LARGE
    /// and then signals GotTooLarge so that a test can wait for it.
    struct TMessageTooLargeClient: public NullClient {
        /// signalled by OnError() once the expected error has been reported
        TSystemEvent GotTooLarge;

        /// Session settings for this client: defaults plus MaxMessageSize = 1.
        ///
        /// Must be static: it is evaluated inside the base-class initializer,
        /// i.e. before NullClient has been constructed, and calling a
        /// non-static member function at that point is undefined behaviour.
        static TBusClientSessionConfig Config() {
            TBusClientSessionConfig r;
            r.MaxMessageSize = 1;
            return r;
        }

        /// \param port port of the server on "localhost" to send to
        TMessageTooLargeClient(unsigned port)
            : NullClient(TNetAddr("localhost", port), Config())
        {
        }

        /// Shut the session down while GotTooLarge and this class's OnError()
        /// override still exist; NullClient's destructor calls Shutdown()
        /// again, but only after the members of this class are destroyed.
        ~TMessageTooLargeClient() override {
            Session->Shutdown();
        }

        /// Any error other than MESSAGE_MESSAGE_TOO_LARGE aborts via Y_VERIFY.
        void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
            Y_UNUSED(mess);

            Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status));

            GotTooLarge.Signal();
        }
    };

    // Sends one request through a client limited to MaxMessageSize = 1 and
    // expects the failure to be reported through the client's OnError().
    Y_UNIT_TEST(MessageTooLargeOnClient) {
        TObjectCountCheck objectCountCheck;

        NullServer server;

        TMessageTooLargeClient client(server.Session->GetActualListenPort());

        // the send call itself is expected to report MESSAGE_OK ...
        EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
        UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);

        // ... while the error arrives later: OnError() checks the status
        // and signals GotTooLarge
        client.GotTooLarge.WaitI();
    }

    /// \brief One-way client with SendTimeout/ConnectTimeout of 1 that records
    /// delivery failures in GotError.
    struct TCheckTimeoutClient: public NullClient {
        /// signalled by OnError() for any message that could not be delivered
        TSystemEvent GotError;

        /// session settings used by this client: send and connect timeouts
        /// of 1 and a timeout period of 10 milliseconds
        static TBusClientSessionConfig SessionConfig() {
            TBusClientSessionConfig config;
            config.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
            config.ConnectTimeout = 1;
            config.SendTimeout = 1;
            return config;
        }

        /// \param serverAddr address the requests are sent to
        TCheckTimeoutClient(const TNetAddr& serverAddr)
            : NullClient(serverAddr, SessionConfig())
        {
        }

        /// Shut the session down while GotError and this class's OnError()
        /// override still exist; NullClient's destructor calls Shutdown()
        /// again, but only after the members of this class are destroyed.
        ~TCheckTimeoutClient() override {
            Session->Shutdown();
        }

        /// message that could not be delivered
        void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
            // TODO: check status
            GotError.Signal();
        }
    };

    // Sends one request to an address this test starts no server for and
    // expects the client's OnError() callback to fire.
    Y_UNIT_TEST(SendTimeout_Callback_NoServer) {
        TObjectCountCheck objectCountCheck;

        // NOTE(review): port 17 is presumably chosen because nothing listens
        // there on the test host — confirm
        TCheckTimeoutClient client(TNetAddr("localhost", 17));

        // the send call itself is expected to report MESSAGE_OK
        EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
        UNIT_ASSERT_EQUAL(ok, MESSAGE_OK);

        // wait for OnError() to signal that the message could not be delivered
        client.GotError.WaitI();
    }

    // Keeps sending one-way requests to a THangingServer until the client
    // session reports MESSAGE_BUSY, then waits for the OnError() callback.
    Y_UNIT_TEST(SendTimeout_Callback_HangingServer) {
        THangingServer server;

        TObjectCountCheck objectCountCheck;

        TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort()));

        // `first` stays true until at least one request has been accepted
        bool first = true;
        while (true) {
            EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
            if (ok != MESSAGE_BUSY) {
                // every send before the first MESSAGE_BUSY must succeed
                UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK);
                first = false;
                continue;
            }

            // the very first send must not already be rejected as busy
            UNIT_ASSERT(!first);
            break;
        }

        // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
        // All the messages that the client has sent before it gets the first MESSAGE_BUSY error might get
        // serialized and written to the socket buffer, so the write queue gets drained and there are
        // no messages to time out when the periodic timeout check happens.

        client.GotError.WaitI();
    }
}