diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/test/ut/one_way_ut.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/test/ut/one_way_ut.cpp')
-rw-r--r-- | library/cpp/messagebus/test/ut/one_way_ut.cpp | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp new file mode 100644 index 0000000000..9c21227e2b --- /dev/null +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -0,0 +1,255 @@ +/////////////////////////////////////////////////////////////////// +/// \file +/// \brief Example of reply-less communication + +/// This example demostrates how asynchronous message passing library +/// can be used to send message and do not wait for reply back. +/// The usage of reply-less communication should be restricted to +/// low-throughput clients and high-throughput server to provide reasonable +/// utility. Removing replies from the communication removes any restriction +/// on how many message can be send to server and rougue clients may overwelm +/// server without thoughtput control. + +/// 1) To implement reply-less client \n + +/// Call NBus::TBusSession::AckMessage() +/// from within NBus::IMessageHandler::OnSent() handler when message has +/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). +/// Discard identity for reply message. + +/// 2) To implement reply-less server \n + +/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() +/// handler when message has been received on server end. +/// See example in NBus::NullServer::OnMessage(). +/// Discard identity for reply message. + +#include <library/cpp/messagebus/test/helper/alloc_counter.h> +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/hanging_server.h> +#include <library/cpp/messagebus/test/helper/message_handler_error.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/ybus.h> + +using namespace std; +using namespace NBus; +using namespace NBus::NPrivate; +using namespace NBus::NTest; + +//////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////// +/// \brief Reply-less client and handler +struct NullClient : TBusClientHandlerError { + TNetAddr ServerAddr; + + TBusMessageQueuePtr Queue; + TBusClientSessionPtr Session; + TExampleProtocol Proto; + + /// constructor creates instances of protocol and session + NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) + : ServerAddr(serverAddr) + { + UNIT_ASSERT(serverAddr.GetPort() > 0); + + /// create or get instance of message queue, need one per application + Queue = CreateMessageQueue(); + + /// register source/client session + Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); + + /// register service, announce to clients via LocatorService + Session->RegisterService("localhost"); + } + + ~NullClient() override { + Session->Shutdown(); + } + + /// dispatch of requests is done here + void Work() { + int batch = 10; + + for (int i = 0; i < batch; i++) { + TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); + mess->Data = "TADA"; + Session->SendMessageOneWay(mess, &ServerAddr); + } + } + + void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { + } +}; + +///////////////////////////////////////////////////////////////////// +/// \brief Reply-less server and handler +class NullServer: public TBusServerHandlerError { +public: + /// session object to maintian + TBusMessageQueuePtr Queue; + TBusServerSessionPtr Session; + TExampleProtocol Proto; + +public: + TAtomic NumMessages; + + NullServer() { + NumMessages = 0; + + /// create or get instance of single message queue, need one for application + Queue = CreateMessageQueue(); + + /// register destination session + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); + } + + ~NullServer() override { + Session->Shutdown(); + } + + /// when message comes do not send reply, just acknowledge + void OnMessage(TOnMessageContext& mess) override { + TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); + + Y_ASSERT(fmess->Data == "TADA"); + + /// tell session to forget this message and never expect any reply + mess.ForgetRequest(); + + AtomicIncrement(NumMessages); + } + + /// this handler should not be called because this server does not send replies + void OnSent(TAutoPtr<TBusMessage> mess) override { + Y_UNUSED(mess); + Y_FAIL("This server does not sent replies"); + } +}; + +Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { + Y_UNIT_TEST(Simple) { + TObjectCountCheck objectCountCheck; + + NullServer server; + NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort())); + + client.Work(); + + // wait until all client message are delivered + UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10); + + // assert correct number of messages + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10); + UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); + } + + struct TMessageTooLargeClient: public NullClient { + TSystemEvent GotTooLarge; + + TBusClientSessionConfig Config() { + TBusClientSessionConfig r; + r.MaxMessageSize = 1; + return r; + } + + TMessageTooLargeClient(unsigned port) + : NullClient(TNetAddr("localhost", port), Config()) + { + } + + ~TMessageTooLargeClient() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status)); + + GotTooLarge.Signal(); + } + }; + + Y_UNIT_TEST(MessageTooLargeOnClient) { + TObjectCountCheck objectCountCheck; + + NullServer server; + + TMessageTooLargeClient client(server.Session->GetActualListenPort()); + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.GotTooLarge.WaitI(); + } + + struct TCheckTimeoutClient: public NullClient { + ~TCheckTimeoutClient() override { + Session->Shutdown(); + } + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; + sessionConfig.ConnectTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); + return sessionConfig; + } + + TCheckTimeoutClient(const TNetAddr& serverAddr) + : NullClient(serverAddr, SessionConfig()) + { + } + + TSystemEvent GotError; + + /// message that could not be delivered + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + Y_UNUSED(status); // TODO: check status + + GotError.Signal(); + } + }; + + Y_UNIT_TEST(SendTimeout_Callback_NoServer) { + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", 17)); + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_EQUAL(ok, MESSAGE_OK); + + client.GotError.WaitI(); + } + + Y_UNIT_TEST(SendTimeout_Callback_HangingServer) { + THangingServer server; + + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort())); + + bool first = true; + for (;;) { + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + if (ok == MESSAGE_BUSY) { + UNIT_ASSERT(!first); + break; + } + UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK); + first = false; + } + + // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages. + // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get + // serailized and written to the socket buffer, so the write queue gets drained and there are + // no messages to timeout when periodic timeout check happens. + + client.GotError.WaitI(); + } +} |