aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/ut/one_way_ut.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/test/ut/one_way_ut.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/test/ut/one_way_ut.cpp')
-rw-r--r--library/cpp/messagebus/test/ut/one_way_ut.cpp255
1 files changed, 255 insertions, 0 deletions
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp
new file mode 100644
index 0000000000..9c21227e2b
--- /dev/null
+++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp
@@ -0,0 +1,255 @@
+///////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Example of reply-less communication
+
+/// This example demostrates how asynchronous message passing library
+/// can be used to send message and do not wait for reply back.
+/// The usage of reply-less communication should be restricted to
+/// low-throughput clients and high-throughput server to provide reasonable
+/// utility. Removing replies from the communication removes any restriction
+/// on how many message can be send to server and rougue clients may overwelm
+/// server without thoughtput control.
+
+/// 1) To implement reply-less client \n
+
+/// Call NBus::TBusSession::AckMessage()
+/// from within NBus::IMessageHandler::OnSent() handler when message has
+/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
+/// Discard identity for reply message.
+
+/// 2) To implement reply-less server \n
+
+/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
+/// handler when message has been received on server end.
+/// See example in NBus::NullServer::OnMessage().
+/// Discard identity for reply message.
+
+#include <library/cpp/messagebus/test/helper/alloc_counter.h>
+#include <library/cpp/messagebus/test/helper/example.h>
+#include <library/cpp/messagebus/test/helper/hanging_server.h>
+#include <library/cpp/messagebus/test/helper/message_handler_error.h>
+#include <library/cpp/messagebus/test/helper/object_count_check.h>
+#include <library/cpp/messagebus/test/helper/wait_for.h>
+
+#include <library/cpp/messagebus/ybus.h>
+
+using namespace std;
+using namespace NBus;
+using namespace NBus::NPrivate;
+using namespace NBus::NTest;
+
+////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////
+/// \brief Reply-less client and handler
+struct NullClient : TBusClientHandlerError {
+ TNetAddr ServerAddr;
+
+ TBusMessageQueuePtr Queue;
+ TBusClientSessionPtr Session;
+ TExampleProtocol Proto;
+
+ /// constructor creates instances of protocol and session
+ NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
+ : ServerAddr(serverAddr)
+ {
+ UNIT_ASSERT(serverAddr.GetPort() > 0);
+
+ /// create or get instance of message queue, need one per application
+ Queue = CreateMessageQueue();
+
+ /// register source/client session
+ Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
+
+ /// register service, announce to clients via LocatorService
+ Session->RegisterService("localhost");
+ }
+
+ ~NullClient() override {
+ Session->Shutdown();
+ }
+
+ /// dispatch of requests is done here
+ void Work() {
+ int batch = 10;
+
+ for (int i = 0; i < batch; i++) {
+ TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
+ mess->Data = "TADA";
+ Session->SendMessageOneWay(mess, &ServerAddr);
+ }
+ }
+
+ void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
+ }
+};
+
+/////////////////////////////////////////////////////////////////////
+/// \brief Reply-less server and handler
+class NullServer: public TBusServerHandlerError {
+public:
+ /// session object to maintian
+ TBusMessageQueuePtr Queue;
+ TBusServerSessionPtr Session;
+ TExampleProtocol Proto;
+
+public:
+ TAtomic NumMessages;
+
+ NullServer() {
+ NumMessages = 0;
+
+ /// create or get instance of single message queue, need one for application
+ Queue = CreateMessageQueue();
+
+ /// register destination session
+ TBusServerSessionConfig sessionConfig;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
+ }
+
+ ~NullServer() override {
+ Session->Shutdown();
+ }
+
+ /// when message comes do not send reply, just acknowledge
+ void OnMessage(TOnMessageContext& mess) override {
+ TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
+
+ Y_ASSERT(fmess->Data == "TADA");
+
+ /// tell session to forget this message and never expect any reply
+ mess.ForgetRequest();
+
+ AtomicIncrement(NumMessages);
+ }
+
+ /// this handler should not be called because this server does not send replies
+ void OnSent(TAutoPtr<TBusMessage> mess) override {
+ Y_UNUSED(mess);
+ Y_FAIL("This server does not sent replies");
+ }
+};
+
+Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
+ Y_UNIT_TEST(Simple) {
+ TObjectCountCheck objectCountCheck;
+
+ NullServer server;
+ NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort()));
+
+ client.Work();
+
+ // wait until all client message are delivered
+ UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);
+
+ // assert correct number of messages
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
+ UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
+ }
+
+ struct TMessageTooLargeClient: public NullClient {
+ TSystemEvent GotTooLarge;
+
+ TBusClientSessionConfig Config() {
+ TBusClientSessionConfig r;
+ r.MaxMessageSize = 1;
+ return r;
+ }
+
+ TMessageTooLargeClient(unsigned port)
+ : NullClient(TNetAddr("localhost", port), Config())
+ {
+ }
+
+ ~TMessageTooLargeClient() override {
+ Session->Shutdown();
+ }
+
+ void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
+ Y_UNUSED(mess);
+
+ Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status));
+
+ GotTooLarge.Signal();
+ }
+ };
+
+ Y_UNIT_TEST(MessageTooLargeOnClient) {
+ TObjectCountCheck objectCountCheck;
+
+ NullServer server;
+
+ TMessageTooLargeClient client(server.Session->GetActualListenPort());
+
+ EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
+
+ client.GotTooLarge.WaitI();
+ }
+
+ struct TCheckTimeoutClient: public NullClient {
+ ~TCheckTimeoutClient() override {
+ Session->Shutdown();
+ }
+
+ static TBusClientSessionConfig SessionConfig() {
+ TBusClientSessionConfig sessionConfig;
+ sessionConfig.SendTimeout = 1;
+ sessionConfig.ConnectTimeout = 1;
+ sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
+ return sessionConfig;
+ }
+
+ TCheckTimeoutClient(const TNetAddr& serverAddr)
+ : NullClient(serverAddr, SessionConfig())
+ {
+ }
+
+ TSystemEvent GotError;
+
+ /// message that could not be delivered
+ void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
+ Y_UNUSED(mess);
+ Y_UNUSED(status); // TODO: check status
+
+ GotError.Signal();
+ }
+ };
+
+ Y_UNIT_TEST(SendTimeout_Callback_NoServer) {
+ TObjectCountCheck objectCountCheck;
+
+ TCheckTimeoutClient client(TNetAddr("localhost", 17));
+
+ EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ UNIT_ASSERT_EQUAL(ok, MESSAGE_OK);
+
+ client.GotError.WaitI();
+ }
+
+ Y_UNIT_TEST(SendTimeout_Callback_HangingServer) {
+ THangingServer server;
+
+ TObjectCountCheck objectCountCheck;
+
+ TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort()));
+
+ bool first = true;
+ for (;;) {
+ EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
+ if (ok == MESSAGE_BUSY) {
+ UNIT_ASSERT(!first);
+ break;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK);
+ first = false;
+ }
+
+ // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
+ // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get
+ // serailized and written to the socket buffer, so the write queue gets drained and there are
+ // no messages to timeout when periodic timeout check happens.
+
+ client.GotError.WaitI();
+ }
+}