diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/test/ut | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/ut')
-rw-r--r-- | library/cpp/messagebus/test/ut/count_down_latch.h | 52 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 1336 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp | 242 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_client_ut.cpp | 568 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_server_ut.cpp | 182 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/moduletest.h | 16 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/one_way_ut.cpp | 234 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/starter_ut.cpp | 232 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/sync_client_ut.cpp | 10 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/ya.make | 4 |
10 files changed, 1438 insertions, 1438 deletions
diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h index 5117db5731..fb6374e773 100644 --- a/library/cpp/messagebus/test/ut/count_down_latch.h +++ b/library/cpp/messagebus/test/ut/count_down_latch.h @@ -1,30 +1,30 @@ -#pragma once - -#include <util/system/atomic.h> -#include <util/system/event.h> - -class TCountDownLatch { -private: - TAtomic Current; +#pragma once + +#include <util/system/atomic.h> +#include <util/system/event.h> + +class TCountDownLatch { +private: + TAtomic Current; TSystemEvent EventObject; -public: - TCountDownLatch(unsigned initial) - : Current(initial) +public: + TCountDownLatch(unsigned initial) + : Current(initial) { } - - void CountDown() { - if (AtomicDecrement(Current) == 0) { - EventObject.Signal(); - } - } - - void Await() { - EventObject.Wait(); - } - - bool Await(TDuration timeout) { - return EventObject.WaitT(timeout); - } -}; + + void CountDown() { + if (AtomicDecrement(Current) == 0) { + EventObject.Signal(); + } + } + + void Await() { + EventObject.Wait(); + } + + bool Await(TDuration timeout) { + return EventObject.WaitT(timeout); + } +}; diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 040f9b7702..42d4a1e9b2 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -8,104 +8,104 @@ #include <library/cpp/messagebus/misc/test_sync.h> -#include <util/network/sock.h> - +#include <util/network/sock.h> + #include <utility> using namespace NBus; using namespace NBus::NTest; -namespace { - struct TExampleClientSlowOnMessageSent: public TExampleClient { - TAtomic SentCompleted; - +namespace { + struct TExampleClientSlowOnMessageSent: public TExampleClient { + TAtomic SentCompleted; + TSystemEvent ReplyReceived; - - TExampleClientSlowOnMessageSent() - : SentCompleted(0) + + TExampleClientSlowOnMessageSent() + : SentCompleted(0) { } - + ~TExampleClientSlowOnMessageSent() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { Y_VERIFY(AtomicGet(SentCompleted), "must be completed"); - - TExampleClient::OnReply(mess, reply); - - ReplyReceived.Signal(); - } - + + TExampleClient::OnReply(mess, reply); + + ReplyReceived.Signal(); + } + void OnMessageSent(TBusMessage*) override { - Sleep(TDuration::MilliSeconds(100)); - AtomicSet(SentCompleted, 1); - } - }; - -} - + Sleep(TDuration::MilliSeconds(100)); + AtomicSet(SentCompleted, 1); + } + }; + +} + Y_UNIT_TEST_SUITE(TMessageBusTests) { - void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply, + void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply, const TBusServerSessionConfig& sessionConfig) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TExampleClient client(sessionConfig); - client.CrashOnError = true; - - server.UseCompression = useCompression; - client.UseCompression = useCompression; - - server.AckMessageBeforeSendReply = ackMessageBeforeReply; - - client.SendMessagesWaitReplies(100, server.GetActualListenAddr()); - UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); - UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleClient client(sessionConfig); + client.CrashOnError = true; + + server.UseCompression = useCompression; + client.UseCompression = useCompression; + + server.AckMessageBeforeSendReply = ackMessageBeforeReply; + + client.SendMessagesWaitReplies(100, server.GetActualListenAddr()); + UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); + } + Y_UNIT_TEST(TestDestination) { - TestDestinationTemplate(false, false, TBusServerSessionConfig()); - } - + TestDestinationTemplate(false, false, TBusServerSessionConfig()); + } + Y_UNIT_TEST(TestDestinationUsingAck) { - TestDestinationTemplate(false, true, TBusServerSessionConfig()); - } - + TestDestinationTemplate(false, true, TBusServerSessionConfig()); + } + Y_UNIT_TEST(TestDestinationWithCompression) { - TestDestinationTemplate(true, false, TBusServerSessionConfig()); - } - + TestDestinationTemplate(true, false, TBusServerSessionConfig()); + } + Y_UNIT_TEST(TestCork) { - TBusServerSessionConfig config; - config.SendThreshold = 1000000000000; - config.Cork = TDuration::MilliSeconds(10); - TestDestinationTemplate(false, false, config); - // TODO: test for cork hanging - } - + TBusServerSessionConfig config; + config.SendThreshold = 1000000000000; + config.Cork = TDuration::MilliSeconds(10); + TestDestinationTemplate(false, false, config); + // TODO: test for cork hanging + } + Y_UNIT_TEST(TestReconnect) { - if (!IsFixedPortTestAllowed()) { - return; - } - - TObjectCountCheck objectCountCheck; - - unsigned port = FixedPort; - TNetAddr serverAddr("localhost", port); - THolder<TExampleServer> server; - - TBusClientSessionConfig clientConfig; + if (!IsFixedPortTestAllowed()) { + return; + } + + TObjectCountCheck objectCountCheck; + + unsigned port = FixedPort; + TNetAddr serverAddr("localhost", port); + THolder<TExampleServer> server; + + TBusClientSessionConfig clientConfig; clientConfig.RetryInterval = 0; - TExampleClient client(clientConfig); - - server.Reset(new TExampleServer(port, "TExampleServer 1")); - - client.SendMessagesWaitReplies(17, serverAddr); - - server.Destroy(); + TExampleClient client(clientConfig); + + server.Reset(new TExampleServer(port, "TExampleServer 1")); + + client.SendMessagesWaitReplies(17, serverAddr); + + server.Destroy(); // Making the client to detect disconnection. client.SendMessages(1, serverAddr); @@ -116,11 +116,11 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); - server.Reset(new TExampleServer(port, "TExampleServer 2")); - - client.SendMessagesWaitReplies(19, serverAddr); - } - + server.Reset(new TExampleServer(port, "TExampleServer 2")); + + client.SendMessagesWaitReplies(19, serverAddr); + } + struct TestNoServerImplClient: public TExampleClient { TTestSync TestSync; int failures = 0; @@ -145,8 +145,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { }; void TestNoServerImpl(unsigned port, bool oneWay) { - TNetAddr noServerAddr("localhost", port); - + TNetAddr noServerAddr("localhost", port); + TestNoServerImplClient client; int count = 0; @@ -174,167 +174,167 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { void HangingServerImpl(unsigned port) { TNetAddr noServerAddr("localhost", port); - TExampleClient client; - - int count = 0; - for (;; ++count) { - TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); - EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr); - if (status == MESSAGE_BUSY) { - break; - } - UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status)); - - if (count == 0) { - // lame way to wait until it is connected - Sleep(TDuration::MilliSeconds(10)); - } - } - - UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count); - } - + TExampleClient client; + + int count = 0; + for (;; ++count) { + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr); + if (status == MESSAGE_BUSY) { + break; + } + UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status)); + + if (count == 0) { + // lame way to wait until it is connected + Sleep(TDuration::MilliSeconds(10)); + } + } + + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count); + } + Y_UNIT_TEST(TestHangindServer) { - TObjectCountCheck objectCountCheck; - - THangingServer server(0); - + TObjectCountCheck objectCountCheck; + + THangingServer server(0); + HangingServerImpl(server.GetPort()); - } - + } + Y_UNIT_TEST(TestNoServer) { - TObjectCountCheck objectCountCheck; - + TObjectCountCheck objectCountCheck; + TestNoServerImpl(17, false); - } - + } + Y_UNIT_TEST(PauseInput) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - server.Session->PauseInput(true); - - TBusClientSessionConfig clientConfig; - clientConfig.MaxInFlight = 1000; - TExampleClient client(clientConfig); - - client.SendMessages(100, server.GetActualListenAddr()); - - server.TestSync.Check(0); - - server.Session->PauseInput(false); - - server.TestSync.WaitFor(100); - - client.WaitReplies(); - - server.Session->PauseInput(true); - - client.SendMessages(200, server.GetActualListenAddr()); - - server.TestSync.Check(100); - - server.Session->PauseInput(false); - - server.TestSync.WaitFor(300); - - client.WaitReplies(); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + server.Session->PauseInput(true); + + TBusClientSessionConfig clientConfig; + clientConfig.MaxInFlight = 1000; + TExampleClient client(clientConfig); + + client.SendMessages(100, server.GetActualListenAddr()); + + server.TestSync.Check(0); + + server.Session->PauseInput(false); + + server.TestSync.WaitFor(100); + + client.WaitReplies(); + + server.Session->PauseInput(true); + + client.SendMessages(200, server.GetActualListenAddr()); + + server.TestSync.Check(100); + + server.Session->PauseInput(false); + + server.TestSync.WaitFor(300); + + client.WaitReplies(); + } + struct TSendTimeoutCheckerExampleClient: public TExampleClient { - static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) { - TBusClientSessionConfig sessionConfig; - if (periodLessThanConnectTimeout) { - sessionConfig.SendTimeout = 1; - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50); - } else { - sessionConfig.SendTimeout = 50; - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); - } - return sessionConfig; - } - - TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout) - : TExampleClient(SessionConfig(periodLessThanConnectTimeout)) + static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) { + TBusClientSessionConfig sessionConfig; + if (periodLessThanConnectTimeout) { + sessionConfig.SendTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50); + } else { + sessionConfig.SendTimeout = 50; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + } + return sessionConfig; + } + + TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout) + : TExampleClient(SessionConfig(periodLessThanConnectTimeout)) { } - + ~TSendTimeoutCheckerExampleClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + TSystemEvent ErrorHappened; - + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got status: %s", ToString(status).data()); - ErrorHappened.Signal(); - } - }; - - void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) { - TObjectCountCheck objectCountCheck; - - TNetAddr serverAddr("localhost", 17); - - TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout); - - client.SendMessages(1, serverAddr); - - client.ErrorHappened.WaitI(); - } - + ErrorHappened.Signal(); + } + }; + + void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) { + TObjectCountCheck objectCountCheck; + + TNetAddr serverAddr("localhost", 17); + + TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout); + + client.SendMessages(1, serverAddr); + + client.ErrorHappened.WaitI(); + } + Y_UNIT_TEST(NoServer_SendTimeout_Callback_PeriodLess) { - NoServer_SendTimeout_Callback_Impl(true); - } - + NoServer_SendTimeout_Callback_Impl(true); + } + Y_UNIT_TEST(NoServer_SendTimeout_Callback_TimeoutLess) { - NoServer_SendTimeout_Callback_Impl(false); - } - + NoServer_SendTimeout_Callback_Impl(false); + } + Y_UNIT_TEST(TestOnReplyCalledAfterOnMessageSent) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - TNetAddr serverAddr = server.GetActualListenAddr(); - TExampleClientSlowOnMessageSent client; - - TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount)); - EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr); - UNIT_ASSERT_EQUAL(s, MESSAGE_OK); - - UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5))); - } - - struct TDelayReplyServer: public TBusServerHandlerError { - TBusMessageQueuePtr Bus; - TExampleProtocol Proto; + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + TExampleClientSlowOnMessageSent client; + + TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr); + UNIT_ASSERT_EQUAL(s, MESSAGE_OK); + + UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5))); + } + + struct TDelayReplyServer: public TBusServerHandlerError { + TBusMessageQueuePtr Bus; + TExampleProtocol Proto; TSystemEvent MessageReceivedEvent; // 1 wait for 1 message - TBusServerSessionPtr Session; + TBusServerSessionPtr Session; TMutex Lock_; TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages; - + TDelayReplyServer() : MessageReceivedEvent(TEventResetType::rAuto) { - Bus = CreateMessageQueue("TDelayReplyServer"); - TBusServerSessionConfig sessionConfig; - sessionConfig.SendTimeout = 1000; - sessionConfig.TotalTimeout = 2001; - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); - if (!Session) { - ythrow yexception() << "Failed to create destination session"; - } - } - + Bus = CreateMessageQueue("TDelayReplyServer"); + TBusServerSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1000; + sessionConfig.TotalTimeout = 2001; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + if (!Session) { + ythrow yexception() << "Failed to create destination session"; + } + } + void OnMessage(TOnMessageContext& mess) override { Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here"); TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext); delayedMsg->Swap(mess); auto g(Guard(Lock_)); DelayedMessages.push_back(delayedMsg); - MessageReceivedEvent.Signal(); + MessageReceivedEvent.Signal(); } - + bool CheckClientIsAlive() { auto g(Guard(Lock_)); for (auto& delayedMessage : DelayedMessages) { @@ -370,252 +370,252 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { msg.SendReplyMove(reply); } } - + size_t GetDelayedMessageCount() const { auto g(Guard(Lock_)); return DelayedMessages.size(); - } - + } + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed, got %s", ToString(status).data()); - } - }; - + } + }; + Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { - TObjectCountCheck objectCountCheck; - - TDelayReplyServer server; - - THolder<TExampleClient> client(new TExampleClient); - - client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort())); - - UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); - - UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight()); - - client.Destroy(); - + TObjectCountCheck objectCountCheck; + + TDelayReplyServer server; + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort())); + + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + + UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight()); + + client.Destroy(); + UNIT_WAIT_FOR(server.CheckClientIsDead()); - + server.ReplyToDelayedMessages(); - // wait until all server message are delivered - UNIT_WAIT_FOR(0 == server.Session->GetInFlight()); - } - - struct TPackUnpackServer: public TBusServerHandlerError { - TBusMessageQueuePtr Bus; - TExampleProtocol Proto; + // wait until all server message are delivered + UNIT_WAIT_FOR(0 == server.Session->GetInFlight()); + } + + struct TPackUnpackServer: public TBusServerHandlerError { + TBusMessageQueuePtr Bus; + TExampleProtocol Proto; TSystemEvent MessageReceivedEvent; TSystemEvent ClientDiedEvent; - TBusServerSessionPtr Session; - - TPackUnpackServer() { - Bus = CreateMessageQueue("TPackUnpackServer"); - TBusServerSessionConfig sessionConfig; - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); - } - + TBusServerSessionPtr Session; + + TPackUnpackServer() { + Bus = CreateMessageQueue("TPackUnpackServer"); + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + } + void OnMessage(TOnMessageContext& mess) override { - TBusIdentity ident; - mess.AckMessage(ident); - - char packed[BUS_IDENTITY_PACKED_SIZE]; - ident.Pack(packed); - TBusIdentity resurrected; - resurrected.Unpack(packed); - - mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount)); - } - + TBusIdentity ident; + mess.AckMessage(ident); + + char packed[BUS_IDENTITY_PACKED_SIZE]; + ident.Pack(packed); + TBusIdentity resurrected; + resurrected.Unpack(packed); + + mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount)); + } + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed"); - } - }; - + } + }; + Y_UNIT_TEST(PackUnpack) { - TObjectCountCheck objectCountCheck; - - TPackUnpackServer server; - - THolder<TExampleClient> client(new TExampleClient); - - client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort())); - } - + TObjectCountCheck objectCountCheck; + + TPackUnpackServer server; + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort())); + } + Y_UNIT_TEST(ClientRequestTooLarge) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TBusClientSessionConfig clientConfig; - clientConfig.MaxMessageSize = 100; - TExampleClient client(clientConfig); - - client.DataSize = 10; - client.SendMessagesWaitReplies(1, server.GetActualListenAddr()); - - client.DataSize = 1000; - client.SendMessages(1, server.GetActualListenAddr()); - client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); - - client.DataSize = 20; - client.SendMessagesWaitReplies(10, server.GetActualListenAddr()); - - client.DataSize = 10000; - client.SendMessages(1, server.GetActualListenAddr()); - client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TBusClientSessionConfig clientConfig; + clientConfig.MaxMessageSize = 100; + TExampleClient client(clientConfig); + + client.DataSize = 10; + client.SendMessagesWaitReplies(1, server.GetActualListenAddr()); + + client.DataSize = 1000; + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); + + client.DataSize = 20; + client.SendMessagesWaitReplies(10, server.GetActualListenAddr()); + + client.DataSize = 10000; + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); + } + struct TServerForResponseTooLarge: public TExampleServer { - TTestSync TestSync; - - static TBusServerSessionConfig Config() { - TBusServerSessionConfig config; - config.MaxMessageSize = 100; - return config; - } - - TServerForResponseTooLarge() - : TExampleServer("TServerForResponseTooLarge", Config()) + TTestSync TestSync; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + config.MaxMessageSize = 100; + return config; + } + + TServerForResponseTooLarge() + : TExampleServer("TServerForResponseTooLarge", Config()) { } - + ~TServerForResponseTooLarge() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessage(TOnMessageContext& mess) override { - TAutoPtr<TBusMessage> response; - - if (TestSync.Get() == 0) { - TestSync.CheckAndIncrement(0); - response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000)); - } else { - TestSync.WaitForAndIncrement(3); - response.Reset(new TExampleResponse(&Proto.ResponseCount, 10)); - } - - mess.SendReplyMove(response); - } - + TAutoPtr<TBusMessage> response; + + if (TestSync.Get() == 0) { + TestSync.CheckAndIncrement(0); + response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000)); + } else { + TestSync.WaitForAndIncrement(3); + response.Reset(new TExampleResponse(&Proto.ResponseCount, 10)); + } + + mess.SendReplyMove(response); + } + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { - TestSync.WaitForAndIncrement(1); - + TestSync.WaitForAndIncrement(1); + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "status"); - } - }; - + } + }; + Y_UNIT_TEST(ServerResponseTooLarge) { - TObjectCountCheck objectCountCheck; - - TServerForResponseTooLarge server; - - TExampleClient client; - client.DataSize = 10; - - client.SendMessages(1, server.GetActualListenAddr()); - server.TestSync.WaitForAndIncrement(2); - client.ResetCounters(); - - client.SendMessages(1, server.GetActualListenAddr()); - - client.WorkDone.WaitI(); - - server.TestSync.CheckAndIncrement(4); - - UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight()); - } - + TObjectCountCheck objectCountCheck; + + TServerForResponseTooLarge server; + + TExampleClient client; + client.DataSize = 10; + + client.SendMessages(1, server.GetActualListenAddr()); + server.TestSync.WaitForAndIncrement(2); + client.ResetCounters(); + + client.SendMessages(1, server.GetActualListenAddr()); + + client.WorkDone.WaitI(); + + server.TestSync.CheckAndIncrement(4); + + UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight()); + } + struct TServerForRequestTooLarge: public TExampleServer { - TTestSync TestSync; - - static TBusServerSessionConfig Config() { - TBusServerSessionConfig config; - config.MaxMessageSize = 100; - return config; - } - - TServerForRequestTooLarge() - : TExampleServer("TServerForRequestTooLarge", Config()) + TTestSync TestSync; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + config.MaxMessageSize = 100; + return config; + } + + TServerForRequestTooLarge() + : TExampleServer("TServerForRequestTooLarge", Config()) { } - + ~TServerForRequestTooLarge() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessage(TOnMessageContext& req) override { - unsigned n = TestSync.Get(); - if (n < 2) { - TestSync.CheckAndIncrement(n); - TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10)); - req.SendReplyMove(resp); - } else { + unsigned n = TestSync.Get(); + if (n < 2) { + TestSync.CheckAndIncrement(n); + TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10)); + req.SendReplyMove(resp); + } else { Y_FAIL("wrong"); - } - } - }; - + } + } + }; + Y_UNIT_TEST(ServerRequestTooLarge) { - TObjectCountCheck objectCountCheck; - - TServerForRequestTooLarge server; - - TExampleClient client; - client.DataSize = 10; - - client.SendMessagesWaitReplies(2, server.GetActualListenAddr()); - - server.TestSync.CheckAndIncrement(2); - - client.DataSize = 200; - client.SendMessages(1, server.GetActualListenAddr()); - // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client - client.WaitForError(MESSAGE_DELIVERY_FAILED); - } - + TObjectCountCheck objectCountCheck; + + TServerForRequestTooLarge server; + + TExampleClient client; + client.DataSize = 10; + + client.SendMessagesWaitReplies(2, server.GetActualListenAddr()); + + server.TestSync.CheckAndIncrement(2); + + client.DataSize = 200; + client.SendMessages(1, server.GetActualListenAddr()); + // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + Y_UNIT_TEST(ClientResponseTooLarge) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - server.DataSize = 10; - - TBusClientSessionConfig clientSessionConfig; - clientSessionConfig.MaxMessageSize = 100; - TExampleClient client(clientSessionConfig); - client.DataSize = 10; - - client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); - - server.DataSize = 1000; - - client.SendMessages(1, server.GetActualListenAddr()); - client.WaitForError(MESSAGE_DELIVERY_FAILED); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + server.DataSize = 10; + + TBusClientSessionConfig clientSessionConfig; + clientSessionConfig.MaxMessageSize = 100; + TExampleClient client(clientSessionConfig); + client.DataSize = 10; + + client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); + + server.DataSize = 1000; + + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + Y_UNIT_TEST(ServerUnknownMessage) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - TNetAddr serverAddr = server.GetActualListenAddr(); - - TExampleClient client; - - client.SendMessagesWaitReplies(2, serverAddr); - - TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); - req->GetHeader()->Type = 11; - client.Session->SendMessageAutoPtr(req, &serverAddr); - client.MessageCount = 1; - - client.WaitForError(MESSAGE_DELIVERY_FAILED); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, serverAddr); + + TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Type = 11; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + Y_UNIT_TEST(ServerMessageReservedIds) { TObjectCountCheck objectCountCheck; @@ -642,18 +642,18 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } Y_UNIT_TEST(TestGetInFlightForDestination) { - TObjectCountCheck objectCountCheck; - - TDelayReplyServer server; - - TExampleClient client; - - TNetAddr addr("localhost", server.Session->GetActualListenPort()); - - UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr)); - - client.SendMessages(2, &addr); - + TObjectCountCheck objectCountCheck; + + TDelayReplyServer server; + + TExampleClient client; + + TNetAddr addr("localhost", server.Session->GetActualListenPort()); + + UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr)); + + client.SendMessages(2, &addr); + for (size_t i = 0; i < 5; ++i) { // One MessageReceivedEvent indicates one message, we need to wait for two UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); @@ -662,98 +662,98 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } } UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2); - - size_t inFlight = client.Session->GetInFlight(addr); - // 4 is for messagebus1 that adds inFlight counter twice for some reason - UNIT_ASSERT(inFlight == 2 || inFlight == 4); - + + size_t inFlight = client.Session->GetInFlight(addr); + // 4 is for messagebus1 that adds inFlight counter twice for some reason + UNIT_ASSERT(inFlight == 2 || inFlight == 4); + UNIT_ASSERT(server.CheckClientIsAlive()); - + server.ReplyToDelayedMessages(); - client.WaitReplies(); - } - + client.WaitReplies(); + } + struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient { - TTestSync TestSync; - - static TBusClientSessionConfig SessionConfig() { - TBusClientSessionConfig config; - // 1 ms is not enough when test is running under valgrind - config.ConnectTimeout = 10; - config.SendTimeout = 10; - config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); - return config; - } - - TResetAfterSendOneWayErrorInCallbackClient() - : TExampleClient(SessionConfig()) - { - } - + TTestSync TestSync; + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig config; + // 1 ms is not enough when test is running under valgrind + config.ConnectTimeout = 10; + config.SendTimeout = 10; + config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + return config; + } + + TResetAfterSendOneWayErrorInCallbackClient() + : TExampleClient(SessionConfig()) + { + } + ~TResetAfterSendOneWayErrorInCallbackClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { - TestSync.WaitForAndIncrement(0); + TestSync.WaitForAndIncrement(0); Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "must be connection failed, got %s", ToString(status).data()); - mess.Destroy(); - TestSync.CheckAndIncrement(1); - } - }; - + mess.Destroy(); + TestSync.CheckAndIncrement(1); + } + }; + Y_UNIT_TEST(ResetAfterSendOneWayErrorInCallback) { - TObjectCountCheck objectCountCheck; - - TNetAddr noServerAddr("localhost", 17); - - TResetAfterSendOneWayErrorInCallbackClient client; - - EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - - client.TestSync.WaitForAndIncrement(2); - } - + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + + TResetAfterSendOneWayErrorInCallbackClient client; + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.TestSync.WaitForAndIncrement(2); + } + struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient { - TTestSync TestSync; - + TTestSync TestSync; + ~TResetAfterSendMessageOneWayDuringShutdown() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override { - TestSync.CheckAndIncrement(0); - + TestSync.CheckAndIncrement(0); + Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data()); - - // check reset is possible here - message->Reset(); - + + // check reset is possible here + message->Reset(); + // intentionally don't destroy the message // we will try to resend it Y_UNUSED(message.Release()); - TestSync.CheckAndIncrement(1); - } - }; - + TestSync.CheckAndIncrement(1); + } + }; + Y_UNIT_TEST(ResetAfterSendMessageOneWayDuringShutdown) { - TObjectCountCheck objectCountCheck; - - TNetAddr noServerAddr("localhost", 17); - - TResetAfterSendMessageOneWayDuringShutdown client; - + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + + TResetAfterSendMessageOneWayDuringShutdown client; + TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + client.TestSync.WaitForAndIncrement(2); - client.Session->Shutdown(); - + client.Session->Shutdown(); + ok = client.Session->SendMessageOneWay(message); Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data()); @@ -762,148 +762,148 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.TestSync.CheckAndIncrement(3); delete message; - } - + } + Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) { - TObjectCountCheck objectCountCheck; - + TObjectCountCheck objectCountCheck; + TestNoServerImpl(17, true); - } - + } + struct TResetAfterSendOneWaySuccessClient: public TExampleClient { - TTestSync TestSync; - + TTestSync TestSync; + ~TResetAfterSendOneWaySuccessClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessageSentOneWay(TAutoPtr<TBusMessage> sent) override { - TestSync.WaitForAndIncrement(0); - sent->Reset(); - TestSync.CheckAndIncrement(1); - } - }; - + TestSync.WaitForAndIncrement(0); + sent->Reset(); + TestSync.CheckAndIncrement(1); + } + }; + Y_UNIT_TEST(ResetAfterSendOneWaySuccess) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - TNetAddr serverAddr = server.GetActualListenAddr(); - - TResetAfterSendOneWaySuccessClient client; - - EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr); - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - // otherwize message might go to OnError(MESSAGE_SHUTDOWN) - server.WaitForOnMessageCount(1); - - client.TestSync.WaitForAndIncrement(2); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TResetAfterSendOneWaySuccessClient client; + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + // otherwize message might go to OnError(MESSAGE_SHUTDOWN) + server.WaitForOnMessageCount(1); + + client.TestSync.WaitForAndIncrement(2); + } + Y_UNIT_TEST(GetStatus) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TExampleClient client; - // make sure connected - client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); - - server.Bus->GetStatus(); - server.Bus->GetStatus(); - server.Bus->GetStatus(); - - client.Bus->GetStatus(); - client.Bus->GetStatus(); - client.Bus->GetStatus(); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleClient client; + // make sure connected + client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); + + server.Bus->GetStatus(); + server.Bus->GetStatus(); + server.Bus->GetStatus(); + + client.Bus->GetStatus(); + client.Bus->GetStatus(); + client.Bus->GetStatus(); + } + Y_UNIT_TEST(BindOnRandomPort) { - TObjectCountCheck objectCountCheck; - - TBusServerSessionConfig serverConfig; - TExampleServer server; - - TExampleClient client; - TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort())); - client.SendMessagesWaitReplies(3, &addr); - } - + TObjectCountCheck objectCountCheck; + + TBusServerSessionConfig serverConfig; + TExampleServer server; + + TExampleClient client; + TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort())); + client.SendMessagesWaitReplies(3, &addr); + } + Y_UNIT_TEST(UnbindOnShutdown) { - TBusMessageQueuePtr queue(CreateMessageQueue()); - - TExampleProtocol proto; - TBusServerHandlerError handler; - TBusServerSessionPtr session = TBusServerSession::Create( + TBusMessageQueuePtr queue(CreateMessageQueue()); + + TExampleProtocol proto; + TBusServerHandlerError handler; + TBusServerSessionPtr session = TBusServerSession::Create( &proto, &handler, TBusServerSessionConfig(), queue); - - unsigned port = session->GetActualListenPort(); - UNIT_ASSERT(port > 0); - - session->Shutdown(); - - // fails is Shutdown() didn't unbind - THangingServer hangingServer(port); - } - + + unsigned port = session->GetActualListenPort(); + UNIT_ASSERT(port > 0); + + session->Shutdown(); + + // fails is Shutdown() didn't unbind + THangingServer hangingServer(port); + } + Y_UNIT_TEST(VersionNegotiation) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort()); - - TInetStreamSocket socket; - int r1 = socket.Connect(&addr); - UNIT_ASSERT(r1 >= 0); - - TStreamSocketOutput output(&socket); - - TBusHeader request; - Zero(request); - request.Size = sizeof(request); - request.SetVersionInternal(0xF); // max - output.Write(&request, sizeof(request)); - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort()); + + TInetStreamSocket socket; + int r1 = socket.Connect(&addr); + UNIT_ASSERT(r1 >= 0); + + TStreamSocketOutput output(&socket); + + TBusHeader request; + Zero(request); + request.Size = sizeof(request); + request.SetVersionInternal(0xF); // max + output.Write(&request, sizeof(request)); + UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true); - TStreamSocketInput input(&socket); - - TBusHeader response; - size_t pos = 0; - - while (pos < sizeof(response)) { + TStreamSocketInput input(&socket); + + TBusHeader response; + size_t pos = 0; + + while (pos < sizeof(response)) { size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos); - pos += count; - } - - UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos); - - UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal()); - } - + pos += count; + } + + UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos); + + UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal()); + } + struct TOnConnectionEventClient: public TExampleClient { - TTestSync Sync; - + TTestSync Sync; + ~TOnConnectionEventClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnClientConnectionEvent(const TClientConnectionEvent& event) override { - if (Sync.Get() > 2) { - // Test OnClientConnectionEvent_Disconnect is broken. - // Sometimes reconnect happens during server shutdown - // when acceptor connections is still alive, and - // server connection is already closed - return; - } - - if (event.GetType() == TClientConnectionEvent::CONNECTED) { - Sync.WaitForAndIncrement(0); - } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) { - Sync.WaitForAndIncrement(2); - } - } + if (Sync.Get() > 2) { + // Test OnClientConnectionEvent_Disconnect is broken. + // Sometimes reconnect happens during server shutdown + // when acceptor connections is still alive, and + // server connection is already closed + return; + } + + if (event.GetType() == TClientConnectionEvent::CONNECTED) { + Sync.WaitForAndIncrement(0); + } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) { + Sync.WaitForAndIncrement(2); + } + } void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { // We do not check for message errors in this test. @@ -911,8 +911,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { } - }; - + }; + struct TOnConnectionEventServer: public TExampleServer { TOnConnectionEventServer() : TExampleServer("TOnConnectionEventServer") @@ -929,39 +929,39 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { }; Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) { - TObjectCountCheck objectCountCheck; - + TObjectCountCheck objectCountCheck; + TOnConnectionEventServer server; - - TOnConnectionEventClient client; - - TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort()); - + + TOnConnectionEventClient client; + + TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort()); + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); - - client.Sync.WaitForAndIncrement(1); - - client.Session->Shutdown(); - - client.Sync.WaitForAndIncrement(3); - } - + + client.Sync.WaitForAndIncrement(1); + + client.Session->Shutdown(); + + client.Sync.WaitForAndIncrement(3); + } + Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) { - TObjectCountCheck objectCountCheck; - + TObjectCountCheck objectCountCheck; + THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); - - TOnConnectionEventClient client; - TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort()); - + + TOnConnectionEventClient client; + TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort()); + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); - - client.Sync.WaitForAndIncrement(1); - - server.Destroy(); - - client.Sync.WaitForAndIncrement(3); - } + + client.Sync.WaitForAndIncrement(1); + + server.Destroy(); + + client.Sync.WaitForAndIncrement(3); + } struct TServerForQuotaWake: public TExampleServer { TSystemEvent GoOn; @@ -1042,7 +1042,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { start = now; // TODO: properly check that server is blocked - } else if (start + TDuration::MilliSeconds(100) < now) { + } else if (start + TDuration::MilliSeconds(100) < now) { break; } } diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp index 4083cf3b7b..9c1224ada9 100644 --- a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp @@ -1,143 +1,143 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/message_handler_error.h> - + #include <library/cpp/messagebus/misc/test_sync.h> #include <library/cpp/messagebus/oldmodule/module.h> -using namespace NBus; -using namespace NBus::NTest; - +using namespace NBus; +using namespace NBus::NTest; + Y_UNIT_TEST_SUITE(ModuleClientOneWay) { - struct TTestServer: public TBusServerHandlerError { - TExampleProtocol Proto; - - TTestSync* const TestSync; - - TBusMessageQueuePtr Queue; - TBusServerSessionPtr ServerSession; - - TTestServer(TTestSync* testSync) - : TestSync(testSync) - { - Queue = CreateMessageQueue(); - ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue); - } - + struct TTestServer: public TBusServerHandlerError { + TExampleProtocol Proto; + + TTestSync* const TestSync; + + TBusMessageQueuePtr Queue; + TBusServerSessionPtr ServerSession; + + TTestServer(TTestSync* testSync) + : TestSync(testSync) + { + Queue = CreateMessageQueue(); + ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue); + } + void OnMessage(TOnMessageContext& context) override { - TestSync->WaitForAndIncrement(1); - context.ForgetRequest(); - } - }; - - struct TClientModule: public TBusModule { - TExampleProtocol Proto; - - TTestSync* const TestSync; - unsigned const Port; - - TBusClientSessionPtr ClientSession; - - TClientModule(TTestSync* testSync, unsigned port) - : TBusModule("m") - , TestSync(testSync) - , Port(port) + TestSync->WaitForAndIncrement(1); + context.ForgetRequest(); + } + }; + + struct TClientModule: public TBusModule { + TExampleProtocol Proto; + + TTestSync* const TestSync; + unsigned const Port; + + TBusClientSessionPtr ClientSession; + + TClientModule(TTestSync* testSync, unsigned port) + : TBusModule("m") + , TestSync(testSync) + , Port(port) { } - + TJobHandler Start(TBusJob* job, TBusMessage*) override { - TestSync->WaitForAndIncrement(0); - - job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port)); - - return &TClientModule::Sent; - } - - TJobHandler Sent(TBusJob* job, TBusMessage*) { - TestSync->WaitForAndIncrement(2); - job->Cancel(MESSAGE_DONT_ASK); + TestSync->WaitForAndIncrement(0); + + job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port)); + + return &TClientModule::Sent; + } + + TJobHandler Sent(TBusJob* job, TBusMessage*) { + TestSync->WaitForAndIncrement(2); + job->Cancel(MESSAGE_DONT_ASK); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); + ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Simple) { - TTestSync testSync; - - TTestServer server(&testSync); - - TBusMessageQueuePtr queue = CreateMessageQueue(); - TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort()); - - clientModule.CreatePrivateSessions(queue.Get()); - clientModule.StartInput(); - - clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); - - testSync.WaitForAndIncrement(3); - - clientModule.Shutdown(); - } - - struct TSendErrorModule: public TBusModule { - TExampleProtocol Proto; - - TTestSync* const TestSync; - - TBusClientSessionPtr ClientSession; - - TSendErrorModule(TTestSync* testSync) - : TBusModule("m") - , TestSync(testSync) + TTestSync testSync; + + TTestServer server(&testSync); + + TBusMessageQueuePtr queue = CreateMessageQueue(); + TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort()); + + clientModule.CreatePrivateSessions(queue.Get()); + clientModule.StartInput(); + + clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); + + testSync.WaitForAndIncrement(3); + + clientModule.Shutdown(); + } + + struct TSendErrorModule: public TBusModule { + TExampleProtocol Proto; + + TTestSync* const TestSync; + + TBusClientSessionPtr ClientSession; + + TSendErrorModule(TTestSync* testSync) + : TBusModule("m") + , TestSync(testSync) { } - + TJobHandler Start(TBusJob* job, TBusMessage*) override { - TestSync->WaitForAndIncrement(0); - - job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1)); - - return &TSendErrorModule::Sent; - } - - TJobHandler Sent(TBusJob* job, TBusMessage*) { - TestSync->WaitForAndIncrement(1); - job->Cancel(MESSAGE_DONT_ASK); + TestSync->WaitForAndIncrement(0); + + job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1)); + + return &TSendErrorModule::Sent; + } + + TJobHandler Sent(TBusJob* job, TBusMessage*) { + TestSync->WaitForAndIncrement(1); + job->Cancel(MESSAGE_DONT_ASK); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TBusServerSessionConfig sessionConfig; - sessionConfig.ConnectTimeout = 1; - sessionConfig.SendTimeout = 1; - sessionConfig.TotalTimeout = 1; - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); - ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig); + TBusServerSessionConfig sessionConfig; + sessionConfig.ConnectTimeout = 1; + sessionConfig.SendTimeout = 1; + sessionConfig.TotalTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(SendError) { - TTestSync testSync; - - TBusQueueConfig queueConfig; - queueConfig.NumWorkers = 5; - - TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig); - TSendErrorModule clientModule(&testSync); - - clientModule.CreatePrivateSessions(queue.Get()); - clientModule.StartInput(); - - clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); - - testSync.WaitForAndIncrement(2); - - clientModule.Shutdown(); - } -} + TTestSync testSync; + + TBusQueueConfig queueConfig; + queueConfig.NumWorkers = 5; + + TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig); + TSendErrorModule clientModule(&testSync); + + clientModule.CreatePrivateSessions(queue.Get()); + clientModule.StartInput(); + + clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); + + testSync.WaitForAndIncrement(2); + + clientModule.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp index ebfe185cc6..faffdbb625 100644 --- a/library/cpp/messagebus/test/ut/module_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp @@ -1,368 +1,368 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "count_down_latch.h" -#include "moduletest.h" - +#include "moduletest.h" + #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/example_module.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> #include <library/cpp/messagebus/test/helper/wait_for.h> - + #include <library/cpp/messagebus/misc/test_sync.h> #include <library/cpp/messagebus/oldmodule/module.h> #include <util/generic/cast.h> #include <util/system/event.h> -using namespace NBus; -using namespace NBus::NTest; - -// helper class that cleans TBusJob instance, so job's destructor can -// be completed without assertion fail. -struct TJobGuard { +using namespace NBus; +using namespace NBus::NTest; + +// helper class that cleans TBusJob instance, so job's destructor can +// be completed without assertion fail. +struct TJobGuard { public: TJobGuard(NBus::TBusJob* job) : Job(job) { } - + ~TJobGuard() { Job->ClearAllMessageStates(); } - + private: NBus::TBusJob* Job; -}; - +}; + class TMessageOk: public NBus::TBusMessage { public: TMessageOk() : NBus::TBusMessage(1) { } -}; - +}; + class TMessageError: public NBus::TBusMessage { public: TMessageError() : NBus::TBusMessage(2) { } -}; - +}; + Y_UNIT_TEST_SUITE(BusJobTest) { -#if 0 +#if 0 Y_UNIT_TEST(TestPending) { - TObjectCountCheck objectCountCheck; - - TDupDetectModule module; - TBusJob job(&module, new TBusMessage(0)); - // Guard will clear the job if unit-assertion fails. - TJobGuard g(&job); - - NBus::TBusMessage* msg = new NBus::TBusMessage(1); - job.Send(msg, NULL); - NBus::TJobStateVec pending; - job.GetPending(&pending); - - UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u); - UNIT_ASSERT_EQUAL(msg, pending[0].Message); - } - + TObjectCountCheck objectCountCheck; + + TDupDetectModule module; + TBusJob job(&module, new TBusMessage(0)); + // Guard will clear the job if unit-assertion fails. + TJobGuard g(&job); + + NBus::TBusMessage* msg = new NBus::TBusMessage(1); + job.Send(msg, NULL); + NBus::TJobStateVec pending; + job.GetPending(&pending); + + UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u); + UNIT_ASSERT_EQUAL(msg, pending[0].Message); + } + Y_UNIT_TEST(TestCallReplyHandler) { - TObjectCountCheck objectCountCheck; - - TDupDetectModule module; - NBus::TBusJob job(&module, new NBus::TBusMessage(0)); - // Guard will clear the job if unit-assertion fails. - TJobGuard g(&job); - - NBus::TBusMessage* msgOk = new TMessageOk; - NBus::TBusMessage* msgError = new TMessageError; - job.Send(msgOk, NULL); - job.Send(msgError, NULL); - - UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL); - UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL); - - NBus::TBusMessage* reply = new NBus::TBusMessage(0); - job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply); - job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL); - - UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL); - UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL); - - UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT); - UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT); - - UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK); - UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply); - } -#endif - - struct TParallelOnReplyModule : TExampleClientModule { - TNetAddr ServerAddr; - - TCountDownLatch RepliesLatch; - - TParallelOnReplyModule(const TNetAddr& serverAddr) - : ServerAddr(serverAddr) - , RepliesLatch(2) + TObjectCountCheck objectCountCheck; + + TDupDetectModule module; + NBus::TBusJob job(&module, new NBus::TBusMessage(0)); + // Guard will clear the job if unit-assertion fails. + TJobGuard g(&job); + + NBus::TBusMessage* msgOk = new TMessageOk; + NBus::TBusMessage* msgError = new TMessageError; + job.Send(msgOk, NULL); + job.Send(msgError, NULL); + + UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL); + UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL); + + NBus::TBusMessage* reply = new NBus::TBusMessage(0); + job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply); + job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL); + + UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL); + UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL); + + UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT); + UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT); + + UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK); + UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply); + } +#endif + + struct TParallelOnReplyModule : TExampleClientModule { + TNetAddr ServerAddr; + + TCountDownLatch RepliesLatch; + + TParallelOnReplyModule(const TNetAddr& serverAddr) + : ServerAddr(serverAddr) + , RepliesLatch(2) { } - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr); - return &TParallelOnReplyModule::HandleReplies; - } - - void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr); + return &TParallelOnReplyModule::HandleReplies; + } + + void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { Y_UNUSED(mess); Y_UNUSED(reply); Y_VERIFY(status == MESSAGE_OK, "failed to get reply: %s", ToCString(status)); - } - - TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - RepliesLatch.CountDown(); + RepliesLatch.CountDown(); Y_VERIFY(RepliesLatch.Await(TDuration::Seconds(10)), "failed to get answers"); - job->Cancel(MESSAGE_UNKNOWN); + job->Cancel(MESSAGE_UNKNOWN); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(TestReplyHandlerCalledInParallel) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TExampleProtocol proto; - - TBusQueueConfig config; - config.NumWorkers = 5; - - TParallelOnReplyModule module(server.GetActualListenAddr()); - module.StartModule(); - - module.StartJob(new TExampleRequest(&proto.StartCount)); - module.StartJob(new TExampleRequest(&proto.StartCount)); - - UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10))); - - module.Shutdown(); - } - - struct TErrorHandlerCheckerModule : TExampleModule { - TNetAddr ServerAddr; - - TBusClientSessionPtr Source; - - TCountDownLatch GotReplyLatch; - - TBusMessage* SentMessage; - - TErrorHandlerCheckerModule() - : ServerAddr("localhost", 17) - , GotReplyLatch(2) - , SentMessage() + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleProtocol proto; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TParallelOnReplyModule module(server.GetActualListenAddr()); + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + module.StartJob(new TExampleRequest(&proto.StartCount)); + + UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10))); + + module.Shutdown(); + } + + struct TErrorHandlerCheckerModule : TExampleModule { + TNetAddr ServerAddr; + + TBusClientSessionPtr Source; + + TCountDownLatch GotReplyLatch; + + TBusMessage* SentMessage; + + TErrorHandlerCheckerModule() + : ServerAddr("localhost", 17) + , GotReplyLatch(2) + , SentMessage() { } - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - TExampleRequest* message = new TExampleRequest(&Proto.RequestCount); - job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr); - SentMessage = message; - return &TErrorHandlerCheckerModule::HandleReplies; - } - - void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) { + TExampleRequest* message = new TExampleRequest(&Proto.RequestCount); + job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr); + SentMessage = message; + return &TErrorHandlerCheckerModule::HandleReplies; + } + + void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) { Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got wrong status: %s", ToString(status).data()); Y_VERIFY(req == SentMessage, "checking request"); Y_VERIFY(resp == nullptr, "checking response"); - GotReplyLatch.CountDown(); - } - - TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + GotReplyLatch.CountDown(); + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - job->Cancel(MESSAGE_UNKNOWN); - GotReplyLatch.CountDown(); + job->Cancel(MESSAGE_UNKNOWN); + GotReplyLatch.CountDown(); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TBusClientSessionConfig sessionConfig; - sessionConfig.SendTimeout = 1; // TODO: allow 0 - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); - Source = CreateDefaultSource(queue, &Proto, sessionConfig); - Source->RegisterService("localhost"); + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; // TODO: allow 0 + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); + Source = CreateDefaultSource(queue, &Proto, sessionConfig); + Source->RegisterService("localhost"); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(ErrorHandler) { - TExampleProtocol proto; - - TBusQueueConfig config; - config.NumWorkers = 5; - - TErrorHandlerCheckerModule module; - - TBusModuleConfig moduleConfig; - moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); - module.SetConfig(moduleConfig); - - module.StartModule(); - - module.StartJob(new TExampleRequest(&proto.StartCount)); - - module.GotReplyLatch.Await(); - - module.Shutdown(); - } - + TExampleProtocol proto; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TErrorHandlerCheckerModule module; + + TBusModuleConfig moduleConfig; + moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); + module.SetConfig(moduleConfig); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.GotReplyLatch.Await(); + + module.Shutdown(); + } + struct TSlowReplyServer: public TBusServerHandlerError { - TTestSync* const TestSync; - TBusMessageQueuePtr Bus; - TBusServerSessionPtr ServerSession; - TExampleProtocol Proto; - - TAtomic OnMessageCount; - - TSlowReplyServer(TTestSync* testSync) - : TestSync(testSync) - , OnMessageCount(0) - { - Bus = CreateMessageQueue("TSlowReplyServer"); - TBusServerSessionConfig sessionConfig; - ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); - } - + TTestSync* const TestSync; + TBusMessageQueuePtr Bus; + TBusServerSessionPtr ServerSession; + TExampleProtocol Proto; + + TAtomic OnMessageCount; + + TSlowReplyServer(TTestSync* testSync) + : TestSync(testSync) + , OnMessageCount(0) + { + Bus = CreateMessageQueue("TSlowReplyServer"); + TBusServerSessionConfig sessionConfig; + ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + } + void OnMessage(TOnMessageContext& req) override { - if (AtomicIncrement(OnMessageCount) == 1) { - TestSync->WaitForAndIncrement(0); - } - TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount)); - req.SendReplyMove(response); - } - }; - + if (AtomicIncrement(OnMessageCount) == 1) { + TestSync->WaitForAndIncrement(0); + } + TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount)); + req.SendReplyMove(response); + } + }; + struct TModuleThatSendsReplyEarly: public TExampleClientModule { - TTestSync* const TestSync; - const unsigned ServerPort; - - TBusServerSessionPtr ServerSession; - TAtomic ReplyCount; - - TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort) - : TestSync(testSync) - , ServerPort(serverPort) + TTestSync* const TestSync; + const unsigned ServerPort; + + TBusServerSessionPtr ServerSession; + TAtomic ReplyCount; + + TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort) + : TestSync(testSync) + , ServerPort(serverPort) , ServerSession(nullptr) - , ReplyCount(0) + , ReplyCount(0) { } - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - for (unsigned i = 0; i < 2; ++i) { - job->Send( - new TExampleRequest(&Proto.RequestCount), - Source, - TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler), - 0, - TNetAddr("127.0.0.1", ServerPort)); - } - return &TModuleThatSendsReplyEarly::HandleReplies; - } - - void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + for (unsigned i = 0; i < 2; ++i) { + job->Send( + new TExampleRequest(&Proto.RequestCount), + Source, + TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler), + 0, + TNetAddr("127.0.0.1", ServerPort)); + } + return &TModuleThatSendsReplyEarly::HandleReplies; + } + + void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { Y_UNUSED(mess); Y_UNUSED(reply); Y_VERIFY(status == MESSAGE_OK, "failed to get reply"); - if (AtomicIncrement(ReplyCount) == 1) { - TestSync->WaitForAndIncrement(1); - job->SendReply(new TExampleResponse(&Proto.ResponseCount)); - } else { - TestSync->WaitForAndIncrement(3); - } - } - - TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + if (AtomicIncrement(ReplyCount) == 1) { + TestSync->WaitForAndIncrement(1); + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + } else { + TestSync->WaitForAndIncrement(3); + } + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - job->Cancel(MESSAGE_UNKNOWN); + job->Cancel(MESSAGE_UNKNOWN); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TExampleClientModule::CreateExtSession(queue); - TBusServerSessionConfig sessionConfig; - return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig); - } - }; - + TExampleClientModule::CreateExtSession(queue); + TBusServerSessionConfig sessionConfig; + return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig); + } + }; + Y_UNIT_TEST(SendReplyCalledBeforeAllRepliesReceived) { - TTestSync testSync; - - TSlowReplyServer slowReplyServer(&testSync); - - TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort()); - module.StartModule(); - - TExampleClient client; - TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort()); - client.SendMessagesWaitReplies(1, &addr); - - testSync.WaitForAndIncrement(2); - - module.Shutdown(); - } - + TTestSync testSync; + + TSlowReplyServer slowReplyServer(&testSync); + + TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort()); + module.StartModule(); + + TExampleClient client; + TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort()); + client.SendMessagesWaitReplies(1, &addr); + + testSync.WaitForAndIncrement(2); + + module.Shutdown(); + } + struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule { - unsigned ServerPort; - - TTestSync TestSync; - - TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort) - : ServerPort(serverPort) + unsigned ServerPort; + + TTestSync TestSync; + + TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort) + : ServerPort(serverPort) { } - + TJobHandler Start(TBusJob* job, TBusMessage*) override { - TestSync.CheckAndIncrement(0); - - job->Send(new TExampleRequest(&Proto.RequestCount), Source, + TestSync.CheckAndIncrement(0); + + job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply), 0, TNetAddr("localhost", ServerPort)); - return &TShutdownCalledBeforeReplyReceivedModule::End; - } - - void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) { + return &TShutdownCalledBeforeReplyReceivedModule::End; + } + + void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) { Y_VERIFY(status == MESSAGE_SHUTDOWN, "got %s", ToCString(status)); - TestSync.CheckAndIncrement(1); - } - - TJobHandler End(TBusJob* job, TBusMessage*) { - TestSync.CheckAndIncrement(2); - job->Cancel(MESSAGE_SHUTDOWN); + TestSync.CheckAndIncrement(1); + } + + TJobHandler End(TBusJob* job, TBusMessage*) { + TestSync.CheckAndIncrement(2); + job->Cancel(MESSAGE_SHUTDOWN); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(ShutdownCalledBeforeReplyReceived) { - TExampleServer server; - server.ForgetRequest = true; - - TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort()); - - module.StartModule(); - - module.StartJob(new TExampleRequest(&module.Proto.RequestCount)); - - server.TestSync.WaitFor(1); - - module.Shutdown(); - - module.TestSync.CheckAndIncrement(3); - } -} + TExampleServer server; + server.ForgetRequest = true; + + TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort()); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&module.Proto.RequestCount)); + + server.TestSync.WaitFor(1); + + module.Shutdown(); + + module.TestSync.CheckAndIncrement(3); + } +} diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp index 88fe1dd9b6..38f3fcc4ed 100644 --- a/library/cpp/messagebus/test/ut/module_server_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp @@ -1,8 +1,8 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "count_down_latch.h" -#include "moduletest.h" - +#include "moduletest.h" + #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/example_module.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> @@ -12,108 +12,108 @@ #include <util/generic/cast.h> -using namespace NBus; -using namespace NBus::NTest; - +using namespace NBus; +using namespace NBus::NTest; + Y_UNIT_TEST_SUITE(ModuleServerTests) { Y_UNIT_TEST(TestModule) { - TObjectCountCheck objectCountCheck; - - /// create or get instance of message queue, need one per application - TBusMessageQueuePtr bus(CreateMessageQueue()); + TObjectCountCheck objectCountCheck; + + /// create or get instance of message queue, need one per application + TBusMessageQueuePtr bus(CreateMessageQueue()); THostInfoHandler hostHandler(bus.Get()); - TDupDetectModule module(hostHandler.GetActualListenAddr()); - bool success; - success = module.Init(bus.Get()); - UNIT_ASSERT_C(success, "failed to initialize dupdetect module"); - - success = module.StartInput(); - UNIT_ASSERT_C(success, "failed to start dupdetect module"); - - TDupDetectHandler dupHandler(module.ListenAddr, bus.Get()); - dupHandler.Work(); - - UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies); - - module.Shutdown(); - dupHandler.DupDetect->Shutdown(); - } - + TDupDetectModule module(hostHandler.GetActualListenAddr()); + bool success; + success = module.Init(bus.Get()); + UNIT_ASSERT_C(success, "failed to initialize dupdetect module"); + + success = module.StartInput(); + UNIT_ASSERT_C(success, "failed to start dupdetect module"); + + TDupDetectHandler dupHandler(module.ListenAddr, bus.Get()); + dupHandler.Work(); + + UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies); + + module.Shutdown(); + dupHandler.DupDetect->Shutdown(); + } + struct TParallelOnMessageModule: public TExampleServerModule { - TCountDownLatch WaitTwoRequestsLatch; - - TParallelOnMessageModule() - : WaitTwoRequestsLatch(2) + TCountDownLatch WaitTwoRequestsLatch; + + TParallelOnMessageModule() + : WaitTwoRequestsLatch(2) { } - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { - WaitTwoRequestsLatch.CountDown(); + WaitTwoRequestsLatch.CountDown(); Y_VERIFY(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops"); - - VerifyDynamicCast<TExampleRequest*>(mess); - - job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + + VerifyDynamicCast<TExampleRequest*>(mess); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) { - TObjectCountCheck objectCountCheck; - - TBusQueueConfig config; - config.NumWorkers = 5; - - TParallelOnMessageModule module; - module.StartModule(); - - TExampleClient client; - - client.SendMessagesWaitReplies(2, module.ServerAddr); - - module.Shutdown(); - } - - struct TDelayReplyServer: public TExampleServerModule { + TObjectCountCheck objectCountCheck; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TParallelOnMessageModule module; + module.StartModule(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, module.ServerAddr); + + module.Shutdown(); + } + + struct TDelayReplyServer: public TExampleServerModule { TSystemEvent MessageReceivedEvent; TSystemEvent ClientDiedEvent; - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - - MessageReceivedEvent.Signal(); - + + MessageReceivedEvent.Signal(); + Y_VERIFY(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops"); - - job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { - TObjectCountCheck objectCountCheck; - - TBusQueueConfig config; - config.NumWorkers = 5; - - TDelayReplyServer server; - server.StartModule(); - - THolder<TExampleClient> client(new TExampleClient); - - client->SendMessages(1, server.ServerAddr); - - UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); - - UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight()); - - client.Destroy(); - - server.ClientDiedEvent.Signal(); - - // wait until all server message are delivered - UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight()); - - server.Shutdown(); - } -} + TObjectCountCheck objectCountCheck; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TDelayReplyServer server; + server.StartModule(); + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessages(1, server.ServerAddr); + + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + + UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight()); + + client.Destroy(); + + server.ClientDiedEvent.Signal(); + + // wait until all server message are delivered + UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight()); + + server.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h index d5da72c0cb..0f9834d9ff 100644 --- a/library/cpp/messagebus/test/ut/moduletest.h +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -7,10 +7,10 @@ #include <library/cpp/messagebus/test/helper/alloc_counter.h> #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/message_handler_error.h> - + #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/oldmodule/module.h> - + namespace NBus { namespace NTest { using namespace std; @@ -71,7 +71,7 @@ namespace NBus { /// deserialized TBusData into new instance of the message TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { Y_UNUSED(payload); - + if (messageType == TYPE_HOSTINFOREQUEST) { return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); } else if (messageType == TYPE_HOSTINFORESPONSE) { @@ -100,7 +100,7 @@ namespace NBus { mess.SendReplyMove(reply); } - + TNetAddr GetActualListenAddr() { return TNetAddr("localhost", Session->GetActualListenPort()); } @@ -110,7 +110,7 @@ namespace NBus { /// \brief DupDetect handler (should convert it to module too) struct TDupDetectHandler: public TBusClientHandlerError { TNetAddr ServerAddr; - + TBusClientSessionPtr DupDetect; TBusClientSessionConfig DupDetectConfig; TExampleProtocol DupDetectProto; @@ -147,7 +147,7 @@ namespace NBus { struct TDupDetectModule: public TBusModule { TNetAddr HostInfoAddr; - + TBusClientSessionPtr HostInfoClientSession; TBusClientSessionConfig HostInfoConfig; THostInfoProtocol HostInfoProto; @@ -162,7 +162,7 @@ namespace NBus { , HostInfoAddr(hostInfoAddr) { } - + bool Init(TBusMessageQueue* queue) { HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); HostInfoClientSession->RegisterService("localhost"); @@ -174,7 +174,7 @@ namespace NBus { TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); - + return session; } diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp index 9c21227e2b..7a907cc620 100644 --- a/library/cpp/messagebus/test/ut/one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -32,33 +32,33 @@ #include <library/cpp/messagebus/test/helper/wait_for.h> #include <library/cpp/messagebus/ybus.h> - + using namespace std; -using namespace NBus; -using namespace NBus::NPrivate; -using namespace NBus::NTest; +using namespace NBus; +using namespace NBus::NPrivate; +using namespace NBus::NTest; //////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////// /// \brief Reply-less client and handler struct NullClient : TBusClientHandlerError { - TNetAddr ServerAddr; - - TBusMessageQueuePtr Queue; - TBusClientSessionPtr Session; - TExampleProtocol Proto; + TNetAddr ServerAddr; + + TBusMessageQueuePtr Queue; + TBusClientSessionPtr Session; + TExampleProtocol Proto; /// constructor creates instances of protocol and session - NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) - : ServerAddr(serverAddr) - { - UNIT_ASSERT(serverAddr.GetPort() > 0); + NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) + : ServerAddr(serverAddr) + { + UNIT_ASSERT(serverAddr.GetPort() > 0); /// create or get instance of message queue, need one per application Queue = CreateMessageQueue(); /// register source/client session - Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); + Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); /// register service, announce to clients via LocatorService Session->RegisterService("localhost"); @@ -74,8 +74,8 @@ struct NullClient : TBusClientHandlerError { for (int i = 0; i < batch; i++) { TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); - mess->Data = "TADA"; - Session->SendMessageOneWay(mess, &ServerAddr); + mess->Data = "TADA"; + Session->SendMessageOneWay(mess, &ServerAddr); } } @@ -85,12 +85,12 @@ struct NullClient : TBusClientHandlerError { ///////////////////////////////////////////////////////////////////// /// \brief Reply-less server and handler -class NullServer: public TBusServerHandlerError { +class NullServer: public TBusServerHandlerError { public: /// session object to maintian - TBusMessageQueuePtr Queue; - TBusServerSessionPtr Session; - TExampleProtocol Proto; + TBusMessageQueuePtr Queue; + TBusServerSessionPtr Session; + TExampleProtocol Proto; public: TAtomic NumMessages; @@ -102,8 +102,8 @@ public: Queue = CreateMessageQueue(); /// register destination session - TBusServerSessionConfig sessionConfig; - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); } ~NullServer() override { @@ -117,7 +117,7 @@ public: Y_ASSERT(fmess->Data == "TADA"); /// tell session to forget this message and never expect any reply - mess.ForgetRequest(); + mess.ForgetRequest(); AtomicIncrement(NumMessages); } @@ -131,125 +131,125 @@ public: Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { Y_UNIT_TEST(Simple) { - TObjectCountCheck objectCountCheck; - - NullServer server; - NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort())); - - client.Work(); - - // wait until all client message are delivered + TObjectCountCheck objectCountCheck; + + NullServer server; + NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort())); + + client.Work(); + + // wait until all client message are delivered UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10); - - // assert correct number of messages + + // assert correct number of messages UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10); - UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); - } - - struct TMessageTooLargeClient: public NullClient { + UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); + } + + struct TMessageTooLargeClient: public NullClient { TSystemEvent GotTooLarge; - - TBusClientSessionConfig Config() { - TBusClientSessionConfig r; - r.MaxMessageSize = 1; - return r; - } - - TMessageTooLargeClient(unsigned port) - : NullClient(TNetAddr("localhost", port), Config()) + + TBusClientSessionConfig Config() { + TBusClientSessionConfig r; + r.MaxMessageSize = 1; + return r; + } + + TMessageTooLargeClient(unsigned port) + : NullClient(TNetAddr("localhost", port), Config()) { } - + ~TMessageTooLargeClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); - + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status)); - - GotTooLarge.Signal(); - } - }; - + + GotTooLarge.Signal(); + } + }; + Y_UNIT_TEST(MessageTooLargeOnClient) { - TObjectCountCheck objectCountCheck; - - NullServer server; - - TMessageTooLargeClient client(server.Session->GetActualListenPort()); - - EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - - client.GotTooLarge.WaitI(); - } - + TObjectCountCheck objectCountCheck; + + NullServer server; + + TMessageTooLargeClient client(server.Session->GetActualListenPort()); + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.GotTooLarge.WaitI(); + } + struct TCheckTimeoutClient: public NullClient { ~TCheckTimeoutClient() override { - Session->Shutdown(); - } - - static TBusClientSessionConfig SessionConfig() { - TBusClientSessionConfig sessionConfig; - sessionConfig.SendTimeout = 1; - sessionConfig.ConnectTimeout = 1; + Session->Shutdown(); + } + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; + sessionConfig.ConnectTimeout = 1; sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); - return sessionConfig; - } - + return sessionConfig; + } + TCheckTimeoutClient(const TNetAddr& serverAddr) : NullClient(serverAddr, SessionConfig()) { } - + TSystemEvent GotError; - - /// message that could not be delivered + + /// message that could not be delivered void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); Y_UNUSED(status); // TODO: check status - - GotError.Signal(); - } - }; - + + GotError.Signal(); + } + }; + Y_UNIT_TEST(SendTimeout_Callback_NoServer) { - TObjectCountCheck objectCountCheck; - - TCheckTimeoutClient client(TNetAddr("localhost", 17)); - - EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); - UNIT_ASSERT_EQUAL(ok, MESSAGE_OK); - - client.GotError.WaitI(); - } - + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", 17)); + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_EQUAL(ok, MESSAGE_OK); + + client.GotError.WaitI(); + } + Y_UNIT_TEST(SendTimeout_Callback_HangingServer) { - THangingServer server; - - TObjectCountCheck objectCountCheck; - - TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort())); - - bool first = true; - for (;;) { - EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); - if (ok == MESSAGE_BUSY) { - UNIT_ASSERT(!first); - break; - } - UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK); - first = false; - } - + THangingServer server; + + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort())); + + bool first = true; + for (;;) { + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + if (ok == MESSAGE_BUSY) { + UNIT_ASSERT(!first); + break; + } + UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK); + first = false; + } + // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages. // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get // serailized and written to the socket buffer, so the write queue gets drained and there are // no messages to timeout when periodic timeout check happens. - client.GotError.WaitI(); - } -} + client.GotError.WaitI(); + } +} diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp index dd4d3aaa5e..ebb628ab28 100644 --- a/library/cpp/messagebus/test/ut/starter_ut.cpp +++ b/library/cpp/messagebus/test/ut/starter_ut.cpp @@ -1,140 +1,140 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/test/helper/example_module.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> #include <library/cpp/messagebus/test/helper/wait_for.h> - -using namespace NBus; -using namespace NBus::NTest; - + +using namespace NBus; +using namespace NBus::NTest; + Y_UNIT_TEST_SUITE(TBusStarterTest) { struct TStartJobTestModule: public TExampleModule { - using TBusModule::CreateDefaultStarter; - - TAtomic StartCount; - - TStartJobTestModule() - : StartCount(0) - { - } - + using TBusModule::CreateDefaultStarter; + + TAtomic StartCount; + + TStartJobTestModule() + : StartCount(0) + { + } + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - AtomicIncrement(StartCount); - job->Sleep(10); - return &TStartJobTestModule::End; - } - - TJobHandler End(TBusJob* job, TBusMessage* mess) { + AtomicIncrement(StartCount); + job->Sleep(10); + return &TStartJobTestModule::End; + } + + TJobHandler End(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - AtomicIncrement(StartCount); - job->Cancel(MESSAGE_UNKNOWN); + AtomicIncrement(StartCount); + job->Cancel(MESSAGE_UNKNOWN); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Test) { - TObjectCountCheck objectCountCheck; - - TBusMessageQueuePtr bus(CreateMessageQueue()); - - TStartJobTestModule module; - - //module.StartModule(); - module.CreatePrivateSessions(bus.Get()); - module.StartInput(); - - TBusSessionConfig config; - config.SendTimeout = 10; - - module.CreateDefaultStarter(*bus, config); - - UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3); - - module.Shutdown(); - bus->Stop(); - } - + TObjectCountCheck objectCountCheck; + + TBusMessageQueuePtr bus(CreateMessageQueue()); + + TStartJobTestModule module; + + //module.StartModule(); + module.CreatePrivateSessions(bus.Get()); + module.StartInput(); + + TBusSessionConfig config; + config.SendTimeout = 10; + + module.CreateDefaultStarter(*bus, config); + + UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3); + + module.Shutdown(); + bus->Stop(); + } + Y_UNIT_TEST(TestModuleStartJob) { - TObjectCountCheck objectCountCheck; - - TExampleProtocol proto; - - TStartJobTestModule module; - - TBusModuleConfig moduleConfig; - moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); - module.SetConfig(moduleConfig); - - module.StartModule(); - - module.StartJob(new TExampleRequest(&proto.RequestCount)); - - UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2); - - module.Shutdown(); - } - + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TStartJobTestModule module; + + TBusModuleConfig moduleConfig; + moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); + module.SetConfig(moduleConfig); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.RequestCount)); + + UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2); + + module.Shutdown(); + } + struct TSleepModule: public TExampleServerModule { TSystemEvent MessageReceivedEvent; - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - - MessageReceivedEvent.Signal(); - - job->Sleep(1000000000); - - return TJobHandler(&TSleepModule::Never); - } - - TJobHandler Never(TBusJob*, TBusMessage*) { + + MessageReceivedEvent.Signal(); + + job->Sleep(1000000000); + + return TJobHandler(&TSleepModule::Never); + } + + TJobHandler Never(TBusJob*, TBusMessage*) { Y_FAIL("happens"); - throw 1; - } - }; - + throw 1; + } + }; + Y_UNIT_TEST(StartJobDestroyDuringSleep) { - TObjectCountCheck objectCountCheck; - - TExampleProtocol proto; - - TSleepModule module; - - module.StartModule(); - - module.StartJob(new TExampleRequest(&proto.StartCount)); - - module.MessageReceivedEvent.WaitI(); - - module.Shutdown(); - } - + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TSleepModule module; + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.MessageReceivedEvent.WaitI(); + + module.Shutdown(); + } + struct TSendReplyModule: public TExampleServerModule { TSystemEvent MessageReceivedEvent; - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - - job->SendReply(new TExampleResponse(&Proto.ResponseCount)); - - MessageReceivedEvent.Signal(); - + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + + MessageReceivedEvent.Signal(); + return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(AllowSendReplyInStarted) { - TObjectCountCheck objectCountCheck; - - TExampleProtocol proto; - - TSendReplyModule module; - module.StartModule(); - module.StartJob(new TExampleRequest(&proto.StartCount)); - - module.MessageReceivedEvent.WaitI(); - - module.Shutdown(); - } -} + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TSendReplyModule module; + module.StartModule(); + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.MessageReceivedEvent.WaitI(); + + module.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp index 400128193f..848a9d3457 100644 --- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp @@ -4,7 +4,7 @@ namespace NBus { namespace NTest { using namespace std; - + //////////////////////////////////////////////////////////////////// /// \brief Client for sending synchronous message to local server struct TSyncClient { @@ -13,7 +13,7 @@ namespace NBus { TExampleProtocol Proto; TBusMessageQueuePtr Bus; TBusSyncClientSessionPtr Session; - + int NumReplies; int NumMessages; @@ -53,7 +53,7 @@ namespace NBus { Y_UNIT_TEST_SUITE(SyncClientTest) { Y_UNIT_TEST(TestSync) { TObjectCountCheck objectCountCheck; - + TExampleServer server; TSyncClient client(server.GetActualListenAddr()); client.Work(); @@ -65,5 +65,5 @@ namespace NBus { } } - } -} + } +} diff --git a/library/cpp/messagebus/test/ut/ya.make b/library/cpp/messagebus/test/ut/ya.make index fe1b4961d6..5af102e0ba 100644 --- a/library/cpp/messagebus/test/ut/ya.make +++ b/library/cpp/messagebus/test/ut/ya.make @@ -1,7 +1,7 @@ OWNER(g:messagebus) - + UNITTEST_FOR(library/cpp/messagebus) - + TIMEOUT(1200) SIZE(LARGE) |