diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/test/ut | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/ut')
-rw-r--r-- | library/cpp/messagebus/test/ut/count_down_latch.h | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/locator_uniq_ut.cpp | 10 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 102 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_client_ut.cpp | 74 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_server_ut.cpp | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/moduletest.h | 406 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/one_way_ut.cpp | 48 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/starter_ut.cpp | 6 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/sync_client_ut.cpp | 98 |
10 files changed, 383 insertions, 383 deletions
diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h index 5117db5731..a4d6b72bfa 100644 --- a/library/cpp/messagebus/test/ut/count_down_latch.h +++ b/library/cpp/messagebus/test/ut/count_down_latch.h @@ -7,12 +7,12 @@ class TCountDownLatch { private: TAtomic Current; TSystemEvent EventObject; - + public: TCountDownLatch(unsigned initial) : Current(initial) - { - } + { + } void CountDown() { if (AtomicDecrement(Current) == 0) { diff --git a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp index 3fdd175d73..dd5dfc4cca 100644 --- a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp +++ b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp @@ -4,12 +4,12 @@ #include <library/cpp/messagebus/ybus.h> class TLocatorRegisterUniqTest: public TTestBase { - UNIT_TEST_SUITE(TLocatorRegisterUniqTest); - UNIT_TEST(TestRegister); - UNIT_TEST_SUITE_END(); + UNIT_TEST_SUITE(TLocatorRegisterUniqTest); + UNIT_TEST(TestRegister); + UNIT_TEST_SUITE_END(); -protected: - void TestRegister(); +protected: + void TestRegister(); }; UNIT_TEST_SUITE_REGISTRATION(TLocatorRegisterUniqTest); diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b7702..92839e9cf9 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -10,8 +10,8 @@ #include <util/network/sock.h> -#include <utility> - +#include <utility> + using namespace NBus; using namespace NBus::NTest; @@ -23,10 +23,10 @@ namespace { TExampleClientSlowOnMessageSent() : SentCompleted(0) - { - } + { + } - ~TExampleClientSlowOnMessageSent() override { + ~TExampleClientSlowOnMessageSent() override { Session->Shutdown(); } @@ -48,7 +48,7 @@ namespace { Y_UNIT_TEST_SUITE(TMessageBusTests) { void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply, - const TBusServerSessionConfig& sessionConfig) { + const TBusServerSessionConfig& sessionConfig) { TObjectCountCheck objectCountCheck; TExampleServer server; @@ -121,17 +121,17 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.SendMessagesWaitReplies(19, serverAddr); } - struct TestNoServerImplClient: public TExampleClient { + struct TestNoServerImplClient: public TExampleClient { TTestSync TestSync; int failures = 0; - template <typename... Args> - TestNoServerImplClient(Args&&... args) - : TExampleClient(std::forward<Args>(args)...) - { - } + template <typename... Args> + TestNoServerImplClient(Args&&... args) + : TExampleClient(std::forward<Args>(args)...) + { + } - ~TestNoServerImplClient() override { + ~TestNoServerImplClient() override { Session->Shutdown(); } @@ -155,7 +155,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { if (oneWay) { status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); } else { - TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); status = client.Session->SendMessageAutoPtr(message, &noServerAddr); } @@ -168,7 +168,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.TestSync.WaitForAndIncrement(count * 2 + 1); } - client.TestSync.WaitForAndIncrement(count * 2); + client.TestSync.WaitForAndIncrement(count * 2); } void HangingServerImpl(unsigned port) { @@ -241,7 +241,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.WaitReplies(); } - struct TSendTimeoutCheckerExampleClient: public TExampleClient { + struct TSendTimeoutCheckerExampleClient: public TExampleClient { static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) { TBusClientSessionConfig sessionConfig; if (periodLessThanConnectTimeout) { @@ -256,8 +256,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout) : TExampleClient(SessionConfig(periodLessThanConnectTimeout)) - { - } + { + } ~TSendTimeoutCheckerExampleClient() override { Session->Shutdown(); @@ -470,7 +470,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); } - struct TServerForResponseTooLarge: public TExampleServer { + struct TServerForResponseTooLarge: public TExampleServer { TTestSync TestSync; static TBusServerSessionConfig Config() { @@ -481,10 +481,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TServerForResponseTooLarge() : TExampleServer("TServerForResponseTooLarge", Config()) - { - } + { + } - ~TServerForResponseTooLarge() override { + ~TServerForResponseTooLarge() override { Session->Shutdown(); } @@ -530,7 +530,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight()); } - struct TServerForRequestTooLarge: public TExampleServer { + struct TServerForRequestTooLarge: public TExampleServer { TTestSync TestSync; static TBusServerSessionConfig Config() { @@ -541,10 +541,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TServerForRequestTooLarge() : TExampleServer("TServerForRequestTooLarge", Config()) - { - } + { + } - ~TServerForRequestTooLarge() override { + ~TServerForRequestTooLarge() override { Session->Shutdown(); } @@ -674,7 +674,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.WaitReplies(); } - struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient { + struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient { TTestSync TestSync; static TBusClientSessionConfig SessionConfig() { @@ -691,7 +691,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { { } - ~TResetAfterSendOneWayErrorInCallbackClient() override { + ~TResetAfterSendOneWayErrorInCallbackClient() override { Session->Shutdown(); } @@ -716,10 +716,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.TestSync.WaitForAndIncrement(2); } - struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient { + struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient { TTestSync TestSync; - ~TResetAfterSendMessageOneWayDuringShutdown() override { + ~TResetAfterSendMessageOneWayDuringShutdown() override { Session->Shutdown(); } @@ -770,10 +770,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TestNoServerImpl(17, true); } - struct TResetAfterSendOneWaySuccessClient: public TExampleClient { + struct TResetAfterSendOneWaySuccessClient: public TExampleClient { TTestSync TestSync; - ~TResetAfterSendOneWaySuccessClient() override { + ~TResetAfterSendOneWaySuccessClient() override { Session->Shutdown(); } @@ -835,7 +835,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TExampleProtocol proto; TBusServerHandlerError handler; TBusServerSessionPtr session = TBusServerSession::Create( - &proto, &handler, TBusServerSessionConfig(), queue); + &proto, &handler, TBusServerSessionConfig(), queue); unsigned port = session->GetActualListenPort(); UNIT_ASSERT(port > 0); @@ -873,7 +873,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { size_t pos = 0; while (pos < sizeof(response)) { - size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos); + size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos); pos += count; } @@ -882,10 +882,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal()); } - struct TOnConnectionEventClient: public TExampleClient { + struct TOnConnectionEventClient: public TExampleClient { TTestSync Sync; - ~TOnConnectionEventClient() override { + ~TOnConnectionEventClient() override { Session->Shutdown(); } @@ -913,13 +913,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } }; - struct TOnConnectionEventServer: public TExampleServer { + struct TOnConnectionEventServer: public TExampleServer { TOnConnectionEventServer() - : TExampleServer("TOnConnectionEventServer") - { - } + : TExampleServer("TOnConnectionEventServer") + { + } - ~TOnConnectionEventServer() override { + ~TOnConnectionEventServer() override { Session->Shutdown(); } @@ -963,9 +963,9 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.Sync.WaitForAndIncrement(3); } - struct TServerForQuotaWake: public TExampleServer { + struct TServerForQuotaWake: public TExampleServer { TSystemEvent GoOn; - TMutex OneLock; + TMutex OneLock; TOnMessageContext OneMessage; @@ -981,16 +981,16 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { TServerForQuotaWake() : TExampleServer("TServerForQuotaWake", Config()) - { - } + { + } - ~TServerForQuotaWake() override { + ~TServerForQuotaWake() override { Session->Shutdown(); } void OnMessage(TOnMessageContext& req) override { if (!GoOn.Wait(0)) { - TGuard<TMutex> guard(OneLock); + TGuard<TMutex> guard(OneLock); UNIT_ASSERT(!OneMessage); @@ -1000,7 +1000,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } void WakeOne() { - TGuard<TMutex> guard(OneLock); + TGuard<TMutex> guard(OneLock); UNIT_ASSERT(!!OneMessage); @@ -1035,13 +1035,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { count++; } else if (status == MESSAGE_BUSY) { - if (count == test_msg_count) { + if (count == test_msg_count) { TInstant now = TInstant::Now(); - if (start.GetValue() == 0) { + if (start.GetValue() == 0) { start = now; - // TODO: properly check that server is blocked + // TODO: properly check that server is blocked } else if (start + TDuration::MilliSeconds(100) < now) { break; } diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp index 4083cf3b7b..fd511e2dd9 100644 --- a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp @@ -43,8 +43,8 @@ Y_UNIT_TEST_SUITE(ModuleClientOneWay) { : TBusModule("m") , TestSync(testSync) , Port(port) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage*) override { TestSync->WaitForAndIncrement(0); @@ -94,8 +94,8 @@ Y_UNIT_TEST_SUITE(ModuleClientOneWay) { TSendErrorModule(TTestSync* testSync) : TBusModule("m") , TestSync(testSync) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage*) override { TestSync->WaitForAndIncrement(0); diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp index ebfe185cc6..84897ce5c4 100644 --- a/library/cpp/messagebus/test/ut/module_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp @@ -20,34 +20,34 @@ using namespace NBus::NTest; // helper class that cleans TBusJob instance, so job's destructor can // be completed without assertion fail. struct TJobGuard { -public: - TJobGuard(NBus::TBusJob* job) - : Job(job) - { - } - - ~TJobGuard() { - Job->ClearAllMessageStates(); - } - -private: - NBus::TBusJob* Job; +public: + TJobGuard(NBus::TBusJob* job) + : Job(job) + { + } + + ~TJobGuard() { + Job->ClearAllMessageStates(); + } + +private: + NBus::TBusJob* Job; }; -class TMessageOk: public NBus::TBusMessage { -public: - TMessageOk() - : NBus::TBusMessage(1) - { - } +class TMessageOk: public NBus::TBusMessage { +public: + TMessageOk() + : NBus::TBusMessage(1) + { + } }; -class TMessageError: public NBus::TBusMessage { -public: - TMessageError() - : NBus::TBusMessage(2) - { - } +class TMessageError: public NBus::TBusMessage { +public: + TMessageError() + : NBus::TBusMessage(2) + { + } }; Y_UNIT_TEST_SUITE(BusJobTest) { @@ -108,8 +108,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) { TParallelOnReplyModule(const TNetAddr& serverAddr) : ServerAddr(serverAddr) , RepliesLatch(2) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); @@ -166,8 +166,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) { : ServerAddr("localhost", 17) , GotReplyLatch(2) , SentMessage() - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); @@ -222,7 +222,7 @@ Y_UNIT_TEST_SUITE(BusJobTest) { module.Shutdown(); } - struct TSlowReplyServer: public TBusServerHandlerError { + struct TSlowReplyServer: public TBusServerHandlerError { TTestSync* const TestSync; TBusMessageQueuePtr Bus; TBusServerSessionPtr ServerSession; @@ -248,7 +248,7 @@ Y_UNIT_TEST_SUITE(BusJobTest) { } }; - struct TModuleThatSendsReplyEarly: public TExampleClientModule { + struct TModuleThatSendsReplyEarly: public TExampleClientModule { TTestSync* const TestSync; const unsigned ServerPort; @@ -260,8 +260,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) { , ServerPort(serverPort) , ServerSession(nullptr) , ReplyCount(0) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); @@ -318,22 +318,22 @@ Y_UNIT_TEST_SUITE(BusJobTest) { module.Shutdown(); } - struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule { + struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule { unsigned ServerPort; TTestSync TestSync; TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort) : ServerPort(serverPort) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage*) override { TestSync.CheckAndIncrement(0); job->Send(new TExampleRequest(&Proto.RequestCount), Source, - TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply), - 0, TNetAddr("localhost", ServerPort)); + TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply), + 0, TNetAddr("localhost", ServerPort)); return &TShutdownCalledBeforeReplyReceivedModule::End; } diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp index 88fe1dd9b6..4258ae4bf7 100644 --- a/library/cpp/messagebus/test/ut/module_server_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp @@ -21,7 +21,7 @@ Y_UNIT_TEST_SUITE(ModuleServerTests) { /// create or get instance of message queue, need one per application TBusMessageQueuePtr bus(CreateMessageQueue()); - THostInfoHandler hostHandler(bus.Get()); + THostInfoHandler hostHandler(bus.Get()); TDupDetectModule module(hostHandler.GetActualListenAddr()); bool success; success = module.Init(bus.Get()); @@ -39,13 +39,13 @@ Y_UNIT_TEST_SUITE(ModuleServerTests) { dupHandler.DupDetect->Shutdown(); } - struct TParallelOnMessageModule: public TExampleServerModule { + struct TParallelOnMessageModule: public TExampleServerModule { TCountDownLatch WaitTwoRequestsLatch; TParallelOnMessageModule() : WaitTwoRequestsLatch(2) - { - } + { + } TJobHandler Start(TBusJob* job, TBusMessage* mess) override { WaitTwoRequestsLatch.CountDown(); diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h index d5da72c0cb..e67da02701 100644 --- a/library/cpp/messagebus/test/ut/moduletest.h +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -11,211 +11,211 @@ #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/oldmodule/module.h> -namespace NBus { - namespace NTest { - using namespace std; +namespace NBus { + namespace NTest { + using namespace std; -#define TYPE_HOSTINFOREQUEST 100 +#define TYPE_HOSTINFOREQUEST 100 #define TYPE_HOSTINFORESPONSE 101 - //////////////////////////////////////////////////////////////////// - /// \brief DupDetect protocol that common between client and server - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo request class - class THostInfoMessage: public TBusMessage { - public: - THostInfoMessage() - : TBusMessage(TYPE_HOSTINFOREQUEST) - { - } - THostInfoMessage(ECreateUninitialized) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - { - } - - ~THostInfoMessage() override { - } - }; - - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo reply class - class THostInfoReply: public TBusMessage { - public: - THostInfoReply() - : TBusMessage(TYPE_HOSTINFORESPONSE) - { - } - THostInfoReply(ECreateUninitialized) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - { - } - - ~THostInfoReply() override { - } - }; - - //////////////////////////////////////////////////////////////////// - /// \brief HostInfo protocol that common between client and server - class THostInfoProtocol: public TBusProtocol { - public: - THostInfoProtocol() - : TBusProtocol("HOSTINFO", 0) - { - } - /// serialized protocol specific data into TBusData - void Serialize(const TBusMessage* mess, TBuffer& data) override { - Y_UNUSED(data); - Y_UNUSED(mess); - } - - /// deserialized TBusData into new instance of the message - TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { - Y_UNUSED(payload); - - if (messageType == TYPE_HOSTINFOREQUEST) { - return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); - } else if (messageType == TYPE_HOSTINFORESPONSE) { - return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED); - } else { - Y_FAIL("unknown"); - } - } - }; - - ////////////////////////////////////////////////////////////// - /// \brief HostInfo handler (should convert it to module too) - struct THostInfoHandler: public TBusServerHandlerError { - TBusServerSessionPtr Session; - TBusServerSessionConfig HostInfoConfig; - THostInfoProtocol HostInfoProto; - - THostInfoHandler(TBusMessageQueue* queue) { - Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); - } - - void OnMessage(TOnMessageContext& mess) override { - usleep(10 * 1000); /// pretend we are doing something - - TAutoPtr<THostInfoReply> reply(new THostInfoReply()); - - mess.SendReplyMove(reply); - } - - TNetAddr GetActualListenAddr() { - return TNetAddr("localhost", Session->GetActualListenPort()); - } - }; - - ////////////////////////////////////////////////////////////// - /// \brief DupDetect handler (should convert it to module too) - struct TDupDetectHandler: public TBusClientHandlerError { - TNetAddr ServerAddr; - - TBusClientSessionPtr DupDetect; - TBusClientSessionConfig DupDetectConfig; - TExampleProtocol DupDetectProto; - - int NumMessages; - int NumReplies; - - TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) - : ServerAddr(serverAddr) - { - DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); - DupDetect->RegisterService("localhost"); - } - - void Work() { - NumMessages = 10; - NumReplies = 0; - - for (int i = 0; i < NumMessages; i++) { - TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); - DupDetect->SendMessage(mess, &ServerAddr); - } - } - - void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { - Y_UNUSED(mess); - Y_UNUSED(reply); - NumReplies++; - } - }; - - ///////////////////////////////////////////////////////////////// - /// \brief DupDetect module - - struct TDupDetectModule: public TBusModule { - TNetAddr HostInfoAddr; - - TBusClientSessionPtr HostInfoClientSession; - TBusClientSessionConfig HostInfoConfig; - THostInfoProtocol HostInfoProto; - - TExampleProtocol DupDetectProto; - TBusServerSessionConfig DupDetectConfig; - - TNetAddr ListenAddr; - - TDupDetectModule(const TNetAddr& hostInfoAddr) - : TBusModule("DUPDETECTMODULE") - , HostInfoAddr(hostInfoAddr) - { - } - - bool Init(TBusMessageQueue* queue) { - HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); - HostInfoClientSession->RegisterService("localhost"); - - return TBusModule::CreatePrivateSessions(queue); - } - - TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); - - ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); - - return session; - } - - /// entry point into module, first function to call - TJobHandler Start(TBusJob* job, TBusMessage* mess) override { - TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); - Y_UNUSED(dmess); - - THostInfoMessage* hmess = new THostInfoMessage(); - - /// send message to imaginary hostinfo server - job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); - - return TJobHandler(&TDupDetectModule::ProcessHostInfo); - } - - /// next handler is executed when all outstanding requests from previous handler is completed - TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { - TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); - Y_UNUSED(dmess); - - THostInfoMessage* hmess = job->Get<THostInfoMessage>(); - THostInfoReply* hreply = job->Get<THostInfoReply>(); - EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); - Y_ASSERT(hmess != nullptr); - Y_ASSERT(hreply != nullptr); - Y_ASSERT(hstatus == MESSAGE_OK); - - return TJobHandler(&TDupDetectModule::Finish); - } - - /// last handler sends reply and returns NULL - TJobHandler Finish(TBusJob* job, TBusMessage* mess) { - Y_UNUSED(mess); - - TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); - job->SendReply(reply); - - return nullptr; - } - }; + //////////////////////////////////////////////////////////////////// + /// \brief DupDetect protocol that common between client and server + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo request class + class THostInfoMessage: public TBusMessage { + public: + THostInfoMessage() + : TBusMessage(TYPE_HOSTINFOREQUEST) + { + } + THostInfoMessage(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoMessage() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo reply class + class THostInfoReply: public TBusMessage { + public: + THostInfoReply() + : TBusMessage(TYPE_HOSTINFORESPONSE) + { + } + THostInfoReply(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoReply() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo protocol that common between client and server + class THostInfoProtocol: public TBusProtocol { + public: + THostInfoProtocol() + : TBusProtocol("HOSTINFO", 0) + { + } + /// serialized protocol specific data into TBusData + void Serialize(const TBusMessage* mess, TBuffer& data) override { + Y_UNUSED(data); + Y_UNUSED(mess); + } + + /// deserialized TBusData into new instance of the message + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { + Y_UNUSED(payload); + + if (messageType == TYPE_HOSTINFOREQUEST) { + return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); + } else if (messageType == TYPE_HOSTINFORESPONSE) { + return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED); + } else { + Y_FAIL("unknown"); + } + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief HostInfo handler (should convert it to module too) + struct THostInfoHandler: public TBusServerHandlerError { + TBusServerSessionPtr Session; + TBusServerSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + THostInfoHandler(TBusMessageQueue* queue) { + Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); + } + + void OnMessage(TOnMessageContext& mess) override { + usleep(10 * 1000); /// pretend we are doing something + + TAutoPtr<THostInfoReply> reply(new THostInfoReply()); + + mess.SendReplyMove(reply); + } + + TNetAddr GetActualListenAddr() { + return TNetAddr("localhost", Session->GetActualListenPort()); + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief DupDetect handler (should convert it to module too) + struct TDupDetectHandler: public TBusClientHandlerError { + TNetAddr ServerAddr; + + TBusClientSessionPtr DupDetect; + TBusClientSessionConfig DupDetectConfig; + TExampleProtocol DupDetectProto; + + int NumMessages; + int NumReplies; + + TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) + : ServerAddr(serverAddr) + { + DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); + DupDetect->RegisterService("localhost"); + } + + void Work() { + NumMessages = 10; + NumReplies = 0; + + for (int i = 0; i < NumMessages; i++) { + TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); + DupDetect->SendMessage(mess, &ServerAddr); + } + } + + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { + Y_UNUSED(mess); + Y_UNUSED(reply); + NumReplies++; + } + }; + + ///////////////////////////////////////////////////////////////// + /// \brief DupDetect module + + struct TDupDetectModule: public TBusModule { + TNetAddr HostInfoAddr; + + TBusClientSessionPtr HostInfoClientSession; + TBusClientSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + TExampleProtocol DupDetectProto; + TBusServerSessionConfig DupDetectConfig; + + TNetAddr ListenAddr; + + TDupDetectModule(const TNetAddr& hostInfoAddr) + : TBusModule("DUPDETECTMODULE") + , HostInfoAddr(hostInfoAddr) + { + } + + bool Init(TBusMessageQueue* queue) { + HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); + HostInfoClientSession->RegisterService("localhost"); + + return TBusModule::CreatePrivateSessions(queue); + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); + + ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); + + return session; + } + + /// entry point into module, first function to call + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = new THostInfoMessage(); + + /// send message to imaginary hostinfo server + job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); + + return TJobHandler(&TDupDetectModule::ProcessHostInfo); + } + + /// next handler is executed when all outstanding requests from previous handler is completed + TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = job->Get<THostInfoMessage>(); + THostInfoReply* hreply = job->Get<THostInfoReply>(); + EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); + Y_ASSERT(hmess != nullptr); + Y_ASSERT(hreply != nullptr); + Y_ASSERT(hstatus == MESSAGE_OK); + + return TJobHandler(&TDupDetectModule::Finish); + } + + /// last handler sends reply and returns NULL + TJobHandler Finish(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + + TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); + job->SendReply(reply); + + return nullptr; + } + }; } -} +} diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp index 9c21227e2b..a8e0cb960b 100644 --- a/library/cpp/messagebus/test/ut/one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -1,7 +1,7 @@ /////////////////////////////////////////////////////////////////// /// \file -/// \brief Example of reply-less communication - +/// \brief Example of reply-less communication + /// This example demostrates how asynchronous message passing library /// can be used to send message and do not wait for reply back. /// The usage of reply-less communication should be restricted to @@ -9,19 +9,19 @@ /// utility. Removing replies from the communication removes any restriction /// on how many message can be send to server and rougue clients may overwelm /// server without thoughtput control. - + /// 1) To implement reply-less client \n - -/// Call NBus::TBusSession::AckMessage() -/// from within NBus::IMessageHandler::OnSent() handler when message has -/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). + +/// Call NBus::TBusSession::AckMessage() +/// from within NBus::IMessageHandler::OnSent() handler when message has +/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). /// Discard identity for reply message. - + /// 2) To implement reply-less server \n - + /// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() -/// handler when message has been received on server end. -/// See example in NBus::NullServer::OnMessage(). +/// handler when message has been received on server end. +/// See example in NBus::NullServer::OnMessage(). /// Discard identity for reply message. #include <library/cpp/messagebus/test/helper/alloc_counter.h> @@ -41,7 +41,7 @@ using namespace NBus::NTest; //////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////// /// \brief Reply-less client and handler -struct NullClient : TBusClientHandlerError { +struct NullClient : TBusClientHandlerError { TNetAddr ServerAddr; TBusMessageQueuePtr Queue; @@ -69,11 +69,11 @@ struct NullClient : TBusClientHandlerError { } /// dispatch of requests is done here - void Work() { + void Work() { int batch = 10; - for (int i = 0; i < batch; i++) { - TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); + for (int i = 0; i < batch; i++) { + TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); mess->Data = "TADA"; Session->SendMessageOneWay(mess, &ServerAddr); } @@ -112,7 +112,7 @@ public: /// when message comes do not send reply, just acknowledge void OnMessage(TOnMessageContext& mess) override { - TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); + TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); Y_ASSERT(fmess->Data == "TADA"); @@ -126,7 +126,7 @@ public: void OnSent(TAutoPtr<TBusMessage> mess) override { Y_UNUSED(mess); Y_FAIL("This server does not sent replies"); - } + } }; Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { @@ -158,8 +158,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { TMessageTooLargeClient(unsigned port) : NullClient(TNetAddr("localhost", port), Config()) - { - } + { + } ~TMessageTooLargeClient() override { Session->Shutdown(); @@ -187,7 +187,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { client.GotTooLarge.WaitI(); } - struct TCheckTimeoutClient: public NullClient { + struct TCheckTimeoutClient: public NullClient { ~TCheckTimeoutClient() override { Session->Shutdown(); } @@ -200,10 +200,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { return sessionConfig; } - TCheckTimeoutClient(const TNetAddr& serverAddr) - : NullClient(serverAddr, SessionConfig()) - { - } + TCheckTimeoutClient(const TNetAddr& serverAddr) + : NullClient(serverAddr, SessionConfig()) + { + } TSystemEvent GotError; diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp index dd4d3aaa5e..b9ff9a449d 100644 --- a/library/cpp/messagebus/test/ut/starter_ut.cpp +++ b/library/cpp/messagebus/test/ut/starter_ut.cpp @@ -8,7 +8,7 @@ using namespace NBus; using namespace NBus::NTest; Y_UNIT_TEST_SUITE(TBusStarterTest) { - struct TStartJobTestModule: public TExampleModule { + struct TStartJobTestModule: public TExampleModule { using TBusModule::CreateDefaultStarter; TAtomic StartCount; @@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(TBusStarterTest) { module.Shutdown(); } - struct TSleepModule: public TExampleServerModule { + struct TSleepModule: public TExampleServerModule { TSystemEvent MessageReceivedEvent; TJobHandler Start(TBusJob* job, TBusMessage* mess) override { @@ -110,7 +110,7 @@ Y_UNIT_TEST_SUITE(TBusStarterTest) { module.Shutdown(); } - struct TSendReplyModule: public TExampleServerModule { + struct TSendReplyModule: public TExampleServerModule { TSystemEvent MessageReceivedEvent; TJobHandler Start(TBusJob* job, TBusMessage* mess) override { diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp index 400128193f..7a7189dbec 100644 --- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp @@ -2,67 +2,67 @@ #include <library/cpp/messagebus/test/helper/object_count_check.h> namespace NBus { - namespace NTest { - using namespace std; + namespace NTest { + using namespace std; - //////////////////////////////////////////////////////////////////// - /// \brief Client for sending synchronous message to local server - struct TSyncClient { - TNetAddr ServerAddr; + //////////////////////////////////////////////////////////////////// + /// \brief Client for sending synchronous message to local server + struct TSyncClient { + TNetAddr ServerAddr; - TExampleProtocol Proto; - TBusMessageQueuePtr Bus; - TBusSyncClientSessionPtr Session; + TExampleProtocol Proto; + TBusMessageQueuePtr Bus; + TBusSyncClientSessionPtr Session; - int NumReplies; - int NumMessages; + int NumReplies; + int NumMessages; - /// constructor creates instances of queue, protocol and session - TSyncClient(const TNetAddr& serverAddr) - : ServerAddr(serverAddr) - { - /// create or get instance of message queue, need one per application - Bus = CreateMessageQueue(); + /// constructor creates instances of queue, protocol and session + TSyncClient(const TNetAddr& serverAddr) + : ServerAddr(serverAddr) + { + /// create or get instance of message queue, need one per application + Bus = CreateMessageQueue(); - NumReplies = 0; - NumMessages = 10; + NumReplies = 0; + NumMessages = 10; - /// register source/client session - TBusClientSessionConfig sessionConfig; - Session = Bus->CreateSyncSource(&Proto, sessionConfig); - Session->RegisterService("localhost"); - } + /// register source/client session + TBusClientSessionConfig sessionConfig; + Session = Bus->CreateSyncSource(&Proto, sessionConfig); + Session->RegisterService("localhost"); + } - ~TSyncClient() { - Session->Shutdown(); - } + ~TSyncClient() { + Session->Shutdown(); + } - /// dispatch of requests is done here - void Work() { - for (int i = 0; i < NumMessages; i++) { - THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount)); - EMessageStatus status; - THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr)); - if (!!reply) { - NumReplies++; - } - } - } - }; + /// dispatch of requests is done here + void Work() { + for (int i = 0; i < NumMessages; i++) { + THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount)); + EMessageStatus status; + THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr)); + if (!!reply) { + NumReplies++; + } + } + } + }; Y_UNIT_TEST_SUITE(SyncClientTest) { Y_UNIT_TEST(TestSync) { - TObjectCountCheck objectCountCheck; + TObjectCountCheck objectCountCheck; - TExampleServer server; - TSyncClient client(server.GetActualListenAddr()); - client.Work(); - // assert correct number of replies - UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages); - // assert that there is no message left in flight - UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); - UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); - } + TExampleServer server; + TSyncClient client(server.GetActualListenAddr()); + client.Work(); + // assert correct number of replies + UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages); + // assert that there is no message left in flight + UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); + } } } |