aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/ut
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/test/ut
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/test/ut')
-rw-r--r--library/cpp/messagebus/test/ut/count_down_latch.h30
-rw-r--r--library/cpp/messagebus/test/ut/locator_uniq_ut.cpp40
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp1151
-rw-r--r--library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp143
-rw-r--r--library/cpp/messagebus/test/ut/module_client_ut.cpp368
-rw-r--r--library/cpp/messagebus/test/ut/module_server_ut.cpp119
-rw-r--r--library/cpp/messagebus/test/ut/moduletest.h221
-rw-r--r--library/cpp/messagebus/test/ut/one_way_ut.cpp255
-rw-r--r--library/cpp/messagebus/test/ut/starter_ut.cpp140
-rw-r--r--library/cpp/messagebus/test/ut/sync_client_ut.cpp69
-rw-r--r--library/cpp/messagebus/test/ut/ya.make56
11 files changed, 2592 insertions, 0 deletions
diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h
new file mode 100644
index 0000000000..5117db5731
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/count_down_latch.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <util/system/atomic.h>
+#include <util/system/event.h>
+
+class TCountDownLatch {
+private:
+ TAtomic Current;
+ TSystemEvent EventObject;
+
+public:
+ TCountDownLatch(unsigned initial)
+ : Current(initial)
+ {
+ }
+
+ void CountDown() {
+ if (AtomicDecrement(Current) == 0) {
+ EventObject.Signal();
+ }
+ }
+
+ void Await() {
+ EventObject.Wait();
+ }
+
+ bool Await(TDuration timeout) {
+ return EventObject.WaitT(timeout);
+ }
+};
diff --git a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp
new file mode 100644
index 0000000000..3fdd175d73
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp
@@ -0,0 +1,40 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/messagebus/test_utils.h>
+#include <library/cpp/messagebus/ybus.h>
+
+class TLocatorRegisterUniqTest: public TTestBase {
+ UNIT_TEST_SUITE(TLocatorRegisterUniqTest);
+ UNIT_TEST(TestRegister);
+ UNIT_TEST_SUITE_END();
+
+protected:
+ void TestRegister();
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TLocatorRegisterUniqTest);
+
+void TLocatorRegisterUniqTest::TestRegister() {
+ ASSUME_IP_V4_ENABLED;
+
+ NBus::TBusLocator locator;
+ const char* serviceName = "TestService";
+ const char* hostName = "192.168.0.42";
+ int port = 31337;
+
+ NBus::TBusKeyVec keys;
+ locator.LocateKeys(serviceName, keys);
+ UNIT_ASSERT(keys.size() == 0);
+
+ locator.Register(serviceName, hostName, port);
+ locator.LocateKeys(serviceName, keys);
+ /// YBUS_KEYMIN YBUS_KEYMAX range
+ UNIT_ASSERT(keys.size() == 1);
+
+ TVector<NBus::TNetAddr> hosts;
+ UNIT_ASSERT(locator.LocateAll(serviceName, NBus::YBUS_KEYMIN, hosts) == 1);
+
+ locator.Register(serviceName, hostName, port);
+ hosts.clear();
+ UNIT_ASSERT(locator.LocateAll(serviceName, NBus::YBUS_KEYMIN, hosts) == 1);
+}
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
new file mode 100644
index 0000000000..040f9b7702
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -0,0 +1,1151 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/messagebus/test/helper/example.h>
+#include <library/cpp/messagebus/test/helper/fixed_port.h>
+#include <library/cpp/messagebus/test/helper/hanging_server.h>
+#include <library/cpp/messagebus/test/helper/object_count_check.h>
+#include <library/cpp/messagebus/test/helper/wait_for.h>
+
+#include <library/cpp/messagebus/misc/test_sync.h>
+
+#include <util/network/sock.h>
+
+#include <utility>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+namespace {
+ struct TExampleClientSlowOnMessageSent: public TExampleClient {
+ TAtomic SentCompleted;
+
+ TSystemEvent ReplyReceived;
+
+ TExampleClientSlowOnMessageSent()
+ : SentCompleted(0)
+ {
+ }
+
+ ~TExampleClientSlowOnMessageSent() override {
+ Session->Shutdown();
+ }
+
+ void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
+ Y_VERIFY(AtomicGet(SentCompleted), "must be completed");
+
+ TExampleClient::OnReply(mess, reply);
+
+ ReplyReceived.Signal();
+ }
+
+ void OnMessageSent(TBusMessage*) override {
+ Sleep(TDuration::MilliSeconds(100));
+ AtomicSet(SentCompleted, 1);
+ }
+ };
+
+}
+
+Y_UNIT_TEST_SUITE(TMessageBusTests) {
+ void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply,
+ const TBusServerSessionConfig& sessionConfig) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TExampleClient client(sessionConfig);
+ client.CrashOnError = true;
+
+ server.UseCompression = useCompression;
+ client.UseCompression = useCompression;
+
+ server.AckMessageBeforeSendReply = ackMessageBeforeReply;
+
+ client.SendMessagesWaitReplies(100, server.GetActualListenAddr());
+ UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
+ UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
+ }
+
+ Y_UNIT_TEST(TestDestination) {
+ TestDestinationTemplate(false, false, TBusServerSessionConfig());
+ }
+
+ Y_UNIT_TEST(TestDestinationUsingAck) {
+ TestDestinationTemplate(false, true, TBusServerSessionConfig());
+ }
+
+ Y_UNIT_TEST(TestDestinationWithCompression) {
+ TestDestinationTemplate(true, false, TBusServerSessionConfig());
+ }
+
+ Y_UNIT_TEST(TestCork) {
+ TBusServerSessionConfig config;
+ config.SendThreshold = 1000000000000;
+ config.Cork = TDuration::MilliSeconds(10);
+ TestDestinationTemplate(false, false, config);
+ // TODO: test for cork hanging
+ }
+
+ Y_UNIT_TEST(TestReconnect) {
+ if (!IsFixedPortTestAllowed()) {
+ return;
+ }
+
+ TObjectCountCheck objectCountCheck;
+
+ unsigned port = FixedPort;
+ TNetAddr serverAddr("localhost", port);
+ THolder<TExampleServer> server;
+
+ TBusClientSessionConfig clientConfig;
+ clientConfig.RetryInterval = 0;
+ TExampleClient client(clientConfig);
+
+ server.Reset(new TExampleServer(port, "TExampleServer 1"));
+
+ client.SendMessagesWaitReplies(17, serverAddr);
+
+ server.Destroy();
+
+ // Making the client to detect disconnection.
+ client.SendMessages(1, serverAddr);
+ EMessageStatus error = client.WaitForError();
+ if (error == MESSAGE_DELIVERY_FAILED) {
+ client.SendMessages(1, serverAddr);
+ error = client.WaitForError();
+ }
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error);
+
+ server.Reset(new TExampleServer(port, "TExampleServer 2"));
+
+ client.SendMessagesWaitReplies(19, serverAddr);
+ }
+
+ struct TestNoServerImplClient: public TExampleClient {
+ TTestSync TestSync;
+ int failures = 0;
+
+ template <typename... Args>
+ TestNoServerImplClient(Args&&... args)
+ : TExampleClient(std::forward<Args>(args)...)
+ {
+ }
+
+ ~TestNoServerImplClient() override {
+ Session->Shutdown();
+ }
+
+ void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
+ Y_UNUSED(message);
+
+ Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());
+
+ TestSync.CheckAndIncrement((failures++) * 2);
+ }
+ };
+
+ void TestNoServerImpl(unsigned port, bool oneWay) {
+ TNetAddr noServerAddr("localhost", port);
+
+ TestNoServerImplClient client;
+
+ int count = 0;
+ for (; count < 200; ++count) {
+ EMessageStatus status;
+ if (oneWay) {
+ status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
+ } else {
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
+ }
+
+ Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
+
+ if (count == 0) {
+ // lame way to wait until it is connected
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+ }
+
+ client.TestSync.WaitForAndIncrement(count * 2);
+ }
+
+ void HangingServerImpl(unsigned port) {
+ TNetAddr noServerAddr("localhost", port);
+
+ TExampleClient client;
+
+ int count = 0;
+ for (;; ++count) {
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
+ if (status == MESSAGE_BUSY) {
+ break;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status));
+
+ if (count == 0) {
+ // lame way to wait until it is connected
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count);
+ }
+
+ Y_UNIT_TEST(TestHangindServer) {
+ TObjectCountCheck objectCountCheck;
+
+ THangingServer server(0);
+
+ HangingServerImpl(server.GetPort());
+ }
+
+ Y_UNIT_TEST(TestNoServer) {
+ TObjectCountCheck objectCountCheck;
+
+ TestNoServerImpl(17, false);
+ }
+
+ Y_UNIT_TEST(PauseInput) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ server.Session->PauseInput(true);
+
+ TBusClientSessionConfig clientConfig;
+ clientConfig.MaxInFlight = 1000;
+ TExampleClient client(clientConfig);
+
+ client.SendMessages(100, server.GetActualListenAddr());
+
+ server.TestSync.Check(0);
+
+ server.Session->PauseInput(false);
+
+ server.TestSync.WaitFor(100);
+
+ client.WaitReplies();
+
+ server.Session->PauseInput(true);
+
+ client.SendMessages(200, server.GetActualListenAddr());
+
+ server.TestSync.Check(100);
+
+ server.Session->PauseInput(false);
+
+ server.TestSync.WaitFor(300);
+
+ client.WaitReplies();
+ }
+
+ struct TSendTimeoutCheckerExampleClient: public TExampleClient {
+ static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) {
+ TBusClientSessionConfig sessionConfig;
+ if (periodLessThanConnectTimeout) {
+ sessionConfig.SendTimeout = 1;
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50);
+ } else {
+ sessionConfig.SendTimeout = 50;
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
+ }
+ return sessionConfig;
+ }
+
+ TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout)
+ : TExampleClient(SessionConfig(periodLessThanConnectTimeout))
+ {
+ }
+
+ ~TSendTimeoutCheckerExampleClient() override {
+ Session->Shutdown();
+ }
+
+ TSystemEvent ErrorHappened;
+
+ void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
+ Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got status: %s", ToString(status).data());
+ ErrorHappened.Signal();
+ }
+ };
+
+ void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) {
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr serverAddr("localhost", 17);
+
+ TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout);
+
+ client.SendMessages(1, serverAddr);
+
+ client.ErrorHappened.WaitI();
+ }
+
+ Y_UNIT_TEST(NoServer_SendTimeout_Callback_PeriodLess) {
+ NoServer_SendTimeout_Callback_Impl(true);
+ }
+
+ Y_UNIT_TEST(NoServer_SendTimeout_Callback_TimeoutLess) {
+ NoServer_SendTimeout_Callback_Impl(false);
+ }
+
+ Y_UNIT_TEST(TestOnReplyCalledAfterOnMessageSent) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TNetAddr serverAddr = server.GetActualListenAddr();
+ TExampleClientSlowOnMessageSent client;
+
+ TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount));
+ EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr);
+ UNIT_ASSERT_EQUAL(s, MESSAGE_OK);
+
+ UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5)));
+ }
+
+ struct TDelayReplyServer: public TBusServerHandlerError {
+ TBusMessageQueuePtr Bus;
+ TExampleProtocol Proto;
+ TSystemEvent MessageReceivedEvent; // 1 wait for 1 message
+ TBusServerSessionPtr Session;
+ TMutex Lock_;
+ TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages;
+
+ TDelayReplyServer()
+ : MessageReceivedEvent(TEventResetType::rAuto)
+ {
+ Bus = CreateMessageQueue("TDelayReplyServer");
+ TBusServerSessionConfig sessionConfig;
+ sessionConfig.SendTimeout = 1000;
+ sessionConfig.TotalTimeout = 2001;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+ if (!Session) {
+ ythrow yexception() << "Failed to create destination session";
+ }
+ }
+
+ void OnMessage(TOnMessageContext& mess) override {
+ Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here");
+ TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext);
+ delayedMsg->Swap(mess);
+ auto g(Guard(Lock_));
+ DelayedMessages.push_back(delayedMsg);
+ MessageReceivedEvent.Signal();
+ }
+
+ bool CheckClientIsAlive() {
+ auto g(Guard(Lock_));
+ for (auto& delayedMessage : DelayedMessages) {
+ if (!delayedMessage->IsConnectionAlive()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ bool CheckClientIsDead() const {
+ auto g(Guard(Lock_));
+ for (const auto& delayedMessage : DelayedMessages) {
+ if (delayedMessage->IsConnectionAlive()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ void ReplyToDelayedMessages() {
+ while (true) {
+ TOnMessageContext msg;
+ {
+ auto g(Guard(Lock_));
+ if (DelayedMessages.empty()) {
+ break;
+ }
+ DelayedMessages.front()->Swap(msg);
+ DelayedMessages.pop_front();
+ }
+ TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount));
+ msg.SendReplyMove(reply);
+ }
+ }
+
+ size_t GetDelayedMessageCount() const {
+ auto g(Guard(Lock_));
+ return DelayedMessages.size();
+ }
+
+ void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
+ Y_UNUSED(mess);
+ Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed, got %s", ToString(status).data());
+ }
+ };
+
+ Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
+ TObjectCountCheck objectCountCheck;
+
+ TDelayReplyServer server;
+
+ THolder<TExampleClient> client(new TExampleClient);
+
+ client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
+
+ UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight());
+
+ client.Destroy();
+
+ UNIT_WAIT_FOR(server.CheckClientIsDead());
+
+ server.ReplyToDelayedMessages();
+
+ // wait until all server message are delivered
+ UNIT_WAIT_FOR(0 == server.Session->GetInFlight());
+ }
+
+ struct TPackUnpackServer: public TBusServerHandlerError {
+ TBusMessageQueuePtr Bus;
+ TExampleProtocol Proto;
+ TSystemEvent MessageReceivedEvent;
+ TSystemEvent ClientDiedEvent;
+ TBusServerSessionPtr Session;
+
+ TPackUnpackServer() {
+ Bus = CreateMessageQueue("TPackUnpackServer");
+ TBusServerSessionConfig sessionConfig;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+ }
+
+ void OnMessage(TOnMessageContext& mess) override {
+ TBusIdentity ident;
+ mess.AckMessage(ident);
+
+ char packed[BUS_IDENTITY_PACKED_SIZE];
+ ident.Pack(packed);
+ TBusIdentity resurrected;
+ resurrected.Unpack(packed);
+
+ mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount));
+ }
+
+ void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
+ Y_UNUSED(mess);
+ Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed");
+ }
+ };
+
+ Y_UNIT_TEST(PackUnpack) {
+ TObjectCountCheck objectCountCheck;
+
+ TPackUnpackServer server;
+
+ THolder<TExampleClient> client(new TExampleClient);
+
+ client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort()));
+ }
+
+ Y_UNIT_TEST(ClientRequestTooLarge) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TBusClientSessionConfig clientConfig;
+ clientConfig.MaxMessageSize = 100;
+ TExampleClient client(clientConfig);
+
+ client.DataSize = 10;
+ client.SendMessagesWaitReplies(1, server.GetActualListenAddr());
+
+ client.DataSize = 1000;
+ client.SendMessages(1, server.GetActualListenAddr());
+ client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
+
+ client.DataSize = 20;
+ client.SendMessagesWaitReplies(10, server.GetActualListenAddr());
+
+ client.DataSize = 10000;
+ client.SendMessages(1, server.GetActualListenAddr());
+ client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
+ }
+
+ struct TServerForResponseTooLarge: public TExampleServer {
+ TTestSync TestSync;
+
+ static TBusServerSessionConfig Config() {
+ TBusServerSessionConfig config;
+ config.MaxMessageSize = 100;
+ return config;
+ }
+
+ TServerForResponseTooLarge()
+ : TExampleServer("TServerForResponseTooLarge", Config())
+ {
+ }
+
+ ~TServerForResponseTooLarge() override {
+ Session->Shutdown();
+ }
+
+ void OnMessage(TOnMessageContext& mess) override {
+ TAutoPtr<TBusMessage> response;
+
+ if (TestSync.Get() == 0) {
+ TestSync.CheckAndIncrement(0);
+ response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000));
+ } else {
+ TestSync.WaitForAndIncrement(3);
+ response.Reset(new TExampleResponse(&Proto.ResponseCount, 10));
+ }
+
+ mess.SendReplyMove(response);
+ }
+
+ void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override {
+ TestSync.WaitForAndIncrement(1);
+
+ Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "status");
+ }
+ };
+
+ Y_UNIT_TEST(ServerResponseTooLarge) {
+ TObjectCountCheck objectCountCheck;
+
+ TServerForResponseTooLarge server;
+
+ TExampleClient client;
+ client.DataSize = 10;
+
+ client.SendMessages(1, server.GetActualListenAddr());
+ server.TestSync.WaitForAndIncrement(2);
+ client.ResetCounters();
+
+ client.SendMessages(1, server.GetActualListenAddr());
+
+ client.WorkDone.WaitI();
+
+ server.TestSync.CheckAndIncrement(4);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight());
+ }
+
+ struct TServerForRequestTooLarge: public TExampleServer {
+ TTestSync TestSync;
+
+ static TBusServerSessionConfig Config() {
+ TBusServerSessionConfig config;
+ config.MaxMessageSize = 100;
+ return config;
+ }
+
+ TServerForRequestTooLarge()
+ : TExampleServer("TServerForRequestTooLarge", Config())
+ {
+ }
+
+ ~TServerForRequestTooLarge() override {
+ Session->Shutdown();
+ }
+
+ void OnMessage(TOnMessageContext& req) override {
+ unsigned n = TestSync.Get();
+ if (n < 2) {
+ TestSync.CheckAndIncrement(n);
+ TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10));
+ req.SendReplyMove(resp);
+ } else {
+ Y_FAIL("wrong");
+ }
+ }
+ };
+
+ Y_UNIT_TEST(ServerRequestTooLarge) {
+ TObjectCountCheck objectCountCheck;
+
+ TServerForRequestTooLarge server;
+
+ TExampleClient client;
+ client.DataSize = 10;
+
+ client.SendMessagesWaitReplies(2, server.GetActualListenAddr());
+
+ server.TestSync.CheckAndIncrement(2);
+
+ client.DataSize = 200;
+ client.SendMessages(1, server.GetActualListenAddr());
+ // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+ }
+
+ Y_UNIT_TEST(ClientResponseTooLarge) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ server.DataSize = 10;
+
+ TBusClientSessionConfig clientSessionConfig;
+ clientSessionConfig.MaxMessageSize = 100;
+ TExampleClient client(clientSessionConfig);
+ client.DataSize = 10;
+
+ client.SendMessagesWaitReplies(3, server.GetActualListenAddr());
+
+ server.DataSize = 1000;
+
+ client.SendMessages(1, server.GetActualListenAddr());
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+ }
+
+ Y_UNIT_TEST(ServerUnknownMessage) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TNetAddr serverAddr = server.GetActualListenAddr();
+
+ TExampleClient client;
+
+ client.SendMessagesWaitReplies(2, serverAddr);
+
+ TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
+ req->GetHeader()->Type = 11;
+ client.Session->SendMessageAutoPtr(req, &serverAddr);
+ client.MessageCount = 1;
+
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+ }
+
+ Y_UNIT_TEST(ServerMessageReservedIds) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TNetAddr serverAddr = server.GetActualListenAddr();
+
+ TExampleClient client;
+
+ client.SendMessagesWaitReplies(2, serverAddr);
+
+ // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side
+
+ TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount));
+ req->GetHeader()->Id = 2;
+ client.Session->SendMessageAutoPtr(req, &serverAddr);
+ client.MessageCount = 1;
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+
+ req.Reset(new TExampleRequest(&client.Proto.RequestCount));
+ req->GetHeader()->Id = YBUS_KEYLOCAL;
+ client.Session->SendMessageAutoPtr(req, &serverAddr);
+ client.MessageCount = 1;
+ client.WaitForError(MESSAGE_DELIVERY_FAILED);
+ }
+
+ Y_UNIT_TEST(TestGetInFlightForDestination) {
+ TObjectCountCheck objectCountCheck;
+
+ TDelayReplyServer server;
+
+ TExampleClient client;
+
+ TNetAddr addr("localhost", server.Session->GetActualListenPort());
+
+ UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr));
+
+ client.SendMessages(2, &addr);
+
+ for (size_t i = 0; i < 5; ++i) {
+ // One MessageReceivedEvent indicates one message, we need to wait for two
+ UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
+ if (server.GetDelayedMessageCount() == 2) {
+ break;
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2);
+
+ size_t inFlight = client.Session->GetInFlight(addr);
+ // 4 is for messagebus1 that adds inFlight counter twice for some reason
+ UNIT_ASSERT(inFlight == 2 || inFlight == 4);
+
+ UNIT_ASSERT(server.CheckClientIsAlive());
+
+ server.ReplyToDelayedMessages();
+
+ client.WaitReplies();
+ }
+
+ struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient {
+ TTestSync TestSync;
+
+ static TBusClientSessionConfig SessionConfig() {
+ TBusClientSessionConfig config;
+ // 1 ms is not enough when test is running under valgrind
+ config.ConnectTimeout = 10;
+ config.SendTimeout = 10;
+ config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
+ return config;
+ }
+
+ TResetAfterSendOneWayErrorInCallbackClient()
+ : TExampleClient(SessionConfig())
+ {
+ }
+
+ ~TResetAfterSendOneWayErrorInCallbackClient() override {
+ Session->Shutdown();
+ }
+
+ void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
+ TestSync.WaitForAndIncrement(0);
+ Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "must be connection failed, got %s", ToString(status).data());
+ mess.Destroy();
+ TestSync.CheckAndIncrement(1);
+ }
+ };
+
+ Y_UNIT_TEST(ResetAfterSendOneWayErrorInCallback) {
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+
+ TResetAfterSendOneWayErrorInCallbackClient client;
+
+ EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+
+ client.TestSync.WaitForAndIncrement(2);
+ }
+
+ struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient {
+ TTestSync TestSync;
+
+ ~TResetAfterSendMessageOneWayDuringShutdown() override {
+ Session->Shutdown();
+ }
+
+ void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override {
+ TestSync.CheckAndIncrement(0);
+
+ Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data());
+
+ // check reset is possible here
+ message->Reset();
+
+ // intentionally don't destroy the message
+ // we will try to resend it
+ Y_UNUSED(message.Release());
+
+ TestSync.CheckAndIncrement(1);
+ }
+ };
+
+ Y_UNIT_TEST(ResetAfterSendMessageOneWayDuringShutdown) {
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+
+ TResetAfterSendMessageOneWayDuringShutdown client;
+
+ TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount);
+ EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr);
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+
+ client.TestSync.WaitForAndIncrement(2);
+
+ client.Session->Shutdown();
+
+ ok = client.Session->SendMessageOneWay(message);
+ Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data());
+
+ // check reset is possible here
+ message->Reset();
+ client.TestSync.CheckAndIncrement(3);
+
+ delete message;
+ }
+
+ Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) {
+ TObjectCountCheck objectCountCheck;
+
+ TestNoServerImpl(17, true);
+ }
+
+ struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
+ TTestSync TestSync;
+
+ ~TResetAfterSendOneWaySuccessClient() override {
+ Session->Shutdown();
+ }
+
+ void OnMessageSentOneWay(TAutoPtr<TBusMessage> sent) override {
+ TestSync.WaitForAndIncrement(0);
+ sent->Reset();
+ TestSync.CheckAndIncrement(1);
+ }
+ };
+
+ Y_UNIT_TEST(ResetAfterSendOneWaySuccess) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TNetAddr serverAddr = server.GetActualListenAddr();
+
+ TResetAfterSendOneWaySuccessClient client;
+
+ EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr);
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+ // otherwize message might go to OnError(MESSAGE_SHUTDOWN)
+ server.WaitForOnMessageCount(1);
+
+ client.TestSync.WaitForAndIncrement(2);
+ }
+
+ Y_UNIT_TEST(GetStatus) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TExampleClient client;
+ // make sure connected
+ client.SendMessagesWaitReplies(3, server.GetActualListenAddr());
+
+ server.Bus->GetStatus();
+ server.Bus->GetStatus();
+ server.Bus->GetStatus();
+
+ client.Bus->GetStatus();
+ client.Bus->GetStatus();
+ client.Bus->GetStatus();
+ }
+
+ Y_UNIT_TEST(BindOnRandomPort) {
+ TObjectCountCheck objectCountCheck;
+
+ TBusServerSessionConfig serverConfig;
+ TExampleServer server;
+
+ TExampleClient client;
+ TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort()));
+ client.SendMessagesWaitReplies(3, &addr);
+ }
+
+ Y_UNIT_TEST(UnbindOnShutdown) {
+ TBusMessageQueuePtr queue(CreateMessageQueue());
+
+ TExampleProtocol proto;
+ TBusServerHandlerError handler;
+ TBusServerSessionPtr session = TBusServerSession::Create(
+ &proto, &handler, TBusServerSessionConfig(), queue);
+
+ unsigned port = session->GetActualListenPort();
+ UNIT_ASSERT(port > 0);
+
+ session->Shutdown();
+
+ // fails is Shutdown() didn't unbind
+ THangingServer hangingServer(port);
+ }
+
+ Y_UNIT_TEST(VersionNegotiation) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort());
+
+ TInetStreamSocket socket;
+ int r1 = socket.Connect(&addr);
+ UNIT_ASSERT(r1 >= 0);
+
+ TStreamSocketOutput output(&socket);
+
+ TBusHeader request;
+ Zero(request);
+ request.Size = sizeof(request);
+ request.SetVersionInternal(0xF); // max
+ output.Write(&request, sizeof(request));
+
+ UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true);
+
+ TStreamSocketInput input(&socket);
+
+ TBusHeader response;
+ size_t pos = 0;
+
+ while (pos < sizeof(response)) {
+ size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos);
+ pos += count;
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos);
+
+ UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal());
+ }
+
+ struct TOnConnectionEventClient: public TExampleClient {
+ TTestSync Sync;
+
+ ~TOnConnectionEventClient() override {
+ Session->Shutdown();
+ }
+
+ void OnClientConnectionEvent(const TClientConnectionEvent& event) override {
+ if (Sync.Get() > 2) {
+ // Test OnClientConnectionEvent_Disconnect is broken.
+ // Sometimes reconnect happens during server shutdown
+ // when acceptor connections is still alive, and
+ // server connection is already closed
+ return;
+ }
+
+ if (event.GetType() == TClientConnectionEvent::CONNECTED) {
+ Sync.WaitForAndIncrement(0);
+ } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) {
+ Sync.WaitForAndIncrement(2);
+ }
+ }
+
+ void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
+ // We do not check for message errors in this test.
+ }
+
+ void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
+ }
+ };
+
+ struct TOnConnectionEventServer: public TExampleServer {
+ TOnConnectionEventServer()
+ : TExampleServer("TOnConnectionEventServer")
+ {
+ }
+
+ ~TOnConnectionEventServer() override {
+ Session->Shutdown();
+ }
+
+ void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override {
+ // We do not check for server message errors in this test.
+ }
+ };
+
+ Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) {
+ TObjectCountCheck objectCountCheck;
+
+ TOnConnectionEventServer server;
+
+ TOnConnectionEventClient client;
+
+ TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort());
+
+ client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
+
+ client.Sync.WaitForAndIncrement(1);
+
+ client.Session->Shutdown();
+
+ client.Sync.WaitForAndIncrement(3);
+ }
+
+ Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) {
+ TObjectCountCheck objectCountCheck;
+
+ THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer);
+
+ TOnConnectionEventClient client;
+ TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort());
+
+ client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr);
+
+ client.Sync.WaitForAndIncrement(1);
+
+ server.Destroy();
+
+ client.Sync.WaitForAndIncrement(3);
+ }
+
+ struct TServerForQuotaWake: public TExampleServer {
+ TSystemEvent GoOn;
+ TMutex OneLock;
+
+ TOnMessageContext OneMessage;
+
+ static TBusServerSessionConfig Config() {
+ TBusServerSessionConfig config;
+
+ config.PerConnectionMaxInFlight = 1;
+ config.PerConnectionMaxInFlightBySize = 1500;
+ config.MaxMessageSize = 1024;
+
+ return config;
+ }
+
+ TServerForQuotaWake()
+ : TExampleServer("TServerForQuotaWake", Config())
+ {
+ }
+
+ ~TServerForQuotaWake() override {
+ Session->Shutdown();
+ }
+
+ void OnMessage(TOnMessageContext& req) override {
+ if (!GoOn.Wait(0)) {
+ TGuard<TMutex> guard(OneLock);
+
+ UNIT_ASSERT(!OneMessage);
+
+ OneMessage.Swap(req);
+ } else
+ TExampleServer::OnMessage(req);
+ }
+
+ void WakeOne() {
+ TGuard<TMutex> guard(OneLock);
+
+ UNIT_ASSERT(!!OneMessage);
+
+ TExampleServer::OnMessage(OneMessage);
+
+ TOnMessageContext().Swap(OneMessage);
+ }
+ };
+
+ Y_UNIT_TEST(WakeReaderOnQuota) {
+ const size_t test_msg_count = 64;
+
+ TBusClientSessionConfig clientConfig;
+
+ clientConfig.MaxInFlight = test_msg_count;
+
+ TExampleClient client(clientConfig);
+ TServerForQuotaWake server;
+ TInstant start;
+
+ client.MessageCount = test_msg_count;
+
+ const NBus::TNetAddr addr = server.GetActualListenAddr();
+
+ for (unsigned count = 0;;) {
+ UNIT_ASSERT(count <= test_msg_count);
+
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr);
+
+ if (status == MESSAGE_OK) {
+ count++;
+
+ } else if (status == MESSAGE_BUSY) {
+ if (count == test_msg_count) {
+ TInstant now = TInstant::Now();
+
+ if (start.GetValue() == 0) {
+ start = now;
+
+ // TODO: properly check that server is blocked
+ } else if (start + TDuration::MilliSeconds(100) < now) {
+ break;
+ }
+ }
+
+ Sleep(TDuration::MilliSeconds(10));
+
+ } else
+ UNIT_ASSERT(false);
+ }
+
+ server.GoOn.Signal();
+ server.WakeOne();
+
+ client.WaitReplies();
+
+ server.WaitForOnMessageCount(test_msg_count);
+ };
+
+ Y_UNIT_TEST(TestConnectionAttempts) {
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+ TBusClientSessionConfig clientConfig;
+ clientConfig.RetryInterval = 100;
+ TestNoServerImplClient client(clientConfig);
+
+ int count = 0;
+ for (; count < 10; ++count) {
+ EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+ &noServerAddr);
+
+ Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+ // First connection attempt is for connect call; second one is to get connect result.
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ }
+ Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval));
+ for (; count < 10; ++count) {
+ EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+ &noServerAddr);
+
+ Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+ // First connection attempt is for connect call; second one is to get connect result.
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4);
+ }
+ };
+
+ Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) {
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+ TBusClientSessionConfig clientConfig;
+ clientConfig.RetryInterval = 100;
+ clientConfig.ReconnectWhenIdle = false;
+ TestNoServerImplClient client(clientConfig);
+
+ int count = 0;
+ for (; count < 10; ++count) {
+ EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+ &noServerAddr);
+
+ Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+ // First connection attempt is for connect call; second one is to get connect result.
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ }
+
+ Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ };
+
+ Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) {
+ TObjectCountCheck objectCountCheck;
+
+ TNetAddr noServerAddr("localhost", 17);
+ TBusClientSessionConfig clientConfig;
+ clientConfig.ReconnectWhenIdle = true;
+ clientConfig.RetryInterval = 100;
+ TestNoServerImplClient client(clientConfig);
+
+ int count = 0;
+ for (; count < 10; ++count) {
+ EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount),
+ &noServerAddr);
+
+ Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data());
+ client.TestSync.WaitForAndIncrement(count * 2 + 1);
+
+ // First connection attempt is for connect call; second one is to get connect result.
+ UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ }
+
+ Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2));
+ UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2);
+ Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval));
+ // it is undeterministic how many reconnects will be during that amount of time
+ // but it should occur at least once
+ UNIT_ASSERT(client.Session->GetConnectSyscallsNumForTest(noServerAddr) > 2);
+ };
+};
diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
new file mode 100644
index 0000000000..4083cf3b7b
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
@@ -0,0 +1,143 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/messagebus/test/helper/example.h>
+#include <library/cpp/messagebus/test/helper/message_handler_error.h>
+
+#include <library/cpp/messagebus/misc/test_sync.h>
+#include <library/cpp/messagebus/oldmodule/module.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+Y_UNIT_TEST_SUITE(ModuleClientOneWay) {
+ struct TTestServer: public TBusServerHandlerError {
+ TExampleProtocol Proto;
+
+ TTestSync* const TestSync;
+
+ TBusMessageQueuePtr Queue;
+ TBusServerSessionPtr ServerSession;
+
+ TTestServer(TTestSync* testSync)
+ : TestSync(testSync)
+ {
+ Queue = CreateMessageQueue();
+ ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue);
+ }
+
+ void OnMessage(TOnMessageContext& context) override {
+ TestSync->WaitForAndIncrement(1);
+ context.ForgetRequest();
+ }
+ };
+
+ struct TClientModule: public TBusModule {
+ TExampleProtocol Proto;
+
+ TTestSync* const TestSync;
+ unsigned const Port;
+
+ TBusClientSessionPtr ClientSession;
+
+ TClientModule(TTestSync* testSync, unsigned port)
+ : TBusModule("m")
+ , TestSync(testSync)
+ , Port(port)
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage*) override {
+ TestSync->WaitForAndIncrement(0);
+
+ job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port));
+
+ return &TClientModule::Sent;
+ }
+
+ TJobHandler Sent(TBusJob* job, TBusMessage*) {
+ TestSync->WaitForAndIncrement(2);
+ job->Cancel(MESSAGE_DONT_ASK);
+ return nullptr;
+ }
+
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
+ ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(Simple) {
+ TTestSync testSync;
+
+ TTestServer server(&testSync);
+
+ TBusMessageQueuePtr queue = CreateMessageQueue();
+ TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort());
+
+ clientModule.CreatePrivateSessions(queue.Get());
+ clientModule.StartInput();
+
+ clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
+
+ testSync.WaitForAndIncrement(3);
+
+ clientModule.Shutdown();
+ }
+
+ struct TSendErrorModule: public TBusModule {
+ TExampleProtocol Proto;
+
+ TTestSync* const TestSync;
+
+ TBusClientSessionPtr ClientSession;
+
+ TSendErrorModule(TTestSync* testSync)
+ : TBusModule("m")
+ , TestSync(testSync)
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage*) override {
+ TestSync->WaitForAndIncrement(0);
+
+ job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1));
+
+ return &TSendErrorModule::Sent;
+ }
+
+ TJobHandler Sent(TBusJob* job, TBusMessage*) {
+ TestSync->WaitForAndIncrement(1);
+ job->Cancel(MESSAGE_DONT_ASK);
+ return nullptr;
+ }
+
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
+ TBusServerSessionConfig sessionConfig;
+ sessionConfig.ConnectTimeout = 1;
+ sessionConfig.SendTimeout = 1;
+ sessionConfig.TotalTimeout = 1;
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1);
+ ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(SendError) {
+ TTestSync testSync;
+
+ TBusQueueConfig queueConfig;
+ queueConfig.NumWorkers = 5;
+
+ TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig);
+ TSendErrorModule clientModule(&testSync);
+
+ clientModule.CreatePrivateSessions(queue.Get());
+ clientModule.StartInput();
+
+ clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount));
+
+ testSync.WaitForAndIncrement(2);
+
+ clientModule.Shutdown();
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp
new file mode 100644
index 0000000000..ebfe185cc6
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp
@@ -0,0 +1,368 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "count_down_latch.h"
+#include "moduletest.h"
+
+#include <library/cpp/messagebus/test/helper/example.h>
+#include <library/cpp/messagebus/test/helper/example_module.h>
+#include <library/cpp/messagebus/test/helper/object_count_check.h>
+#include <library/cpp/messagebus/test/helper/wait_for.h>
+
+#include <library/cpp/messagebus/misc/test_sync.h>
+#include <library/cpp/messagebus/oldmodule/module.h>
+
+#include <util/generic/cast.h>
+#include <util/system/event.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+// helper class that cleans TBusJob instance, so job's destructor can
+// be completed without assertion fail.
+struct TJobGuard {
+public:
+ TJobGuard(NBus::TBusJob* job)
+ : Job(job)
+ {
+ }
+
+ ~TJobGuard() {
+ Job->ClearAllMessageStates();
+ }
+
+private:
+ NBus::TBusJob* Job;
+};
+
+class TMessageOk: public NBus::TBusMessage {
+public:
+ TMessageOk()
+ : NBus::TBusMessage(1)
+ {
+ }
+};
+
+class TMessageError: public NBus::TBusMessage {
+public:
+ TMessageError()
+ : NBus::TBusMessage(2)
+ {
+ }
+};
+
+Y_UNIT_TEST_SUITE(BusJobTest) {
+#if 0
+ Y_UNIT_TEST(TestPending) {
+ TObjectCountCheck objectCountCheck;
+
+ TDupDetectModule module;
+ TBusJob job(&module, new TBusMessage(0));
+ // Guard will clear the job if unit-assertion fails.
+ TJobGuard g(&job);
+
+ NBus::TBusMessage* msg = new NBus::TBusMessage(1);
+ job.Send(msg, NULL);
+ NBus::TJobStateVec pending;
+ job.GetPending(&pending);
+
+ UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u);
+ UNIT_ASSERT_EQUAL(msg, pending[0].Message);
+ }
+
+ Y_UNIT_TEST(TestCallReplyHandler) {
+ TObjectCountCheck objectCountCheck;
+
+ TDupDetectModule module;
+ NBus::TBusJob job(&module, new NBus::TBusMessage(0));
+ // Guard will clear the job if unit-assertion fails.
+ TJobGuard g(&job);
+
+ NBus::TBusMessage* msgOk = new TMessageOk;
+ NBus::TBusMessage* msgError = new TMessageError;
+ job.Send(msgOk, NULL);
+ job.Send(msgError, NULL);
+
+ UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL);
+ UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL);
+
+ NBus::TBusMessage* reply = new NBus::TBusMessage(0);
+ job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply);
+ job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL);
+
+ UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL);
+ UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL);
+
+ UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT);
+ UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT);
+
+ UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK);
+ UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply);
+ }
+#endif
+
+ struct TParallelOnReplyModule : TExampleClientModule {
+ TNetAddr ServerAddr;
+
+ TCountDownLatch RepliesLatch;
+
+ TParallelOnReplyModule(const TNetAddr& serverAddr)
+ : ServerAddr(serverAddr)
+ , RepliesLatch(2)
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+ job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr);
+ return &TParallelOnReplyModule::HandleReplies;
+ }
+
+ void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
+ Y_UNUSED(mess);
+ Y_UNUSED(reply);
+ Y_VERIFY(status == MESSAGE_OK, "failed to get reply: %s", ToCString(status));
+ }
+
+ TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
+ Y_UNUSED(mess);
+ RepliesLatch.CountDown();
+ Y_VERIFY(RepliesLatch.Await(TDuration::Seconds(10)), "failed to get answers");
+ job->Cancel(MESSAGE_UNKNOWN);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(TestReplyHandlerCalledInParallel) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+
+ TExampleProtocol proto;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TParallelOnReplyModule module(server.GetActualListenAddr());
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10)));
+
+ module.Shutdown();
+ }
+
+ struct TErrorHandlerCheckerModule : TExampleModule {
+ TNetAddr ServerAddr;
+
+ TBusClientSessionPtr Source;
+
+ TCountDownLatch GotReplyLatch;
+
+ TBusMessage* SentMessage;
+
+ TErrorHandlerCheckerModule()
+ : ServerAddr("localhost", 17)
+ , GotReplyLatch(2)
+ , SentMessage()
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+ TExampleRequest* message = new TExampleRequest(&Proto.RequestCount);
+ job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr);
+ SentMessage = message;
+ return &TErrorHandlerCheckerModule::HandleReplies;
+ }
+
+ void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) {
+ Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got wrong status: %s", ToString(status).data());
+ Y_VERIFY(req == SentMessage, "checking request");
+ Y_VERIFY(resp == nullptr, "checking response");
+ GotReplyLatch.CountDown();
+ }
+
+ TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
+ Y_UNUSED(mess);
+ job->Cancel(MESSAGE_UNKNOWN);
+ GotReplyLatch.CountDown();
+ return nullptr;
+ }
+
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
+ TBusClientSessionConfig sessionConfig;
+ sessionConfig.SendTimeout = 1; // TODO: allow 0
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
+ Source = CreateDefaultSource(queue, &Proto, sessionConfig);
+ Source->RegisterService("localhost");
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(ErrorHandler) {
+ TExampleProtocol proto;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TErrorHandlerCheckerModule module;
+
+ TBusModuleConfig moduleConfig;
+ moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
+ module.SetConfig(moduleConfig);
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ module.GotReplyLatch.Await();
+
+ module.Shutdown();
+ }
+
+ struct TSlowReplyServer: public TBusServerHandlerError {
+ TTestSync* const TestSync;
+ TBusMessageQueuePtr Bus;
+ TBusServerSessionPtr ServerSession;
+ TExampleProtocol Proto;
+
+ TAtomic OnMessageCount;
+
+ TSlowReplyServer(TTestSync* testSync)
+ : TestSync(testSync)
+ , OnMessageCount(0)
+ {
+ Bus = CreateMessageQueue("TSlowReplyServer");
+ TBusServerSessionConfig sessionConfig;
+ ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+ }
+
+ void OnMessage(TOnMessageContext& req) override {
+ if (AtomicIncrement(OnMessageCount) == 1) {
+ TestSync->WaitForAndIncrement(0);
+ }
+ TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount));
+ req.SendReplyMove(response);
+ }
+ };
+
+ struct TModuleThatSendsReplyEarly: public TExampleClientModule {
+ TTestSync* const TestSync;
+ const unsigned ServerPort;
+
+ TBusServerSessionPtr ServerSession;
+ TAtomic ReplyCount;
+
+ TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort)
+ : TestSync(testSync)
+ , ServerPort(serverPort)
+ , ServerSession(nullptr)
+ , ReplyCount(0)
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+ for (unsigned i = 0; i < 2; ++i) {
+ job->Send(
+ new TExampleRequest(&Proto.RequestCount),
+ Source,
+ TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler),
+ 0,
+ TNetAddr("127.0.0.1", ServerPort));
+ }
+ return &TModuleThatSendsReplyEarly::HandleReplies;
+ }
+
+ void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
+ Y_UNUSED(mess);
+ Y_UNUSED(reply);
+ Y_VERIFY(status == MESSAGE_OK, "failed to get reply");
+ if (AtomicIncrement(ReplyCount) == 1) {
+ TestSync->WaitForAndIncrement(1);
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+ } else {
+ TestSync->WaitForAndIncrement(3);
+ }
+ }
+
+ TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) {
+ Y_UNUSED(mess);
+ job->Cancel(MESSAGE_UNKNOWN);
+ return nullptr;
+ }
+
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
+ TExampleClientModule::CreateExtSession(queue);
+ TBusServerSessionConfig sessionConfig;
+ return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig);
+ }
+ };
+
+ Y_UNIT_TEST(SendReplyCalledBeforeAllRepliesReceived) {
+ TTestSync testSync;
+
+ TSlowReplyServer slowReplyServer(&testSync);
+
+ TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort());
+ module.StartModule();
+
+ TExampleClient client;
+ TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort());
+ client.SendMessagesWaitReplies(1, &addr);
+
+ testSync.WaitForAndIncrement(2);
+
+ module.Shutdown();
+ }
+
+ struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule {
+ unsigned ServerPort;
+
+ TTestSync TestSync;
+
+ TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort)
+ : ServerPort(serverPort)
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage*) override {
+ TestSync.CheckAndIncrement(0);
+
+ job->Send(new TExampleRequest(&Proto.RequestCount), Source,
+ TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply),
+ 0, TNetAddr("localhost", ServerPort));
+ return &TShutdownCalledBeforeReplyReceivedModule::End;
+ }
+
+ void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) {
+ Y_VERIFY(status == MESSAGE_SHUTDOWN, "got %s", ToCString(status));
+ TestSync.CheckAndIncrement(1);
+ }
+
+ TJobHandler End(TBusJob* job, TBusMessage*) {
+ TestSync.CheckAndIncrement(2);
+ job->Cancel(MESSAGE_SHUTDOWN);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(ShutdownCalledBeforeReplyReceived) {
+ TExampleServer server;
+ server.ForgetRequest = true;
+
+ TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort());
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&module.Proto.RequestCount));
+
+ server.TestSync.WaitFor(1);
+
+ module.Shutdown();
+
+ module.TestSync.CheckAndIncrement(3);
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp
new file mode 100644
index 0000000000..88fe1dd9b6
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp
@@ -0,0 +1,119 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "count_down_latch.h"
+#include "moduletest.h"
+
+#include <library/cpp/messagebus/test/helper/example.h>
+#include <library/cpp/messagebus/test/helper/example_module.h>
+#include <library/cpp/messagebus/test/helper/object_count_check.h>
+#include <library/cpp/messagebus/test/helper/wait_for.h>
+
+#include <library/cpp/messagebus/oldmodule/module.h>
+
+#include <util/generic/cast.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+Y_UNIT_TEST_SUITE(ModuleServerTests) {
+ Y_UNIT_TEST(TestModule) {
+ TObjectCountCheck objectCountCheck;
+
+ /// create or get instance of message queue, need one per application
+ TBusMessageQueuePtr bus(CreateMessageQueue());
+ THostInfoHandler hostHandler(bus.Get());
+ TDupDetectModule module(hostHandler.GetActualListenAddr());
+ bool success;
+ success = module.Init(bus.Get());
+ UNIT_ASSERT_C(success, "failed to initialize dupdetect module");
+
+ success = module.StartInput();
+ UNIT_ASSERT_C(success, "failed to start dupdetect module");
+
+ TDupDetectHandler dupHandler(module.ListenAddr, bus.Get());
+ dupHandler.Work();
+
+ UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies);
+
+ module.Shutdown();
+ dupHandler.DupDetect->Shutdown();
+ }
+
+ struct TParallelOnMessageModule: public TExampleServerModule {
+ TCountDownLatch WaitTwoRequestsLatch;
+
+ TParallelOnMessageModule()
+ : WaitTwoRequestsLatch(2)
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ WaitTwoRequestsLatch.CountDown();
+ Y_VERIFY(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops");
+
+ VerifyDynamicCast<TExampleRequest*>(mess);
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) {
+ TObjectCountCheck objectCountCheck;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TParallelOnMessageModule module;
+ module.StartModule();
+
+ TExampleClient client;
+
+ client.SendMessagesWaitReplies(2, module.ServerAddr);
+
+ module.Shutdown();
+ }
+
+ struct TDelayReplyServer: public TExampleServerModule {
+ TSystemEvent MessageReceivedEvent;
+ TSystemEvent ClientDiedEvent;
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+
+ MessageReceivedEvent.Signal();
+
+ Y_VERIFY(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops");
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) {
+ TObjectCountCheck objectCountCheck;
+
+ TBusQueueConfig config;
+ config.NumWorkers = 5;
+
+ TDelayReplyServer server;
+ server.StartModule();
+
+ THolder<TExampleClient> client(new TExampleClient);
+
+ client->SendMessages(1, server.ServerAddr);
+
+ UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5)));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight());
+
+ client.Destroy();
+
+ server.ClientDiedEvent.Signal();
+
+ // wait until all server message are delivered
+ UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight());
+
+ server.Shutdown();
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h
new file mode 100644
index 0000000000..d5da72c0cb
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/moduletest.h
@@ -0,0 +1,221 @@
+#pragma once
+
+///////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Example of using local session for communication.
+
+#include <library/cpp/messagebus/test/helper/alloc_counter.h>
+#include <library/cpp/messagebus/test/helper/example.h>
+#include <library/cpp/messagebus/test/helper/message_handler_error.h>
+
+#include <library/cpp/messagebus/ybus.h>
+#include <library/cpp/messagebus/oldmodule/module.h>
+
+namespace NBus {
+ namespace NTest {
+ using namespace std;
+
+#define TYPE_HOSTINFOREQUEST 100
+#define TYPE_HOSTINFORESPONSE 101
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief DupDetect protocol that common between client and server
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo request class
+ class THostInfoMessage: public TBusMessage {
+ public:
+ THostInfoMessage()
+ : TBusMessage(TYPE_HOSTINFOREQUEST)
+ {
+ }
+ THostInfoMessage(ECreateUninitialized)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ {
+ }
+
+ ~THostInfoMessage() override {
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo reply class
+ class THostInfoReply: public TBusMessage {
+ public:
+ THostInfoReply()
+ : TBusMessage(TYPE_HOSTINFORESPONSE)
+ {
+ }
+ THostInfoReply(ECreateUninitialized)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ {
+ }
+
+ ~THostInfoReply() override {
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo protocol that common between client and server
+ class THostInfoProtocol: public TBusProtocol {
+ public:
+ THostInfoProtocol()
+ : TBusProtocol("HOSTINFO", 0)
+ {
+ }
+ /// serialized protocol specific data into TBusData
+ void Serialize(const TBusMessage* mess, TBuffer& data) override {
+ Y_UNUSED(data);
+ Y_UNUSED(mess);
+ }
+
+ /// deserialized TBusData into new instance of the message
+ TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
+ Y_UNUSED(payload);
+
+ if (messageType == TYPE_HOSTINFOREQUEST) {
+ return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED);
+ } else if (messageType == TYPE_HOSTINFORESPONSE) {
+ return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED);
+ } else {
+ Y_FAIL("unknown");
+ }
+ }
+ };
+
+ //////////////////////////////////////////////////////////////
+ /// \brief HostInfo handler (should convert it to module too)
+ struct THostInfoHandler: public TBusServerHandlerError {
+ TBusServerSessionPtr Session;
+ TBusServerSessionConfig HostInfoConfig;
+ THostInfoProtocol HostInfoProto;
+
+ THostInfoHandler(TBusMessageQueue* queue) {
+ Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue);
+ }
+
+ void OnMessage(TOnMessageContext& mess) override {
+ usleep(10 * 1000); /// pretend we are doing something
+
+ TAutoPtr<THostInfoReply> reply(new THostInfoReply());
+
+ mess.SendReplyMove(reply);
+ }
+
+ TNetAddr GetActualListenAddr() {
+ return TNetAddr("localhost", Session->GetActualListenPort());
+ }
+ };
+
+ //////////////////////////////////////////////////////////////
+ /// \brief DupDetect handler (should convert it to module too)
+ struct TDupDetectHandler: public TBusClientHandlerError {
+ TNetAddr ServerAddr;
+
+ TBusClientSessionPtr DupDetect;
+ TBusClientSessionConfig DupDetectConfig;
+ TExampleProtocol DupDetectProto;
+
+ int NumMessages;
+ int NumReplies;
+
+ TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue)
+ : ServerAddr(serverAddr)
+ {
+ DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue);
+ DupDetect->RegisterService("localhost");
+ }
+
+ void Work() {
+ NumMessages = 10;
+ NumReplies = 0;
+
+ for (int i = 0; i < NumMessages; i++) {
+ TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount);
+ DupDetect->SendMessage(mess, &ServerAddr);
+ }
+ }
+
+ void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
+ Y_UNUSED(mess);
+ Y_UNUSED(reply);
+ NumReplies++;
+ }
+ };
+
+ /////////////////////////////////////////////////////////////////
+ /// \brief DupDetect module
+
+ struct TDupDetectModule: public TBusModule {
+ TNetAddr HostInfoAddr;
+
+ TBusClientSessionPtr HostInfoClientSession;
+ TBusClientSessionConfig HostInfoConfig;
+ THostInfoProtocol HostInfoProto;
+
+ TExampleProtocol DupDetectProto;
+ TBusServerSessionConfig DupDetectConfig;
+
+ TNetAddr ListenAddr;
+
+ TDupDetectModule(const TNetAddr& hostInfoAddr)
+ : TBusModule("DUPDETECTMODULE")
+ , HostInfoAddr(hostInfoAddr)
+ {
+ }
+
+ bool Init(TBusMessageQueue* queue) {
+ HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
+ HostInfoClientSession->RegisterService("localhost");
+
+ return TBusModule::CreatePrivateSessions(queue);
+ }
+
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
+ TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);
+
+ ListenAddr = TNetAddr("localhost", session->GetActualListenPort());
+
+ return session;
+ }
+
+ /// entry point into module, first function to call
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
+ Y_UNUSED(dmess);
+
+ THostInfoMessage* hmess = new THostInfoMessage();
+
+ /// send message to imaginary hostinfo server
+ job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr);
+
+ return TJobHandler(&TDupDetectModule::ProcessHostInfo);
+ }
+
+ /// next handler is executed when all outstanding requests from previous handler is completed
+ TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) {
+ TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
+ Y_UNUSED(dmess);
+
+ THostInfoMessage* hmess = job->Get<THostInfoMessage>();
+ THostInfoReply* hreply = job->Get<THostInfoReply>();
+ EMessageStatus hstatus = job->GetStatus<THostInfoMessage>();
+ Y_ASSERT(hmess != nullptr);
+ Y_ASSERT(hreply != nullptr);
+ Y_ASSERT(hstatus == MESSAGE_OK);
+
+ return TJobHandler(&TDupDetectModule::Finish);
+ }
+
+ /// last handler sends reply and returns NULL
+ TJobHandler Finish(TBusJob* job, TBusMessage* mess) {
+ Y_UNUSED(mess);
+
+ TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount);
+ job->SendReply(reply);
+
+ return nullptr;
+ }
+ };
+
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp
new file mode 100644
index 0000000000..9c21227e2b
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp
@@ -0,0 +1,255 @@
+///////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Example of reply-less communication
+
+/// This example demostrates how asynchronous message passing library
+/// can be used to send message and do not wait for reply back.
+/// The usage of reply-less communication should be restricted to
+/// low-throughput clients and high-throughput server to provide reasonable
+/// utility. Removing replies from the communication removes any restriction
+/// on how many message can be send to server and rougue clients may overwelm
+/// server without thoughtput control.
+
+/// 1) To implement reply-less client \n
+
+/// Call NBus::TBusSession::AckMessage()
+/// from within NBus::IMessageHandler::OnSent() handler when message has
+/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
+/// Discard identity for reply message.
+
+/// 2) To implement reply-less server \n
+
+/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
+/// handler when message has been received on server end.
+/// See example in NBus::NullServer::OnMessage().
+/// Discard identity for reply message.
+
+#include <library/cpp/messagebus/test/helper/alloc_counter.h>
+#include <library/cpp/messagebus/test/helper/example.h>
+#include <library/cpp/messagebus/test/helper/hanging_server.h>
+#include <library/cpp/messagebus/test/helper/message_handler_error.h>
+#include <library/cpp/messagebus/test/helper/object_count_check.h>
+#include <library/cpp/messagebus/test/helper/wait_for.h>
+
+#include <library/cpp/messagebus/ybus.h>
+
+using namespace std;
+using namespace NBus;
+using namespace NBus::NPrivate;
+using namespace NBus::NTest;
+
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+/// \brief Reply-less client and handler
+struct NullClient : TBusClientHandlerError {
+ TNetAddr ServerAddr;
+
+ TBusMessageQueuePtr Queue;
+ TBusClientSessionPtr Session;
+ TExampleProtocol Proto;
+
+ /// constructor creates instances of protocol and session
+ NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
+ : ServerAddr(serverAddr)
+ {
+ UNIT_ASSERT(serverAddr.GetPort() > 0);
+
+ /// create or get instance of message queue, need one per application
+ Queue = CreateMessageQueue();
+
+ /// register source/client session
+ Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
+
+ /// register service, announce to clients via LocatorService
+ Session->RegisterService("localhost");
+ }
+
+ ~NullClient() override {
+ Session->Shutdown();
+ }
+
+ /// dispatch of requests is done here
+ void Work() {
+ int batch = 10;
+
+ for (int i = 0; i < batch; i++) {
+ TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
+ mess->Data = "TADA";
+ Session->SendMessageOneWay(mess, &ServerAddr);
+ }
+ }
+
+ void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
+ }
+};
+
+/////////////////////////////////////////////////////////////////////
+/// \brief Reply-less server and handler
+class NullServer: public TBusServerHandlerError {
+public:
+ /// session object to maintian
+ TBusMessageQueuePtr Queue;
+ TBusServerSessionPtr Session;
+ TExampleProtocol Proto;
+
+public:
+ TAtomic NumMessages;
+
+ NullServer() {
+ NumMessages = 0;
+
+ /// create or get instance of single message queue, need one for application
+ Queue = CreateMessageQueue();
+
+ /// register destination session
+ TBusServerSessionConfig sessionConfig;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
+ }
+
+ ~NullServer() override {
+ Session->Shutdown();
+ }
+
+ /// when message comes do not send reply, just acknowledge
+ void OnMessage(TOnMessageContext& mess) override {
+ TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
+
+ Y_ASSERT(fmess->Data == "TADA");
+
+ /// tell session to forget this message and never expect any reply
+ mess.ForgetRequest();
+
+ AtomicIncrement(NumMessages);
+ }
+
+ /// this handler should not be called because this server does not send replies
+ void OnSent(TAutoPtr<TBusMessage> mess) override {
+ Y_UNUSED(mess);
+ Y_FAIL("This server does not sent replies");
+ }
+};
+
+Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
+ Y_UNIT_TEST(Simple) {
+ TObjectCountCheck objectCountCheck;
+
+ NullServer server;
+ NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort()));
+
+ client.Work();
+
+ // wait until all client message are delivered
+ UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);
+
+ // assert correct number of messages
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
+ UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
+ }
+
+ struct TMessageTooLargeClient: public NullClient {
+ TSystemEvent GotTooLarge;
+
+ TBusClientSessionConfig Config() {
+ TBusClientSessionConfig r;
+ r.MaxMessageSize = 1;
+ return r;
+ }
+
+ TMessageTooLargeClient(unsigned port)
+ : NullClient(TNetAddr("localhost", port), Config())
+ {
+ }
+
+ ~TMessageTooLargeClient() override {
+ Session->Shutdown();
+ }
+
+ void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
+ Y_UNUSED(mess);
+
+ Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status));
+
+ GotTooLarge.Signal();
+ }
+ };
+
+ Y_UNIT_TEST(MessageTooLargeOnClient) {
+ TObjectCountCheck objectCountCheck;
+
+ NullServer server;
+
+ TMessageTooLargeClient client(server.Session->GetActualListenPort());
+
+ EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+
+ client.GotTooLarge.WaitI();
+ }
+
+ struct TCheckTimeoutClient: public NullClient {
+ ~TCheckTimeoutClient() override {
+ Session->Shutdown();
+ }
+
+ static TBusClientSessionConfig SessionConfig() {
+ TBusClientSessionConfig sessionConfig;
+ sessionConfig.SendTimeout = 1;
+ sessionConfig.ConnectTimeout = 1;
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
+ return sessionConfig;
+ }
+
+ TCheckTimeoutClient(const TNetAddr& serverAddr)
+ : NullClient(serverAddr, SessionConfig())
+ {
+ }
+
+ TSystemEvent GotError;
+
+ /// message that could not be delivered
+ void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
+ Y_UNUSED(mess);
+ Y_UNUSED(status); // TODO: check status
+
+ GotError.Signal();
+ }
+ };
+
+ Y_UNIT_TEST(SendTimeout_Callback_NoServer) {
+ TObjectCountCheck objectCountCheck;
+
+ TCheckTimeoutClient client(TNetAddr("localhost", 17));
+
+ EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ UNIT_ASSERT_EQUAL(ok, MESSAGE_OK);
+
+ client.GotError.WaitI();
+ }
+
+ Y_UNIT_TEST(SendTimeout_Callback_HangingServer) {
+ THangingServer server;
+
+ TObjectCountCheck objectCountCheck;
+
+ TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort()));
+
+ bool first = true;
+ for (;;) {
+ EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ if (ok == MESSAGE_BUSY) {
+ UNIT_ASSERT(!first);
+ break;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK);
+ first = false;
+ }
+
+ // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
+ // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get
+ // serailized and written to the socket buffer, so the write queue gets drained and there are
+ // no messages to timeout when periodic timeout check happens.
+
+ client.GotError.WaitI();
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp
new file mode 100644
index 0000000000..dd4d3aaa5e
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/starter_ut.cpp
@@ -0,0 +1,140 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/messagebus/test/helper/example_module.h>
+#include <library/cpp/messagebus/test/helper/object_count_check.h>
+#include <library/cpp/messagebus/test/helper/wait_for.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+Y_UNIT_TEST_SUITE(TBusStarterTest) {
+ struct TStartJobTestModule: public TExampleModule {
+ using TBusModule::CreateDefaultStarter;
+
+ TAtomic StartCount;
+
+ TStartJobTestModule()
+ : StartCount(0)
+ {
+ }
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+ AtomicIncrement(StartCount);
+ job->Sleep(10);
+ return &TStartJobTestModule::End;
+ }
+
+ TJobHandler End(TBusJob* job, TBusMessage* mess) {
+ Y_UNUSED(mess);
+ AtomicIncrement(StartCount);
+ job->Cancel(MESSAGE_UNKNOWN);
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(Test) {
+ TObjectCountCheck objectCountCheck;
+
+ TBusMessageQueuePtr bus(CreateMessageQueue());
+
+ TStartJobTestModule module;
+
+ //module.StartModule();
+ module.CreatePrivateSessions(bus.Get());
+ module.StartInput();
+
+ TBusSessionConfig config;
+ config.SendTimeout = 10;
+
+ module.CreateDefaultStarter(*bus, config);
+
+ UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3);
+
+ module.Shutdown();
+ bus->Stop();
+ }
+
+ Y_UNIT_TEST(TestModuleStartJob) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TStartJobTestModule module;
+
+ TBusModuleConfig moduleConfig;
+ moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10);
+ module.SetConfig(moduleConfig);
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.RequestCount));
+
+ UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2);
+
+ module.Shutdown();
+ }
+
+ struct TSleepModule: public TExampleServerModule {
+ TSystemEvent MessageReceivedEvent;
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+
+ MessageReceivedEvent.Signal();
+
+ job->Sleep(1000000000);
+
+ return TJobHandler(&TSleepModule::Never);
+ }
+
+ TJobHandler Never(TBusJob*, TBusMessage*) {
+ Y_FAIL("happens");
+ throw 1;
+ }
+ };
+
+ Y_UNIT_TEST(StartJobDestroyDuringSleep) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TSleepModule module;
+
+ module.StartModule();
+
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ module.MessageReceivedEvent.WaitI();
+
+ module.Shutdown();
+ }
+
+ struct TSendReplyModule: public TExampleServerModule {
+ TSystemEvent MessageReceivedEvent;
+
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ Y_UNUSED(mess);
+
+ job->SendReply(new TExampleResponse(&Proto.ResponseCount));
+
+ MessageReceivedEvent.Signal();
+
+ return nullptr;
+ }
+ };
+
+ Y_UNIT_TEST(AllowSendReplyInStarted) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleProtocol proto;
+
+ TSendReplyModule module;
+ module.StartModule();
+ module.StartJob(new TExampleRequest(&proto.StartCount));
+
+ module.MessageReceivedEvent.WaitI();
+
+ module.Shutdown();
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
new file mode 100644
index 0000000000..400128193f
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
@@ -0,0 +1,69 @@
+#include <library/cpp/messagebus/test/helper/example.h>
+#include <library/cpp/messagebus/test/helper/object_count_check.h>
+
+namespace NBus {
+ namespace NTest {
+ using namespace std;
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief Client for sending synchronous message to local server
+ struct TSyncClient {
+ TNetAddr ServerAddr;
+
+ TExampleProtocol Proto;
+ TBusMessageQueuePtr Bus;
+ TBusSyncClientSessionPtr Session;
+
+ int NumReplies;
+ int NumMessages;
+
+ /// constructor creates instances of queue, protocol and session
+ TSyncClient(const TNetAddr& serverAddr)
+ : ServerAddr(serverAddr)
+ {
+ /// create or get instance of message queue, need one per application
+ Bus = CreateMessageQueue();
+
+ NumReplies = 0;
+ NumMessages = 10;
+
+ /// register source/client session
+ TBusClientSessionConfig sessionConfig;
+ Session = Bus->CreateSyncSource(&Proto, sessionConfig);
+ Session->RegisterService("localhost");
+ }
+
+ ~TSyncClient() {
+ Session->Shutdown();
+ }
+
+ /// dispatch of requests is done here
+ void Work() {
+ for (int i = 0; i < NumMessages; i++) {
+ THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount));
+ EMessageStatus status;
+ THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr));
+ if (!!reply) {
+ NumReplies++;
+ }
+ }
+ }
+ };
+
+ Y_UNIT_TEST_SUITE(SyncClientTest) {
+ Y_UNIT_TEST(TestSync) {
+ TObjectCountCheck objectCountCheck;
+
+ TExampleServer server;
+ TSyncClient client(server.GetActualListenAddr());
+ client.Work();
+ // assert correct number of replies
+ UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages);
+ // assert that there is no message left in flight
+ UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
+ UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
+ }
+ }
+
+ }
+}
diff --git a/library/cpp/messagebus/test/ut/ya.make b/library/cpp/messagebus/test/ut/ya.make
new file mode 100644
index 0000000000..fe1b4961d6
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/ya.make
@@ -0,0 +1,56 @@
+OWNER(g:messagebus)
+
+UNITTEST_FOR(library/cpp/messagebus)
+
+TIMEOUT(1200)
+
+SIZE(LARGE)
+
+TAG(
+ ya:not_autocheck
+ ya:fat
+)
+
+FORK_SUBTESTS()
+
+PEERDIR(
+ library/cpp/testing/unittest_main
+ library/cpp/messagebus
+ library/cpp/messagebus/test/helper
+ library/cpp/messagebus/www
+)
+
+SRCS(
+ messagebus_ut.cpp
+ module_client_ut.cpp
+ module_client_one_way_ut.cpp
+ module_server_ut.cpp
+ one_way_ut.cpp
+ starter_ut.cpp
+ sync_client_ut.cpp
+ locator_uniq_ut.cpp
+ ../../actor/actor_ut.cpp
+ ../../actor/ring_buffer_ut.cpp
+ ../../actor/tasks_ut.cpp
+ ../../actor/what_thread_does_guard_ut.cpp
+ ../../async_result_ut.cpp
+ ../../cc_semaphore_ut.cpp
+ ../../coreconn_ut.cpp
+ ../../duration_histogram_ut.cpp
+ ../../message_status_counter_ut.cpp
+ ../../misc/weak_ptr_ut.cpp
+ ../../latch_ut.cpp
+ ../../lfqueue_batch_ut.cpp
+ ../../local_flags_ut.cpp
+ ../../memory_ut.cpp
+ ../../moved_ut.cpp
+ ../../netaddr_ut.cpp
+ ../../network_ut.cpp
+ ../../nondestroying_holder_ut.cpp
+ ../../scheduler_actor_ut.cpp
+ ../../scheduler/scheduler_ut.cpp
+ ../../socket_addr_ut.cpp
+ ../../vector_swaps_ut.cpp
+)
+
+END()