aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/ut/moduletest.h
blob: e67da0270157633bb4ebfa7cffa7a1667af639f5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
#pragma once

///////////////////////////////////////////////////////////////////
/// \file
/// \brief Example of using local session for communication.

#include <library/cpp/messagebus/test/helper/alloc_counter.h>
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/message_handler_error.h>

#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/oldmodule/module.h>

namespace NBus { 
    namespace NTest { 
        using namespace std; 

// Message type identifiers of the HostInfo protocol. They are passed to the
// TBusMessage constructor by THostInfoMessage / THostInfoReply and compared
// against messageType in THostInfoProtocol::Deserialize.
#define TYPE_HOSTINFOREQUEST 100 
#define TYPE_HOSTINFORESPONSE 101

        //////////////////////////////////////////////////////////////////// 
        /// \brief DupDetect protocol that is common between client and server
        //////////////////////////////////////////////////////////////////// 
        /// \brief Request message of the HostInfo exchange.
        ///
        /// Adds no fields of its own; it only tags a TBusMessage with
        /// TYPE_HOSTINFOREQUEST.
        class THostInfoMessage: public TBusMessage {
        public:
            /// Builds an uninitialized instance (the form used by
            /// THostInfoProtocol::Deserialize).
            THostInfoMessage(ECreateUninitialized)
                : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
            {
            }

            /// Builds a new request tagged with TYPE_HOSTINFOREQUEST.
            THostInfoMessage()
                : TBusMessage(TYPE_HOSTINFOREQUEST)
            {
            }

            ~THostInfoMessage() override = default;
        };

        //////////////////////////////////////////////////////////////////// 
        /// \brief Reply message of the HostInfo exchange.
        ///
        /// Adds no fields of its own; it only tags a TBusMessage with
        /// TYPE_HOSTINFORESPONSE.
        class THostInfoReply: public TBusMessage {
        public:
            /// Builds an uninitialized instance (the form used by
            /// THostInfoProtocol::Deserialize).
            THostInfoReply(ECreateUninitialized)
                : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
            {
            }

            /// Builds a new reply tagged with TYPE_HOSTINFORESPONSE.
            THostInfoReply()
                : TBusMessage(TYPE_HOSTINFORESPONSE)
            {
            }

            ~THostInfoReply() override = default;
        };

        //////////////////////////////////////////////////////////////////// 
        /// \brief HostInfo protocol that is common between client and server.
        ///
        /// Serialize() writes nothing and Deserialize() ignores the payload, so
        /// only the message type distinguishes a request from a reply.
        class THostInfoProtocol: public TBusProtocol {
        public:
            THostInfoProtocol()
                : TBusProtocol("HOSTINFO", 0)
            {
            }

            /// Serializes protocol-specific data of \p mess into \p data.
            /// Deliberately a no-op: both arguments are ignored.
            void Serialize(const TBusMessage* mess, TBuffer& data) override {
                Y_UNUSED(mess);
                Y_UNUSED(data);
            }

            /// Creates a new, uninitialized message object matching \p messageType.
            /// \p payload is not inspected. Any type other than the two HostInfo
            /// identifiers ends up in Y_FAIL("unknown").
            TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
                Y_UNUSED(payload);

                switch (messageType) {
                    case TYPE_HOSTINFOREQUEST:
                        return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED);
                    case TYPE_HOSTINFORESPONSE:
                        return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED);
                    default:
                        Y_FAIL("unknown");
                }
            }
        };

        ////////////////////////////////////////////////////////////// 
        /// \brief HostInfo handler (should convert it to module too) 
        struct THostInfoHandler: public TBusServerHandlerError { 
            TBusServerSessionPtr Session; 
            TBusServerSessionConfig HostInfoConfig; 
            THostInfoProtocol HostInfoProto; 

            THostInfoHandler(TBusMessageQueue* queue) { 
                Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); 
            } 

            void OnMessage(TOnMessageContext& mess) override { 
                usleep(10 * 1000); /// pretend we are doing something 

                TAutoPtr<THostInfoReply> reply(new THostInfoReply()); 

                mess.SendReplyMove(reply); 
            } 

            TNetAddr GetActualListenAddr() { 
                return TNetAddr("localhost", Session->GetActualListenPort()); 
            } 
        }; 

        ////////////////////////////////////////////////////////////// 
        /// \brief DupDetect handler (should convert it to module too) 
        struct TDupDetectHandler: public TBusClientHandlerError { 
            TNetAddr ServerAddr; 

            TBusClientSessionPtr DupDetect; 
            TBusClientSessionConfig DupDetectConfig; 
            TExampleProtocol DupDetectProto; 

            int NumMessages; 
            int NumReplies; 

            TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) 
                : ServerAddr(serverAddr) 
            { 
                DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); 
                DupDetect->RegisterService("localhost"); 
            } 

            void Work() { 
                NumMessages = 10; 
                NumReplies = 0; 

                for (int i = 0; i < NumMessages; i++) { 
                    TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); 
                    DupDetect->SendMessage(mess, &ServerAddr); 
                } 
            } 

            void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { 
                Y_UNUSED(mess); 
                Y_UNUSED(reply); 
                NumReplies++; 
            } 
        }; 

        ///////////////////////////////////////////////////////////////// 
        /// \brief DupDetect module.
        ///
        /// Each job runs through three handlers in order:
        /// Start -> ProcessHostInfo -> Finish.
        struct TDupDetectModule: public TBusModule {
            TNetAddr HostInfoAddr;

            TBusClientSessionPtr HostInfoClientSession;
            TBusClientSessionConfig HostInfoConfig;
            THostInfoProtocol HostInfoProto;

            TExampleProtocol DupDetectProto;
            TBusServerSessionConfig DupDetectConfig;

            TNetAddr ListenAddr;

            /// \param hostInfoAddr address that HostInfo requests are sent to.
            TDupDetectModule(const TNetAddr& hostInfoAddr)
                : TBusModule("DUPDETECTMODULE")
                , HostInfoAddr(hostInfoAddr)
            {
            }

            /// Creates the HostInfo client session on \p queue, registers the
            /// "localhost" service on it and then creates the private sessions.
            /// \return the result of TBusModule::CreatePrivateSessions.
            bool Init(TBusMessageQueue* queue) {
                HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
                HostInfoClientSession->RegisterService("localhost");

                return TBusModule::CreatePrivateSessions(queue);
            }

            /// Creates the external (server) session and stores its actual
            /// listen port, paired with "localhost", in ListenAddr.
            TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
                TBusServerSessionPtr extSession = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);

                ListenAddr = TNetAddr("localhost", extSession->GetActualListenPort());

                return extSession;
            }

            /// Entry point into the module; the first handler called for a job.
            /// Sends a THostInfoMessage to HostInfoAddr and hands over to
            /// ProcessHostInfo.
            TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
                // The downcast result is intentionally unused.
                TExampleRequest* request = dynamic_cast<TExampleRequest*>(mess);
                Y_UNUSED(request);

                /// send message to imaginary hostinfo server
                job->Send(new THostInfoMessage(), HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr);

                return TJobHandler(&TDupDetectModule::ProcessHostInfo);
            }

            /// Runs once all outstanding requests issued by the previous handler
            /// have completed. Asserts that the HostInfo request, its reply and an
            /// OK status are all present on the job, then hands over to Finish.
            TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) {
                TExampleRequest* request = dynamic_cast<TExampleRequest*>(mess);
                Y_UNUSED(request);

                THostInfoMessage* sentRequest = job->Get<THostInfoMessage>();
                THostInfoReply* receivedReply = job->Get<THostInfoReply>();
                EMessageStatus requestStatus = job->GetStatus<THostInfoMessage>();
                Y_ASSERT(sentRequest != nullptr);
                Y_ASSERT(receivedReply != nullptr);
                Y_ASSERT(requestStatus == MESSAGE_OK);

                return TJobHandler(&TDupDetectModule::Finish);
            }

            /// Last handler: sends a TExampleResponse as the job's reply and
            /// returns a null handler.
            TJobHandler Finish(TBusJob* job, TBusMessage* mess) {
                Y_UNUSED(mess);

                job->SendReply(new TExampleResponse(&DupDetectProto.ResponseCount));

                return nullptr;
            }
        };

    }
}