diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/test/ut/moduletest.h | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/ut/moduletest.h')
-rw-r--r-- | library/cpp/messagebus/test/ut/moduletest.h | 406 |
1 files changed, 203 insertions, 203 deletions
diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h index d5da72c0cb..e67da02701 100644 --- a/library/cpp/messagebus/test/ut/moduletest.h +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -11,211 +11,211 @@ #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/oldmodule/module.h> -namespace NBus { - namespace NTest { - using namespace std; +namespace NBus { + namespace NTest { + using namespace std; -#define TYPE_HOSTINFOREQUEST 100 +#define TYPE_HOSTINFOREQUEST 100 #define TYPE_HOSTINFORESPONSE 101 - //////////////////////////////////////////////////////////////////// - /// \brief DupDetect protocol that common between client and server - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo request class - class THostInfoMessage: public TBusMessage { - public: - THostInfoMessage() - : TBusMessage(TYPE_HOSTINFOREQUEST) - { - } - THostInfoMessage(ECreateUninitialized) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - { - } - - ~THostInfoMessage() override { - } - }; - - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo reply class - class THostInfoReply: public TBusMessage { - public: - THostInfoReply() - : TBusMessage(TYPE_HOSTINFORESPONSE) - { - } - THostInfoReply(ECreateUninitialized) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - { - } - - ~THostInfoReply() override { - } - }; - - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo protocol that common between client and server - class THostInfoProtocol: public TBusProtocol { - public: - THostInfoProtocol() - : TBusProtocol("HOSTINFO", 0) - { - } - /// serialized protocol specific data into TBusData - void Serialize(const TBusMessage* mess, TBuffer& data) override { - Y_UNUSED(data); - Y_UNUSED(mess); - } - - /// deserialized TBusData into new instance of the message - TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { - Y_UNUSED(payload); - - if (messageType == TYPE_HOSTINFOREQUEST) { - return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); - } else if (messageType == TYPE_HOSTINFORESPONSE) { - return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED); - } else { - Y_FAIL("unknown"); - } - } - }; - - ////////////////////////////////////////////////////////////// - /// \brief HostInfo handler (should convert it to module too) - struct THostInfoHandler: public TBusServerHandlerError { - TBusServerSessionPtr Session; - TBusServerSessionConfig HostInfoConfig; - THostInfoProtocol HostInfoProto; - - THostInfoHandler(TBusMessageQueue* queue) { - Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); - } - - void OnMessage(TOnMessageContext& mess) override { - usleep(10 * 1000); /// pretend we are doing something - - TAutoPtr<THostInfoReply> reply(new THostInfoReply()); - - mess.SendReplyMove(reply); - } - - TNetAddr GetActualListenAddr() { - return TNetAddr("localhost", Session->GetActualListenPort()); - } - }; - - ////////////////////////////////////////////////////////////// - /// \brief DupDetect handler (should convert it to module too) - struct TDupDetectHandler: public TBusClientHandlerError { - TNetAddr ServerAddr; - - TBusClientSessionPtr DupDetect; - TBusClientSessionConfig DupDetectConfig; - TExampleProtocol DupDetectProto; - - int NumMessages; - int NumReplies; - - TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) - : ServerAddr(serverAddr) - { - DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); - DupDetect->RegisterService("localhost"); - } - - void Work() { - NumMessages = 10; - NumReplies = 0; - - for (int i = 0; i < NumMessages; i++) { - TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); - DupDetect->SendMessage(mess, &ServerAddr); - } - } - - void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { - Y_UNUSED(mess); - Y_UNUSED(reply); - NumReplies++; - } - }; - - ///////////////////////////////////////////////////////////////// - /// \brief DupDetect module - - struct TDupDetectModule: public TBusModule { - TNetAddr HostInfoAddr; - - TBusClientSessionPtr HostInfoClientSession; - TBusClientSessionConfig HostInfoConfig; - THostInfoProtocol HostInfoProto; - - TExampleProtocol DupDetectProto; - TBusServerSessionConfig DupDetectConfig; - - TNetAddr ListenAddr; - - TDupDetectModule(const TNetAddr& hostInfoAddr) - : TBusModule("DUPDETECTMODULE") - , HostInfoAddr(hostInfoAddr) - { - } - - bool Init(TBusMessageQueue* queue) { - HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); - HostInfoClientSession->RegisterService("localhost"); - - return TBusModule::CreatePrivateSessions(queue); - } - - TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); - - ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); - - return session; - } - - /// entry point into module, first function to call - TJobHandler Start(TBusJob* job, TBusMessage* mess) override { - TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); - Y_UNUSED(dmess); - - THostInfoMessage* hmess = new THostInfoMessage(); - - /// send message to imaginary hostinfo server - job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); - - return TJobHandler(&TDupDetectModule::ProcessHostInfo); - } - - /// next handler is executed when all outstanding requests from previous handler is completed - TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { - TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); - Y_UNUSED(dmess); - - THostInfoMessage* hmess = job->Get<THostInfoMessage>(); - THostInfoReply* hreply = job->Get<THostInfoReply>(); - EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); - Y_ASSERT(hmess != nullptr); - Y_ASSERT(hreply != nullptr); - Y_ASSERT(hstatus == MESSAGE_OK); - - return TJobHandler(&TDupDetectModule::Finish); - } - - /// last handler sends reply and returns NULL - TJobHandler Finish(TBusJob* job, TBusMessage* mess) { - Y_UNUSED(mess); - - TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); - job->SendReply(reply); - - return nullptr; - } - }; + //////////////////////////////////////////////////////////////////// + /// \brief DupDetect protocol that common between client and server + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo request class + class THostInfoMessage: public TBusMessage { + public: + THostInfoMessage() + : TBusMessage(TYPE_HOSTINFOREQUEST) + { + } + THostInfoMessage(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoMessage() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo reply class + class THostInfoReply: public TBusMessage { + public: + THostInfoReply() + : TBusMessage(TYPE_HOSTINFORESPONSE) + { + } + THostInfoReply(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoReply() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo protocol that common between client and server + class THostInfoProtocol: public TBusProtocol { + public: + THostInfoProtocol() + : TBusProtocol("HOSTINFO", 0) + { + } + /// serialized protocol specific data into TBusData + void Serialize(const TBusMessage* mess, TBuffer& data) override { + Y_UNUSED(data); + Y_UNUSED(mess); + } + + /// deserialized TBusData into new instance of the message + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { + Y_UNUSED(payload); + + if (messageType == TYPE_HOSTINFOREQUEST) { + return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); + } else if (messageType == TYPE_HOSTINFORESPONSE) { + return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED); + } else { + Y_FAIL("unknown"); + } + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief HostInfo handler (should convert it to module too) + struct THostInfoHandler: public TBusServerHandlerError { + TBusServerSessionPtr Session; + TBusServerSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + THostInfoHandler(TBusMessageQueue* queue) { + Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); + } + + void OnMessage(TOnMessageContext& mess) override { + usleep(10 * 1000); /// pretend we are doing something + + TAutoPtr<THostInfoReply> reply(new THostInfoReply()); + + mess.SendReplyMove(reply); + } + + TNetAddr GetActualListenAddr() { + return TNetAddr("localhost", Session->GetActualListenPort()); + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief DupDetect handler (should convert it to module too) + struct TDupDetectHandler: public TBusClientHandlerError { + TNetAddr ServerAddr; + + TBusClientSessionPtr DupDetect; + TBusClientSessionConfig DupDetectConfig; + TExampleProtocol DupDetectProto; + + int NumMessages; + int NumReplies; + + TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) + : ServerAddr(serverAddr) + { + DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); + DupDetect->RegisterService("localhost"); + } + + void Work() { + NumMessages = 10; + NumReplies = 0; + + for (int i = 0; i < NumMessages; i++) { + TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); + DupDetect->SendMessage(mess, &ServerAddr); + } + } + + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { + Y_UNUSED(mess); + Y_UNUSED(reply); + NumReplies++; + } + }; + + ///////////////////////////////////////////////////////////////// + /// \brief DupDetect module + + struct TDupDetectModule: public TBusModule { + TNetAddr HostInfoAddr; + + TBusClientSessionPtr HostInfoClientSession; + TBusClientSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + TExampleProtocol DupDetectProto; + TBusServerSessionConfig DupDetectConfig; + + TNetAddr ListenAddr; + + TDupDetectModule(const TNetAddr& hostInfoAddr) + : TBusModule("DUPDETECTMODULE") + , HostInfoAddr(hostInfoAddr) + { + } + + bool Init(TBusMessageQueue* queue) { + HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); + HostInfoClientSession->RegisterService("localhost"); + + return TBusModule::CreatePrivateSessions(queue); + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); + + ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); + + return session; + } + + /// entry point into module, first function to call + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = new THostInfoMessage(); + + /// send message to imaginary hostinfo server + job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); + + return TJobHandler(&TDupDetectModule::ProcessHostInfo); + } + + /// next handler is executed when all outstanding requests from previous handler is completed + TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = job->Get<THostInfoMessage>(); + THostInfoReply* hreply = job->Get<THostInfoReply>(); + EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); + Y_ASSERT(hmess != nullptr); + Y_ASSERT(hreply != nullptr); + Y_ASSERT(hstatus == MESSAGE_OK); + + return TJobHandler(&TDupDetectModule::Finish); + } + + /// last handler sends reply and returns NULL + TJobHandler Finish(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + + TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); + job->SendReply(reply); + + return nullptr; + } + }; } -} +} |