diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/test/ut | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/test/ut')
-rw-r--r-- | library/cpp/messagebus/test/ut/count_down_latch.h | 30 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/locator_uniq_ut.cpp | 40 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/messagebus_ut.cpp | 1151 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp | 143 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_client_ut.cpp | 368 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/module_server_ut.cpp | 119 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/moduletest.h | 221 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/one_way_ut.cpp | 255 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/starter_ut.cpp | 140 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/sync_client_ut.cpp | 69 | ||||
-rw-r--r-- | library/cpp/messagebus/test/ut/ya.make | 56 |
11 files changed, 2592 insertions, 0 deletions
diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h new file mode 100644 index 0000000000..5117db5731 --- /dev/null +++ b/library/cpp/messagebus/test/ut/count_down_latch.h @@ -0,0 +1,30 @@ +#pragma once + +#include <util/system/atomic.h> +#include <util/system/event.h> + +class TCountDownLatch { +private: + TAtomic Current; + TSystemEvent EventObject; + +public: + TCountDownLatch(unsigned initial) + : Current(initial) + { + } + + void CountDown() { + if (AtomicDecrement(Current) == 0) { + EventObject.Signal(); + } + } + + void Await() { + EventObject.Wait(); + } + + bool Await(TDuration timeout) { + return EventObject.WaitT(timeout); + } +}; diff --git a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp new file mode 100644 index 0000000000..3fdd175d73 --- /dev/null +++ b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp @@ -0,0 +1,40 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/test_utils.h> +#include <library/cpp/messagebus/ybus.h> + +class TLocatorRegisterUniqTest: public TTestBase { + UNIT_TEST_SUITE(TLocatorRegisterUniqTest); + UNIT_TEST(TestRegister); + UNIT_TEST_SUITE_END(); + +protected: + void TestRegister(); +}; + +UNIT_TEST_SUITE_REGISTRATION(TLocatorRegisterUniqTest); + +void TLocatorRegisterUniqTest::TestRegister() { + ASSUME_IP_V4_ENABLED; + + NBus::TBusLocator locator; + const char* serviceName = "TestService"; + const char* hostName = "192.168.0.42"; + int port = 31337; + + NBus::TBusKeyVec keys; + locator.LocateKeys(serviceName, keys); + UNIT_ASSERT(keys.size() == 0); + + locator.Register(serviceName, hostName, port); + locator.LocateKeys(serviceName, keys); + /// YBUS_KEYMIN YBUS_KEYMAX range + UNIT_ASSERT(keys.size() == 1); + + TVector<NBus::TNetAddr> hosts; + UNIT_ASSERT(locator.LocateAll(serviceName, NBus::YBUS_KEYMIN, hosts) == 1); + + locator.Register(serviceName, hostName, port); + hosts.clear(); + UNIT_ASSERT(locator.LocateAll(serviceName, NBus::YBUS_KEYMIN, hosts) == 1); +} diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp new file mode 100644 index 0000000000..040f9b7702 --- /dev/null +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -0,0 +1,1151 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/fixed_port.h> +#include <library/cpp/messagebus/test/helper/hanging_server.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/misc/test_sync.h> + +#include <util/network/sock.h> + +#include <utility> + +using namespace NBus; +using namespace NBus::NTest; + +namespace { + struct TExampleClientSlowOnMessageSent: public TExampleClient { + TAtomic SentCompleted; + + TSystemEvent ReplyReceived; + + TExampleClientSlowOnMessageSent() + : SentCompleted(0) + { + } + + ~TExampleClientSlowOnMessageSent() override { + Session->Shutdown(); + } + + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { + Y_VERIFY(AtomicGet(SentCompleted), "must be completed"); + + TExampleClient::OnReply(mess, reply); + + ReplyReceived.Signal(); + } + + void OnMessageSent(TBusMessage*) override { + Sleep(TDuration::MilliSeconds(100)); + AtomicSet(SentCompleted, 1); + } + }; + +} + +Y_UNIT_TEST_SUITE(TMessageBusTests) { + void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply, + const TBusServerSessionConfig& sessionConfig) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleClient client(sessionConfig); + client.CrashOnError = true; + + server.UseCompression = useCompression; + client.UseCompression = useCompression; + + server.AckMessageBeforeSendReply = ackMessageBeforeReply; + + client.SendMessagesWaitReplies(100, server.GetActualListenAddr()); + UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); + } + + Y_UNIT_TEST(TestDestination) { + TestDestinationTemplate(false, false, TBusServerSessionConfig()); + } + + Y_UNIT_TEST(TestDestinationUsingAck) { + TestDestinationTemplate(false, true, TBusServerSessionConfig()); + } + + Y_UNIT_TEST(TestDestinationWithCompression) { + TestDestinationTemplate(true, false, TBusServerSessionConfig()); + } + + Y_UNIT_TEST(TestCork) { + TBusServerSessionConfig config; + config.SendThreshold = 1000000000000; + config.Cork = TDuration::MilliSeconds(10); + TestDestinationTemplate(false, false, config); + // TODO: test for cork hanging + } + + Y_UNIT_TEST(TestReconnect) { + if (!IsFixedPortTestAllowed()) { + return; + } + + TObjectCountCheck objectCountCheck; + + unsigned port = FixedPort; + TNetAddr serverAddr("localhost", port); + THolder<TExampleServer> server; + + TBusClientSessionConfig clientConfig; + clientConfig.RetryInterval = 0; + TExampleClient client(clientConfig); + + server.Reset(new TExampleServer(port, "TExampleServer 1")); + + client.SendMessagesWaitReplies(17, serverAddr); + + server.Destroy(); + + // Making the client to detect disconnection. + client.SendMessages(1, serverAddr); + EMessageStatus error = client.WaitForError(); + if (error == MESSAGE_DELIVERY_FAILED) { + client.SendMessages(1, serverAddr); + error = client.WaitForError(); + } + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); + + server.Reset(new TExampleServer(port, "TExampleServer 2")); + + client.SendMessagesWaitReplies(19, serverAddr); + } + + struct TestNoServerImplClient: public TExampleClient { + TTestSync TestSync; + int failures = 0; + + template <typename... Args> + TestNoServerImplClient(Args&&... args) + : TExampleClient(std::forward<Args>(args)...) + { + } + + ~TestNoServerImplClient() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override { + Y_UNUSED(message); + + Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data()); + + TestSync.CheckAndIncrement((failures++) * 2); + } + }; + + void TestNoServerImpl(unsigned port, bool oneWay) { + TNetAddr noServerAddr("localhost", port); + + TestNoServerImplClient client; + + int count = 0; + for (; count < 200; ++count) { + EMessageStatus status; + if (oneWay) { + status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); + } else { + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + status = client.Session->SendMessageAutoPtr(message, &noServerAddr); + } + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + + if (count == 0) { + // lame way to wait until it is connected + Sleep(TDuration::MilliSeconds(10)); + } + client.TestSync.WaitForAndIncrement(count * 2 + 1); + } + + client.TestSync.WaitForAndIncrement(count * 2); + } + + void HangingServerImpl(unsigned port) { + TNetAddr noServerAddr("localhost", port); + + TExampleClient client; + + int count = 0; + for (;; ++count) { + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr); + if (status == MESSAGE_BUSY) { + break; + } + UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status)); + + if (count == 0) { + // lame way to wait until it is connected + Sleep(TDuration::MilliSeconds(10)); + } + } + + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count); + } + + Y_UNIT_TEST(TestHangindServer) { + TObjectCountCheck objectCountCheck; + + THangingServer server(0); + + HangingServerImpl(server.GetPort()); + } + + Y_UNIT_TEST(TestNoServer) { + TObjectCountCheck objectCountCheck; + + TestNoServerImpl(17, false); + } + + Y_UNIT_TEST(PauseInput) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + server.Session->PauseInput(true); + + TBusClientSessionConfig clientConfig; + clientConfig.MaxInFlight = 1000; + TExampleClient client(clientConfig); + + client.SendMessages(100, server.GetActualListenAddr()); + + server.TestSync.Check(0); + + server.Session->PauseInput(false); + + server.TestSync.WaitFor(100); + + client.WaitReplies(); + + server.Session->PauseInput(true); + + client.SendMessages(200, server.GetActualListenAddr()); + + server.TestSync.Check(100); + + server.Session->PauseInput(false); + + server.TestSync.WaitFor(300); + + client.WaitReplies(); + } + + struct TSendTimeoutCheckerExampleClient: public TExampleClient { + static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) { + TBusClientSessionConfig sessionConfig; + if (periodLessThanConnectTimeout) { + sessionConfig.SendTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50); + } else { + sessionConfig.SendTimeout = 50; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + } + return sessionConfig; + } + + TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout) + : TExampleClient(SessionConfig(periodLessThanConnectTimeout)) + { + } + + ~TSendTimeoutCheckerExampleClient() override { + Session->Shutdown(); + } + + TSystemEvent ErrorHappened; + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { + Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got status: %s", ToString(status).data()); + ErrorHappened.Signal(); + } + }; + + void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) { + TObjectCountCheck objectCountCheck; + + TNetAddr serverAddr("localhost", 17); + + TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout); + + client.SendMessages(1, serverAddr); + + client.ErrorHappened.WaitI(); + } + + Y_UNIT_TEST(NoServer_SendTimeout_Callback_PeriodLess) { + NoServer_SendTimeout_Callback_Impl(true); + } + + Y_UNIT_TEST(NoServer_SendTimeout_Callback_TimeoutLess) { + NoServer_SendTimeout_Callback_Impl(false); + } + + Y_UNIT_TEST(TestOnReplyCalledAfterOnMessageSent) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + TExampleClientSlowOnMessageSent client; + + TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr); + UNIT_ASSERT_EQUAL(s, MESSAGE_OK); + + UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5))); + } + + struct TDelayReplyServer: public TBusServerHandlerError { + TBusMessageQueuePtr Bus; + TExampleProtocol Proto; + TSystemEvent MessageReceivedEvent; // 1 wait for 1 message + TBusServerSessionPtr Session; + TMutex Lock_; + TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages; + + TDelayReplyServer() + : MessageReceivedEvent(TEventResetType::rAuto) + { + Bus = CreateMessageQueue("TDelayReplyServer"); + TBusServerSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1000; + sessionConfig.TotalTimeout = 2001; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + if (!Session) { + ythrow yexception() << "Failed to create destination session"; + } + } + + void OnMessage(TOnMessageContext& mess) override { + Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here"); + TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext); + delayedMsg->Swap(mess); + auto g(Guard(Lock_)); + DelayedMessages.push_back(delayedMsg); + MessageReceivedEvent.Signal(); + } + + bool CheckClientIsAlive() { + auto g(Guard(Lock_)); + for (auto& delayedMessage : DelayedMessages) { + if (!delayedMessage->IsConnectionAlive()) { + return false; + } + } + return true; + } + + bool CheckClientIsDead() const { + auto g(Guard(Lock_)); + for (const auto& delayedMessage : DelayedMessages) { + if (delayedMessage->IsConnectionAlive()) { + return false; + } + } + return true; + } + + void ReplyToDelayedMessages() { + while (true) { + TOnMessageContext msg; + { + auto g(Guard(Lock_)); + if (DelayedMessages.empty()) { + break; + } + DelayedMessages.front()->Swap(msg); + DelayedMessages.pop_front(); + } + TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount)); + msg.SendReplyMove(reply); + } + } + + size_t GetDelayedMessageCount() const { + auto g(Guard(Lock_)); + return DelayedMessages.size(); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed, got %s", ToString(status).data()); + } + }; + + Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { + TObjectCountCheck objectCountCheck; + + TDelayReplyServer server; + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort())); + + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + + UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight()); + + client.Destroy(); + + UNIT_WAIT_FOR(server.CheckClientIsDead()); + + server.ReplyToDelayedMessages(); + + // wait until all server message are delivered + UNIT_WAIT_FOR(0 == server.Session->GetInFlight()); + } + + struct TPackUnpackServer: public TBusServerHandlerError { + TBusMessageQueuePtr Bus; + TExampleProtocol Proto; + TSystemEvent MessageReceivedEvent; + TSystemEvent ClientDiedEvent; + TBusServerSessionPtr Session; + + TPackUnpackServer() { + Bus = CreateMessageQueue("TPackUnpackServer"); + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + } + + void OnMessage(TOnMessageContext& mess) override { + TBusIdentity ident; + mess.AckMessage(ident); + + char packed[BUS_IDENTITY_PACKED_SIZE]; + ident.Pack(packed); + TBusIdentity resurrected; + resurrected.Unpack(packed); + + mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount)); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed"); + } + }; + + Y_UNIT_TEST(PackUnpack) { + TObjectCountCheck objectCountCheck; + + TPackUnpackServer server; + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort())); + } + + Y_UNIT_TEST(ClientRequestTooLarge) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TBusClientSessionConfig clientConfig; + clientConfig.MaxMessageSize = 100; + TExampleClient client(clientConfig); + + client.DataSize = 10; + client.SendMessagesWaitReplies(1, server.GetActualListenAddr()); + + client.DataSize = 1000; + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); + + client.DataSize = 20; + client.SendMessagesWaitReplies(10, server.GetActualListenAddr()); + + client.DataSize = 10000; + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); + } + + struct TServerForResponseTooLarge: public TExampleServer { + TTestSync TestSync; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + config.MaxMessageSize = 100; + return config; + } + + TServerForResponseTooLarge() + : TExampleServer("TServerForResponseTooLarge", Config()) + { + } + + ~TServerForResponseTooLarge() override { + Session->Shutdown(); + } + + void OnMessage(TOnMessageContext& mess) override { + TAutoPtr<TBusMessage> response; + + if (TestSync.Get() == 0) { + TestSync.CheckAndIncrement(0); + response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000)); + } else { + TestSync.WaitForAndIncrement(3); + response.Reset(new TExampleResponse(&Proto.ResponseCount, 10)); + } + + mess.SendReplyMove(response); + } + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { + TestSync.WaitForAndIncrement(1); + + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "status"); + } + }; + + Y_UNIT_TEST(ServerResponseTooLarge) { + TObjectCountCheck objectCountCheck; + + TServerForResponseTooLarge server; + + TExampleClient client; + client.DataSize = 10; + + client.SendMessages(1, server.GetActualListenAddr()); + server.TestSync.WaitForAndIncrement(2); + client.ResetCounters(); + + client.SendMessages(1, server.GetActualListenAddr()); + + client.WorkDone.WaitI(); + + server.TestSync.CheckAndIncrement(4); + + UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight()); + } + + struct TServerForRequestTooLarge: public TExampleServer { + TTestSync TestSync; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + config.MaxMessageSize = 100; + return config; + } + + TServerForRequestTooLarge() + : TExampleServer("TServerForRequestTooLarge", Config()) + { + } + + ~TServerForRequestTooLarge() override { + Session->Shutdown(); + } + + void OnMessage(TOnMessageContext& req) override { + unsigned n = TestSync.Get(); + if (n < 2) { + TestSync.CheckAndIncrement(n); + TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10)); + req.SendReplyMove(resp); + } else { + Y_FAIL("wrong"); + } + } + }; + + Y_UNIT_TEST(ServerRequestTooLarge) { + TObjectCountCheck objectCountCheck; + + TServerForRequestTooLarge server; + + TExampleClient client; + client.DataSize = 10; + + client.SendMessagesWaitReplies(2, server.GetActualListenAddr()); + + server.TestSync.CheckAndIncrement(2); + + client.DataSize = 200; + client.SendMessages(1, server.GetActualListenAddr()); + // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + + Y_UNIT_TEST(ClientResponseTooLarge) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + server.DataSize = 10; + + TBusClientSessionConfig clientSessionConfig; + clientSessionConfig.MaxMessageSize = 100; + TExampleClient client(clientSessionConfig); + client.DataSize = 10; + + client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); + + server.DataSize = 1000; + + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + + Y_UNIT_TEST(ServerUnknownMessage) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, serverAddr); + + TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Type = 11; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + + Y_UNIT_TEST(ServerMessageReservedIds) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, serverAddr); + + // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side + + TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Id = 2; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + client.WaitForError(MESSAGE_DELIVERY_FAILED); + + req.Reset(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Id = YBUS_KEYLOCAL; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + + Y_UNIT_TEST(TestGetInFlightForDestination) { + TObjectCountCheck objectCountCheck; + + TDelayReplyServer server; + + TExampleClient client; + + TNetAddr addr("localhost", server.Session->GetActualListenPort()); + + UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr)); + + client.SendMessages(2, &addr); + + for (size_t i = 0; i < 5; ++i) { + // One MessageReceivedEvent indicates one message, we need to wait for two + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + if (server.GetDelayedMessageCount() == 2) { + break; + } + } + UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2); + + size_t inFlight = client.Session->GetInFlight(addr); + // 4 is for messagebus1 that adds inFlight counter twice for some reason + UNIT_ASSERT(inFlight == 2 || inFlight == 4); + + UNIT_ASSERT(server.CheckClientIsAlive()); + + server.ReplyToDelayedMessages(); + + client.WaitReplies(); + } + + struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient { + TTestSync TestSync; + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig config; + // 1 ms is not enough when test is running under valgrind + config.ConnectTimeout = 10; + config.SendTimeout = 10; + config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + return config; + } + + TResetAfterSendOneWayErrorInCallbackClient() + : TExampleClient(SessionConfig()) + { + } + + ~TResetAfterSendOneWayErrorInCallbackClient() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + TestSync.WaitForAndIncrement(0); + Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "must be connection failed, got %s", ToString(status).data()); + mess.Destroy(); + TestSync.CheckAndIncrement(1); + } + }; + + Y_UNIT_TEST(ResetAfterSendOneWayErrorInCallback) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + + TResetAfterSendOneWayErrorInCallbackClient client; + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.TestSync.WaitForAndIncrement(2); + } + + struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient { + TTestSync TestSync; + + ~TResetAfterSendMessageOneWayDuringShutdown() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override { + TestSync.CheckAndIncrement(0); + + Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data()); + + // check reset is possible here + message->Reset(); + + // intentionally don't destroy the message + // we will try to resend it + Y_UNUSED(message.Release()); + + TestSync.CheckAndIncrement(1); + } + }; + + Y_UNIT_TEST(ResetAfterSendMessageOneWayDuringShutdown) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + + TResetAfterSendMessageOneWayDuringShutdown client; + + TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); + EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.TestSync.WaitForAndIncrement(2); + + client.Session->Shutdown(); + + ok = client.Session->SendMessageOneWay(message); + Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data()); + + // check reset is possible here + message->Reset(); + client.TestSync.CheckAndIncrement(3); + + delete message; + } + + Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) { + TObjectCountCheck objectCountCheck; + + TestNoServerImpl(17, true); + } + + struct TResetAfterSendOneWaySuccessClient: public TExampleClient { + TTestSync TestSync; + + ~TResetAfterSendOneWaySuccessClient() override { + Session->Shutdown(); + } + + void OnMessageSentOneWay(TAutoPtr<TBusMessage> sent) override { + TestSync.WaitForAndIncrement(0); + sent->Reset(); + TestSync.CheckAndIncrement(1); + } + }; + + Y_UNIT_TEST(ResetAfterSendOneWaySuccess) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TResetAfterSendOneWaySuccessClient client; + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + // otherwize message might go to OnError(MESSAGE_SHUTDOWN) + server.WaitForOnMessageCount(1); + + client.TestSync.WaitForAndIncrement(2); + } + + Y_UNIT_TEST(GetStatus) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleClient client; + // make sure connected + client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); + + server.Bus->GetStatus(); + server.Bus->GetStatus(); + server.Bus->GetStatus(); + + client.Bus->GetStatus(); + client.Bus->GetStatus(); + client.Bus->GetStatus(); + } + + Y_UNIT_TEST(BindOnRandomPort) { + TObjectCountCheck objectCountCheck; + + TBusServerSessionConfig serverConfig; + TExampleServer server; + + TExampleClient client; + TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort())); + client.SendMessagesWaitReplies(3, &addr); + } + + Y_UNIT_TEST(UnbindOnShutdown) { + TBusMessageQueuePtr queue(CreateMessageQueue()); + + TExampleProtocol proto; + TBusServerHandlerError handler; + TBusServerSessionPtr session = TBusServerSession::Create( + &proto, &handler, TBusServerSessionConfig(), queue); + + unsigned port = session->GetActualListenPort(); + UNIT_ASSERT(port > 0); + + session->Shutdown(); + + // fails is Shutdown() didn't unbind + THangingServer hangingServer(port); + } + + Y_UNIT_TEST(VersionNegotiation) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort()); + + TInetStreamSocket socket; + int r1 = socket.Connect(&addr); + UNIT_ASSERT(r1 >= 0); + + TStreamSocketOutput output(&socket); + + TBusHeader request; + Zero(request); + request.Size = sizeof(request); + request.SetVersionInternal(0xF); // max + output.Write(&request, sizeof(request)); + + UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true); + + TStreamSocketInput input(&socket); + + TBusHeader response; + size_t pos = 0; + + while (pos < sizeof(response)) { + size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos); + pos += count; + } + + UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos); + + UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal()); + } + + struct TOnConnectionEventClient: public TExampleClient { + TTestSync Sync; + + ~TOnConnectionEventClient() override { + Session->Shutdown(); + } + + void OnClientConnectionEvent(const TClientConnectionEvent& event) override { + if (Sync.Get() > 2) { + // Test OnClientConnectionEvent_Disconnect is broken. + // Sometimes reconnect happens during server shutdown + // when acceptor connections is still alive, and + // server connection is already closed + return; + } + + if (event.GetType() == TClientConnectionEvent::CONNECTED) { + Sync.WaitForAndIncrement(0); + } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) { + Sync.WaitForAndIncrement(2); + } + } + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { + // We do not check for message errors in this test. + } + + void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { + } + }; + + struct TOnConnectionEventServer: public TExampleServer { + TOnConnectionEventServer() + : TExampleServer("TOnConnectionEventServer") + { + } + + ~TOnConnectionEventServer() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { + // We do not check for server message errors in this test. + } + }; + + Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) { + TObjectCountCheck objectCountCheck; + + TOnConnectionEventServer server; + + TOnConnectionEventClient client; + + TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort()); + + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); + + client.Sync.WaitForAndIncrement(1); + + client.Session->Shutdown(); + + client.Sync.WaitForAndIncrement(3); + } + + Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) { + TObjectCountCheck objectCountCheck; + + THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); + + TOnConnectionEventClient client; + TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort()); + + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); + + client.Sync.WaitForAndIncrement(1); + + server.Destroy(); + + client.Sync.WaitForAndIncrement(3); + } + + struct TServerForQuotaWake: public TExampleServer { + TSystemEvent GoOn; + TMutex OneLock; + + TOnMessageContext OneMessage; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + + config.PerConnectionMaxInFlight = 1; + config.PerConnectionMaxInFlightBySize = 1500; + config.MaxMessageSize = 1024; + + return config; + } + + TServerForQuotaWake() + : TExampleServer("TServerForQuotaWake", Config()) + { + } + + ~TServerForQuotaWake() override { + Session->Shutdown(); + } + + void OnMessage(TOnMessageContext& req) override { + if (!GoOn.Wait(0)) { + TGuard<TMutex> guard(OneLock); + + UNIT_ASSERT(!OneMessage); + + OneMessage.Swap(req); + } else + TExampleServer::OnMessage(req); + } + + void WakeOne() { + TGuard<TMutex> guard(OneLock); + + UNIT_ASSERT(!!OneMessage); + + TExampleServer::OnMessage(OneMessage); + + TOnMessageContext().Swap(OneMessage); + } + }; + + Y_UNIT_TEST(WakeReaderOnQuota) { + const size_t test_msg_count = 64; + + TBusClientSessionConfig clientConfig; + + clientConfig.MaxInFlight = test_msg_count; + + TExampleClient client(clientConfig); + TServerForQuotaWake server; + TInstant start; + + client.MessageCount = test_msg_count; + + const NBus::TNetAddr addr = server.GetActualListenAddr(); + + for (unsigned count = 0;;) { + UNIT_ASSERT(count <= test_msg_count); + + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); + + if (status == MESSAGE_OK) { + count++; + + } else if (status == MESSAGE_BUSY) { + if (count == test_msg_count) { + TInstant now = TInstant::Now(); + + if (start.GetValue() == 0) { + start = now; + + // TODO: properly check that server is blocked + } else if (start + TDuration::MilliSeconds(100) < now) { + break; + } + } + + Sleep(TDuration::MilliSeconds(10)); + + } else + UNIT_ASSERT(false); + } + + server.GoOn.Signal(); + server.WakeOne(); + + client.WaitReplies(); + + server.WaitForOnMessageCount(test_msg_count); + }; + + Y_UNIT_TEST(TestConnectionAttempts) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + TBusClientSessionConfig clientConfig; + clientConfig.RetryInterval = 100; + TestNoServerImplClient client(clientConfig); + + int count = 0; + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + } + Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval)); + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4); + } + }; + + Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + TBusClientSessionConfig clientConfig; + clientConfig.RetryInterval = 100; + clientConfig.ReconnectWhenIdle = false; + TestNoServerImplClient client(clientConfig); + + int count = 0; + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + } + + Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2)); + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval)); + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + }; + + Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + TBusClientSessionConfig clientConfig; + clientConfig.ReconnectWhenIdle = true; + clientConfig.RetryInterval = 100; + TestNoServerImplClient client(clientConfig); + + int count = 0; + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + } + + Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2)); + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval)); + // it is undeterministic how many reconnects will be during that amount of time + // but it should occur at least once + UNIT_ASSERT(client.Session->GetConnectSyscallsNumForTest(noServerAddr) > 2); + }; +}; diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp new file mode 100644 index 0000000000..4083cf3b7b --- /dev/null +++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp @@ -0,0 +1,143 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/message_handler_error.h> + +#include <library/cpp/messagebus/misc/test_sync.h> +#include <library/cpp/messagebus/oldmodule/module.h> + +using namespace NBus; +using namespace NBus::NTest; + +Y_UNIT_TEST_SUITE(ModuleClientOneWay) { + struct TTestServer: public TBusServerHandlerError { + TExampleProtocol Proto; + + TTestSync* const TestSync; + + TBusMessageQueuePtr Queue; + TBusServerSessionPtr ServerSession; + + TTestServer(TTestSync* testSync) + : TestSync(testSync) + { + Queue = CreateMessageQueue(); + ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue); + } + + void OnMessage(TOnMessageContext& context) override { + TestSync->WaitForAndIncrement(1); + context.ForgetRequest(); + } + }; + + struct TClientModule: public TBusModule { + TExampleProtocol Proto; + + TTestSync* const TestSync; + unsigned const Port; + + TBusClientSessionPtr ClientSession; + + TClientModule(TTestSync* testSync, unsigned port) + : TBusModule("m") + , TestSync(testSync) + , Port(port) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage*) override { + TestSync->WaitForAndIncrement(0); + + job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port)); + + return &TClientModule::Sent; + } + + TJobHandler Sent(TBusJob* job, TBusMessage*) { + TestSync->WaitForAndIncrement(2); + job->Cancel(MESSAGE_DONT_ASK); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); + return nullptr; + } + }; + + Y_UNIT_TEST(Simple) { + TTestSync testSync; + + TTestServer server(&testSync); + + TBusMessageQueuePtr queue = CreateMessageQueue(); + TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort()); + + clientModule.CreatePrivateSessions(queue.Get()); + clientModule.StartInput(); + + clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); + + testSync.WaitForAndIncrement(3); + + clientModule.Shutdown(); + } + + struct TSendErrorModule: public TBusModule { + TExampleProtocol Proto; + + TTestSync* const TestSync; + + TBusClientSessionPtr ClientSession; + + TSendErrorModule(TTestSync* testSync) + : TBusModule("m") + , TestSync(testSync) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage*) override { + TestSync->WaitForAndIncrement(0); + + job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1)); + + return &TSendErrorModule::Sent; + } + + TJobHandler Sent(TBusJob* job, TBusMessage*) { + TestSync->WaitForAndIncrement(1); + job->Cancel(MESSAGE_DONT_ASK); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusServerSessionConfig sessionConfig; + sessionConfig.ConnectTimeout = 1; + sessionConfig.SendTimeout = 1; + sessionConfig.TotalTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig); + return nullptr; + } + }; + + Y_UNIT_TEST(SendError) { + TTestSync testSync; + + TBusQueueConfig queueConfig; + queueConfig.NumWorkers = 5; + + TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig); + TSendErrorModule clientModule(&testSync); + + clientModule.CreatePrivateSessions(queue.Get()); + clientModule.StartInput(); + + clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); + + testSync.WaitForAndIncrement(2); + + clientModule.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp new file mode 100644 index 0000000000..ebfe185cc6 --- /dev/null +++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp @@ -0,0 +1,368 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "count_down_latch.h" +#include "moduletest.h" + +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/example_module.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/misc/test_sync.h> +#include <library/cpp/messagebus/oldmodule/module.h> + +#include <util/generic/cast.h> +#include <util/system/event.h> + +using namespace NBus; +using namespace NBus::NTest; + +// helper class that cleans TBusJob instance, so job's destructor can +// be completed without assertion fail. +struct TJobGuard { +public: + TJobGuard(NBus::TBusJob* job) + : Job(job) + { + } + + ~TJobGuard() { + Job->ClearAllMessageStates(); + } + +private: + NBus::TBusJob* Job; +}; + +class TMessageOk: public NBus::TBusMessage { +public: + TMessageOk() + : NBus::TBusMessage(1) + { + } +}; + +class TMessageError: public NBus::TBusMessage { +public: + TMessageError() + : NBus::TBusMessage(2) + { + } +}; + +Y_UNIT_TEST_SUITE(BusJobTest) { +#if 0 + Y_UNIT_TEST(TestPending) { + TObjectCountCheck objectCountCheck; + + TDupDetectModule module; + TBusJob job(&module, new TBusMessage(0)); + // Guard will clear the job if unit-assertion fails. + TJobGuard g(&job); + + NBus::TBusMessage* msg = new NBus::TBusMessage(1); + job.Send(msg, NULL); + NBus::TJobStateVec pending; + job.GetPending(&pending); + + UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u); + UNIT_ASSERT_EQUAL(msg, pending[0].Message); + } + + Y_UNIT_TEST(TestCallReplyHandler) { + TObjectCountCheck objectCountCheck; + + TDupDetectModule module; + NBus::TBusJob job(&module, new NBus::TBusMessage(0)); + // Guard will clear the job if unit-assertion fails. + TJobGuard g(&job); + + NBus::TBusMessage* msgOk = new TMessageOk; + NBus::TBusMessage* msgError = new TMessageError; + job.Send(msgOk, NULL); + job.Send(msgError, NULL); + + UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL); + UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL); + + NBus::TBusMessage* reply = new NBus::TBusMessage(0); + job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply); + job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL); + + UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL); + UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL); + + UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT); + UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT); + + UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK); + UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply); + } +#endif + + struct TParallelOnReplyModule : TExampleClientModule { + TNetAddr ServerAddr; + + TCountDownLatch RepliesLatch; + + TParallelOnReplyModule(const TNetAddr& serverAddr) + : ServerAddr(serverAddr) + , RepliesLatch(2) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr); + return &TParallelOnReplyModule::HandleReplies; + } + + void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + Y_UNUSED(mess); + Y_UNUSED(reply); + Y_VERIFY(status == MESSAGE_OK, "failed to get reply: %s", ToCString(status)); + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + RepliesLatch.CountDown(); + Y_VERIFY(RepliesLatch.Await(TDuration::Seconds(10)), "failed to get answers"); + job->Cancel(MESSAGE_UNKNOWN); + return nullptr; + } + }; + + Y_UNIT_TEST(TestReplyHandlerCalledInParallel) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleProtocol proto; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TParallelOnReplyModule module(server.GetActualListenAddr()); + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + module.StartJob(new TExampleRequest(&proto.StartCount)); + + UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10))); + + module.Shutdown(); + } + + struct TErrorHandlerCheckerModule : TExampleModule { + TNetAddr ServerAddr; + + TBusClientSessionPtr Source; + + TCountDownLatch GotReplyLatch; + + TBusMessage* SentMessage; + + TErrorHandlerCheckerModule() + : ServerAddr("localhost", 17) + , GotReplyLatch(2) + , SentMessage() + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + TExampleRequest* message = new TExampleRequest(&Proto.RequestCount); + job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr); + SentMessage = message; + return &TErrorHandlerCheckerModule::HandleReplies; + } + + void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) { + Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got wrong status: %s", ToString(status).data()); + Y_VERIFY(req == SentMessage, "checking request"); + Y_VERIFY(resp == nullptr, "checking response"); + GotReplyLatch.CountDown(); + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + job->Cancel(MESSAGE_UNKNOWN); + GotReplyLatch.CountDown(); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; // TODO: allow 0 + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); + Source = CreateDefaultSource(queue, &Proto, sessionConfig); + Source->RegisterService("localhost"); + return nullptr; + } + }; + + Y_UNIT_TEST(ErrorHandler) { + TExampleProtocol proto; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TErrorHandlerCheckerModule module; + + TBusModuleConfig moduleConfig; + moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); + module.SetConfig(moduleConfig); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.GotReplyLatch.Await(); + + module.Shutdown(); + } + + struct TSlowReplyServer: public TBusServerHandlerError { + TTestSync* const TestSync; + TBusMessageQueuePtr Bus; + TBusServerSessionPtr ServerSession; + TExampleProtocol Proto; + + TAtomic OnMessageCount; + + TSlowReplyServer(TTestSync* testSync) + : TestSync(testSync) + , OnMessageCount(0) + { + Bus = CreateMessageQueue("TSlowReplyServer"); + TBusServerSessionConfig sessionConfig; + ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + } + + void OnMessage(TOnMessageContext& req) override { + if (AtomicIncrement(OnMessageCount) == 1) { + TestSync->WaitForAndIncrement(0); + } + TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount)); + req.SendReplyMove(response); + } + }; + + struct TModuleThatSendsReplyEarly: public TExampleClientModule { + TTestSync* const TestSync; + const unsigned ServerPort; + + TBusServerSessionPtr ServerSession; + TAtomic ReplyCount; + + TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort) + : TestSync(testSync) + , ServerPort(serverPort) + , ServerSession(nullptr) + , ReplyCount(0) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + for (unsigned i = 0; i < 2; ++i) { + job->Send( + new TExampleRequest(&Proto.RequestCount), + Source, + TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler), + 0, + TNetAddr("127.0.0.1", ServerPort)); + } + return &TModuleThatSendsReplyEarly::HandleReplies; + } + + void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + Y_UNUSED(mess); + Y_UNUSED(reply); + Y_VERIFY(status == MESSAGE_OK, "failed to get reply"); + if (AtomicIncrement(ReplyCount) == 1) { + TestSync->WaitForAndIncrement(1); + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + } else { + TestSync->WaitForAndIncrement(3); + } + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + job->Cancel(MESSAGE_UNKNOWN); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TExampleClientModule::CreateExtSession(queue); + TBusServerSessionConfig sessionConfig; + return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig); + } + }; + + Y_UNIT_TEST(SendReplyCalledBeforeAllRepliesReceived) { + TTestSync testSync; + + TSlowReplyServer slowReplyServer(&testSync); + + TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort()); + module.StartModule(); + + TExampleClient client; + TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort()); + client.SendMessagesWaitReplies(1, &addr); + + testSync.WaitForAndIncrement(2); + + module.Shutdown(); + } + + struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule { + unsigned ServerPort; + + TTestSync TestSync; + + TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort) + : ServerPort(serverPort) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage*) override { + TestSync.CheckAndIncrement(0); + + job->Send(new TExampleRequest(&Proto.RequestCount), Source, + TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply), + 0, TNetAddr("localhost", ServerPort)); + return &TShutdownCalledBeforeReplyReceivedModule::End; + } + + void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) { + Y_VERIFY(status == MESSAGE_SHUTDOWN, "got %s", ToCString(status)); + TestSync.CheckAndIncrement(1); + } + + TJobHandler End(TBusJob* job, TBusMessage*) { + TestSync.CheckAndIncrement(2); + job->Cancel(MESSAGE_SHUTDOWN); + return nullptr; + } + }; + + Y_UNIT_TEST(ShutdownCalledBeforeReplyReceived) { + TExampleServer server; + server.ForgetRequest = true; + + TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort()); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&module.Proto.RequestCount)); + + server.TestSync.WaitFor(1); + + module.Shutdown(); + + module.TestSync.CheckAndIncrement(3); + } +} diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp new file mode 100644 index 0000000000..88fe1dd9b6 --- /dev/null +++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp @@ -0,0 +1,119 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "count_down_latch.h" +#include "moduletest.h" + +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/example_module.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/oldmodule/module.h> + +#include <util/generic/cast.h> + +using namespace NBus; +using namespace NBus::NTest; + +Y_UNIT_TEST_SUITE(ModuleServerTests) { + Y_UNIT_TEST(TestModule) { + TObjectCountCheck objectCountCheck; + + /// create or get instance of message queue, need one per application + TBusMessageQueuePtr bus(CreateMessageQueue()); + THostInfoHandler hostHandler(bus.Get()); + TDupDetectModule module(hostHandler.GetActualListenAddr()); + bool success; + success = module.Init(bus.Get()); + UNIT_ASSERT_C(success, "failed to initialize dupdetect module"); + + success = module.StartInput(); + UNIT_ASSERT_C(success, "failed to start dupdetect module"); + + TDupDetectHandler dupHandler(module.ListenAddr, bus.Get()); + dupHandler.Work(); + + UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies); + + module.Shutdown(); + dupHandler.DupDetect->Shutdown(); + } + + struct TParallelOnMessageModule: public TExampleServerModule { + TCountDownLatch WaitTwoRequestsLatch; + + TParallelOnMessageModule() + : WaitTwoRequestsLatch(2) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + WaitTwoRequestsLatch.CountDown(); + Y_VERIFY(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops"); + + VerifyDynamicCast<TExampleRequest*>(mess); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + return nullptr; + } + }; + + Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) { + TObjectCountCheck objectCountCheck; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TParallelOnMessageModule module; + module.StartModule(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, module.ServerAddr); + + module.Shutdown(); + } + + struct TDelayReplyServer: public TExampleServerModule { + TSystemEvent MessageReceivedEvent; + TSystemEvent ClientDiedEvent; + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + + MessageReceivedEvent.Signal(); + + Y_VERIFY(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops"); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + return nullptr; + } + }; + + Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { + TObjectCountCheck objectCountCheck; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TDelayReplyServer server; + server.StartModule(); + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessages(1, server.ServerAddr); + + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + + UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight()); + + client.Destroy(); + + server.ClientDiedEvent.Signal(); + + // wait until all server message are delivered + UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight()); + + server.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h new file mode 100644 index 0000000000..d5da72c0cb --- /dev/null +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -0,0 +1,221 @@ +#pragma once + +/////////////////////////////////////////////////////////////////// +/// \file +/// \brief Example of using local session for communication. + +#include <library/cpp/messagebus/test/helper/alloc_counter.h> +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/message_handler_error.h> + +#include <library/cpp/messagebus/ybus.h> +#include <library/cpp/messagebus/oldmodule/module.h> + +namespace NBus { + namespace NTest { + using namespace std; + +#define TYPE_HOSTINFOREQUEST 100 +#define TYPE_HOSTINFORESPONSE 101 + + //////////////////////////////////////////////////////////////////// + /// \brief DupDetect protocol that common between client and server + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo request class + class THostInfoMessage: public TBusMessage { + public: + THostInfoMessage() + : TBusMessage(TYPE_HOSTINFOREQUEST) + { + } + THostInfoMessage(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoMessage() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo reply class + class THostInfoReply: public TBusMessage { + public: + THostInfoReply() + : TBusMessage(TYPE_HOSTINFORESPONSE) + { + } + THostInfoReply(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoReply() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo protocol that common between client and server + class THostInfoProtocol: public TBusProtocol { + public: + THostInfoProtocol() + : TBusProtocol("HOSTINFO", 0) + { + } + /// serialized protocol specific data into TBusData + void Serialize(const TBusMessage* mess, TBuffer& data) override { + Y_UNUSED(data); + Y_UNUSED(mess); + } + + /// deserialized TBusData into new instance of the message + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { + Y_UNUSED(payload); + + if (messageType == TYPE_HOSTINFOREQUEST) { + return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); + } else if (messageType == TYPE_HOSTINFORESPONSE) { + return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED); + } else { + Y_FAIL("unknown"); + } + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief HostInfo handler (should convert it to module too) + struct THostInfoHandler: public TBusServerHandlerError { + TBusServerSessionPtr Session; + TBusServerSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + THostInfoHandler(TBusMessageQueue* queue) { + Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); + } + + void OnMessage(TOnMessageContext& mess) override { + usleep(10 * 1000); /// pretend we are doing something + + TAutoPtr<THostInfoReply> reply(new THostInfoReply()); + + mess.SendReplyMove(reply); + } + + TNetAddr GetActualListenAddr() { + return TNetAddr("localhost", Session->GetActualListenPort()); + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief DupDetect handler (should convert it to module too) + struct TDupDetectHandler: public TBusClientHandlerError { + TNetAddr ServerAddr; + + TBusClientSessionPtr DupDetect; + TBusClientSessionConfig DupDetectConfig; + TExampleProtocol DupDetectProto; + + int NumMessages; + int NumReplies; + + TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) + : ServerAddr(serverAddr) + { + DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); + DupDetect->RegisterService("localhost"); + } + + void Work() { + NumMessages = 10; + NumReplies = 0; + + for (int i = 0; i < NumMessages; i++) { + TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); + DupDetect->SendMessage(mess, &ServerAddr); + } + } + + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { + Y_UNUSED(mess); + Y_UNUSED(reply); + NumReplies++; + } + }; + + ///////////////////////////////////////////////////////////////// + /// \brief DupDetect module + + struct TDupDetectModule: public TBusModule { + TNetAddr HostInfoAddr; + + TBusClientSessionPtr HostInfoClientSession; + TBusClientSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + TExampleProtocol DupDetectProto; + TBusServerSessionConfig DupDetectConfig; + + TNetAddr ListenAddr; + + TDupDetectModule(const TNetAddr& hostInfoAddr) + : TBusModule("DUPDETECTMODULE") + , HostInfoAddr(hostInfoAddr) + { + } + + bool Init(TBusMessageQueue* queue) { + HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); + HostInfoClientSession->RegisterService("localhost"); + + return TBusModule::CreatePrivateSessions(queue); + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); + + ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); + + return session; + } + + /// entry point into module, first function to call + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = new THostInfoMessage(); + + /// send message to imaginary hostinfo server + job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); + + return TJobHandler(&TDupDetectModule::ProcessHostInfo); + } + + /// next handler is executed when all outstanding requests from previous handler is completed + TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = job->Get<THostInfoMessage>(); + THostInfoReply* hreply = job->Get<THostInfoReply>(); + EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); + Y_ASSERT(hmess != nullptr); + Y_ASSERT(hreply != nullptr); + Y_ASSERT(hstatus == MESSAGE_OK); + + return TJobHandler(&TDupDetectModule::Finish); + } + + /// last handler sends reply and returns NULL + TJobHandler Finish(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + + TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); + job->SendReply(reply); + + return nullptr; + } + }; + + } +} diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp new file mode 100644 index 0000000000..9c21227e2b --- /dev/null +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -0,0 +1,255 @@ +/////////////////////////////////////////////////////////////////// +/// \file +/// \brief Example of reply-less communication + +/// This example demostrates how asynchronous message passing library +/// can be used to send message and do not wait for reply back. +/// The usage of reply-less communication should be restricted to +/// low-throughput clients and high-throughput server to provide reasonable +/// utility. Removing replies from the communication removes any restriction +/// on how many message can be send to server and rougue clients may overwelm +/// server without thoughtput control. + +/// 1) To implement reply-less client \n + +/// Call NBus::TBusSession::AckMessage() +/// from within NBus::IMessageHandler::OnSent() handler when message has +/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). +/// Discard identity for reply message. + +/// 2) To implement reply-less server \n + +/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() +/// handler when message has been received on server end. +/// See example in NBus::NullServer::OnMessage(). +/// Discard identity for reply message. + +#include <library/cpp/messagebus/test/helper/alloc_counter.h> +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/hanging_server.h> +#include <library/cpp/messagebus/test/helper/message_handler_error.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/ybus.h> + +using namespace std; +using namespace NBus; +using namespace NBus::NPrivate; +using namespace NBus::NTest; + +//////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////// +/// \brief Reply-less client and handler +struct NullClient : TBusClientHandlerError { + TNetAddr ServerAddr; + + TBusMessageQueuePtr Queue; + TBusClientSessionPtr Session; + TExampleProtocol Proto; + + /// constructor creates instances of protocol and session + NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) + : ServerAddr(serverAddr) + { + UNIT_ASSERT(serverAddr.GetPort() > 0); + + /// create or get instance of message queue, need one per application + Queue = CreateMessageQueue(); + + /// register source/client session + Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); + + /// register service, announce to clients via LocatorService + Session->RegisterService("localhost"); + } + + ~NullClient() override { + Session->Shutdown(); + } + + /// dispatch of requests is done here + void Work() { + int batch = 10; + + for (int i = 0; i < batch; i++) { + TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); + mess->Data = "TADA"; + Session->SendMessageOneWay(mess, &ServerAddr); + } + } + + void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { + } +}; + +///////////////////////////////////////////////////////////////////// +/// \brief Reply-less server and handler +class NullServer: public TBusServerHandlerError { +public: + /// session object to maintian + TBusMessageQueuePtr Queue; + TBusServerSessionPtr Session; + TExampleProtocol Proto; + +public: + TAtomic NumMessages; + + NullServer() { + NumMessages = 0; + + /// create or get instance of single message queue, need one for application + Queue = CreateMessageQueue(); + + /// register destination session + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); + } + + ~NullServer() override { + Session->Shutdown(); + } + + /// when message comes do not send reply, just acknowledge + void OnMessage(TOnMessageContext& mess) override { + TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); + + Y_ASSERT(fmess->Data == "TADA"); + + /// tell session to forget this message and never expect any reply + mess.ForgetRequest(); + + AtomicIncrement(NumMessages); + } + + /// this handler should not be called because this server does not send replies + void OnSent(TAutoPtr<TBusMessage> mess) override { + Y_UNUSED(mess); + Y_FAIL("This server does not sent replies"); + } +}; + +Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { + Y_UNIT_TEST(Simple) { + TObjectCountCheck objectCountCheck; + + NullServer server; + NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort())); + + client.Work(); + + // wait until all client message are delivered + UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10); + + // assert correct number of messages + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10); + UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); + } + + struct TMessageTooLargeClient: public NullClient { + TSystemEvent GotTooLarge; + + TBusClientSessionConfig Config() { + TBusClientSessionConfig r; + r.MaxMessageSize = 1; + return r; + } + + TMessageTooLargeClient(unsigned port) + : NullClient(TNetAddr("localhost", port), Config()) + { + } + + ~TMessageTooLargeClient() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status)); + + GotTooLarge.Signal(); + } + }; + + Y_UNIT_TEST(MessageTooLargeOnClient) { + TObjectCountCheck objectCountCheck; + + NullServer server; + + TMessageTooLargeClient client(server.Session->GetActualListenPort()); + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.GotTooLarge.WaitI(); + } + + struct TCheckTimeoutClient: public NullClient { + ~TCheckTimeoutClient() override { + Session->Shutdown(); + } + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; + sessionConfig.ConnectTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); + return sessionConfig; + } + + TCheckTimeoutClient(const TNetAddr& serverAddr) + : NullClient(serverAddr, SessionConfig()) + { + } + + TSystemEvent GotError; + + /// message that could not be delivered + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + Y_UNUSED(status); // TODO: check status + + GotError.Signal(); + } + }; + + Y_UNIT_TEST(SendTimeout_Callback_NoServer) { + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", 17)); + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_EQUAL(ok, MESSAGE_OK); + + client.GotError.WaitI(); + } + + Y_UNIT_TEST(SendTimeout_Callback_HangingServer) { + THangingServer server; + + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort())); + + bool first = true; + for (;;) { + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + if (ok == MESSAGE_BUSY) { + UNIT_ASSERT(!first); + break; + } + UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK); + first = false; + } + + // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages. + // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get + // serailized and written to the socket buffer, so the write queue gets drained and there are + // no messages to timeout when periodic timeout check happens. + + client.GotError.WaitI(); + } +} diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp new file mode 100644 index 0000000000..dd4d3aaa5e --- /dev/null +++ b/library/cpp/messagebus/test/ut/starter_ut.cpp @@ -0,0 +1,140 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/test/helper/example_module.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +using namespace NBus; +using namespace NBus::NTest; + +Y_UNIT_TEST_SUITE(TBusStarterTest) { + struct TStartJobTestModule: public TExampleModule { + using TBusModule::CreateDefaultStarter; + + TAtomic StartCount; + + TStartJobTestModule() + : StartCount(0) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + AtomicIncrement(StartCount); + job->Sleep(10); + return &TStartJobTestModule::End; + } + + TJobHandler End(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + AtomicIncrement(StartCount); + job->Cancel(MESSAGE_UNKNOWN); + return nullptr; + } + }; + + Y_UNIT_TEST(Test) { + TObjectCountCheck objectCountCheck; + + TBusMessageQueuePtr bus(CreateMessageQueue()); + + TStartJobTestModule module; + + //module.StartModule(); + module.CreatePrivateSessions(bus.Get()); + module.StartInput(); + + TBusSessionConfig config; + config.SendTimeout = 10; + + module.CreateDefaultStarter(*bus, config); + + UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3); + + module.Shutdown(); + bus->Stop(); + } + + Y_UNIT_TEST(TestModuleStartJob) { + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TStartJobTestModule module; + + TBusModuleConfig moduleConfig; + moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); + module.SetConfig(moduleConfig); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.RequestCount)); + + UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2); + + module.Shutdown(); + } + + struct TSleepModule: public TExampleServerModule { + TSystemEvent MessageReceivedEvent; + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + + MessageReceivedEvent.Signal(); + + job->Sleep(1000000000); + + return TJobHandler(&TSleepModule::Never); + } + + TJobHandler Never(TBusJob*, TBusMessage*) { + Y_FAIL("happens"); + throw 1; + } + }; + + Y_UNIT_TEST(StartJobDestroyDuringSleep) { + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TSleepModule module; + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.MessageReceivedEvent.WaitI(); + + module.Shutdown(); + } + + struct TSendReplyModule: public TExampleServerModule { + TSystemEvent MessageReceivedEvent; + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + + MessageReceivedEvent.Signal(); + + return nullptr; + } + }; + + Y_UNIT_TEST(AllowSendReplyInStarted) { + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TSendReplyModule module; + module.StartModule(); + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.MessageReceivedEvent.WaitI(); + + module.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp new file mode 100644 index 0000000000..400128193f --- /dev/null +++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp @@ -0,0 +1,69 @@ +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> + +namespace NBus { + namespace NTest { + using namespace std; + + //////////////////////////////////////////////////////////////////// + /// \brief Client for sending synchronous message to local server + struct TSyncClient { + TNetAddr ServerAddr; + + TExampleProtocol Proto; + TBusMessageQueuePtr Bus; + TBusSyncClientSessionPtr Session; + + int NumReplies; + int NumMessages; + + /// constructor creates instances of queue, protocol and session + TSyncClient(const TNetAddr& serverAddr) + : ServerAddr(serverAddr) + { + /// create or get instance of message queue, need one per application + Bus = CreateMessageQueue(); + + NumReplies = 0; + NumMessages = 10; + + /// register source/client session + TBusClientSessionConfig sessionConfig; + Session = Bus->CreateSyncSource(&Proto, sessionConfig); + Session->RegisterService("localhost"); + } + + ~TSyncClient() { + Session->Shutdown(); + } + + /// dispatch of requests is done here + void Work() { + for (int i = 0; i < NumMessages; i++) { + THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount)); + EMessageStatus status; + THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr)); + if (!!reply) { + NumReplies++; + } + } + } + }; + + Y_UNIT_TEST_SUITE(SyncClientTest) { + Y_UNIT_TEST(TestSync) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TSyncClient client(server.GetActualListenAddr()); + client.Work(); + // assert correct number of replies + UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages); + // assert that there is no message left in flight + UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); + } + } + + } +} diff --git a/library/cpp/messagebus/test/ut/ya.make b/library/cpp/messagebus/test/ut/ya.make new file mode 100644 index 0000000000..fe1b4961d6 --- /dev/null +++ b/library/cpp/messagebus/test/ut/ya.make @@ -0,0 +1,56 @@ +OWNER(g:messagebus) + +UNITTEST_FOR(library/cpp/messagebus) + +TIMEOUT(1200) + +SIZE(LARGE) + +TAG( + ya:not_autocheck + ya:fat +) + +FORK_SUBTESTS() + +PEERDIR( + library/cpp/testing/unittest_main + library/cpp/messagebus + library/cpp/messagebus/test/helper + library/cpp/messagebus/www +) + +SRCS( + messagebus_ut.cpp + module_client_ut.cpp + module_client_one_way_ut.cpp + module_server_ut.cpp + one_way_ut.cpp + starter_ut.cpp + sync_client_ut.cpp + locator_uniq_ut.cpp + ../../actor/actor_ut.cpp + ../../actor/ring_buffer_ut.cpp + ../../actor/tasks_ut.cpp + ../../actor/what_thread_does_guard_ut.cpp + ../../async_result_ut.cpp + ../../cc_semaphore_ut.cpp + ../../coreconn_ut.cpp + ../../duration_histogram_ut.cpp + ../../message_status_counter_ut.cpp + ../../misc/weak_ptr_ut.cpp + ../../latch_ut.cpp + ../../lfqueue_batch_ut.cpp + ../../local_flags_ut.cpp + ../../memory_ut.cpp + ../../moved_ut.cpp + ../../netaddr_ut.cpp + ../../network_ut.cpp + ../../nondestroying_holder_ut.cpp + ../../scheduler_actor_ut.cpp + ../../scheduler/scheduler_ut.cpp + ../../socket_addr_ut.cpp + ../../vector_swaps_ut.cpp +) + +END() |