///////////////////////////////////////////////////////////////////
/// \file
/// \brief Example of reply-less communication
/// This example demostrates how asynchronous message passing library
/// can be used to send message and do not wait for reply back.
/// The usage of reply-less communication should be restricted to
/// low-throughput clients and high-throughput server to provide reasonable
/// utility. Removing replies from the communication removes any restriction
/// on how many message can be send to server and rougue clients may overwelm
/// server without thoughtput control.
/// 1) To implement reply-less client \n
/// Call NBus::TBusSession::AckMessage()
/// from within NBus::IMessageHandler::OnSent() handler when message has
/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
/// Discard identity for reply message.
/// 2) To implement reply-less server \n
/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
/// handler when message has been received on server end.
/// See example in NBus::NullServer::OnMessage().
/// Discard identity for reply message.
#include <library/cpp/messagebus/test/helper/alloc_counter.h>
#include <library/cpp/messagebus/test/helper/example.h>
#include <library/cpp/messagebus/test/helper/hanging_server.h>
#include <library/cpp/messagebus/test/helper/message_handler_error.h>
#include <library/cpp/messagebus/test/helper/object_count_check.h>
#include <library/cpp/messagebus/test/helper/wait_for.h>
#include <library/cpp/messagebus/ybus.h>
using namespace std;
using namespace NBus;
using namespace NBus::NPrivate;
using namespace NBus::NTest;
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
/// \brief Reply-less client and handler
struct NullClient : TBusClientHandlerError {
TNetAddr ServerAddr;
TBusMessageQueuePtr Queue;
TBusClientSessionPtr Session;
TExampleProtocol Proto;
/// constructor creates instances of protocol and session
NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig())
: ServerAddr(serverAddr)
{
UNIT_ASSERT(serverAddr.GetPort() > 0);
/// create or get instance of message queue, need one per application
Queue = CreateMessageQueue();
/// register source/client session
Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue);
/// register service, announce to clients via LocatorService
Session->RegisterService("localhost");
}
~NullClient() override {
Session->Shutdown();
}
/// dispatch of requests is done here
void Work() {
int batch = 10;
for (int i = 0; i < batch; i++) {
TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
mess->Data = "TADA";
Session->SendMessageOneWay(mess, &ServerAddr);
}
}
void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override {
}
};
/////////////////////////////////////////////////////////////////////
/// \brief Reply-less server and handler
class NullServer: public TBusServerHandlerError {
public:
/// session object to maintian
TBusMessageQueuePtr Queue;
TBusServerSessionPtr Session;
TExampleProtocol Proto;
public:
TAtomic NumMessages;
NullServer() {
NumMessages = 0;
/// create or get instance of single message queue, need one for application
Queue = CreateMessageQueue();
/// register destination session
TBusServerSessionConfig sessionConfig;
Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue);
}
~NullServer() override {
Session->Shutdown();
}
/// when message comes do not send reply, just acknowledge
void OnMessage(TOnMessageContext& mess) override {
TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
Y_ASSERT(fmess->Data == "TADA");
/// tell session to forget this message and never expect any reply
mess.ForgetRequest();
AtomicIncrement(NumMessages);
}
/// this handler should not be called because this server does not send replies
void OnSent(TAutoPtr<TBusMessage> mess) override {
Y_UNUSED(mess);
Y_ABORT("This server does not sent replies");
}
};
Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
Y_UNIT_TEST(Simple) {
TObjectCountCheck objectCountCheck;
NullServer server;
NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort()));
client.Work();
// wait until all client message are delivered
UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10);
// assert correct number of messages
UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10);
UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0);
UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0);
}
struct TMessageTooLargeClient: public NullClient {
TSystemEvent GotTooLarge;
TBusClientSessionConfig Config() {
TBusClientSessionConfig r;
r.MaxMessageSize = 1;
return r;
}
TMessageTooLargeClient(unsigned port)
: NullClient(TNetAddr("localhost", port), Config())
{
}
~TMessageTooLargeClient() override {
Session->Shutdown();
}
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
Y_UNUSED(mess);
Y_ABORT_UNLESS(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status));
GotTooLarge.Signal();
}
};
Y_UNIT_TEST(MessageTooLargeOnClient) {
TObjectCountCheck objectCountCheck;
NullServer server;
TMessageTooLargeClient client(server.Session->GetActualListenPort());
EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok);
client.GotTooLarge.WaitI();
}
struct TCheckTimeoutClient: public NullClient {
~TCheckTimeoutClient() override {
Session->Shutdown();
}
static TBusClientSessionConfig SessionConfig() {
TBusClientSessionConfig sessionConfig;
sessionConfig.SendTimeout = 1;
sessionConfig.ConnectTimeout = 1;
sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10);
return sessionConfig;
}
TCheckTimeoutClient(const TNetAddr& serverAddr)
: NullClient(serverAddr, SessionConfig())
{
}
TSystemEvent GotError;
/// message that could not be delivered
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override {
Y_UNUSED(mess);
Y_UNUSED(status); // TODO: check status
GotError.Signal();
}
};
Y_UNIT_TEST(SendTimeout_Callback_NoServer) {
TObjectCountCheck objectCountCheck;
TCheckTimeoutClient client(TNetAddr("localhost", 17));
EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
UNIT_ASSERT_EQUAL(ok, MESSAGE_OK);
client.GotError.WaitI();
}
Y_UNIT_TEST(SendTimeout_Callback_HangingServer) {
THangingServer server;
TObjectCountCheck objectCountCheck;
TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort()));
bool first = true;
for (;;) {
EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr);
if (ok == MESSAGE_BUSY) {
UNIT_ASSERT(!first);
break;
}
UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK);
first = false;
}
// BUGBUG: The test is buggy: the client might not get any error when sending one-way messages.
// All the messages that the client has sent before he gets first MESSAGE_BUSY error might get
// serailized and written to the socket buffer, so the write queue gets drained and there are
// no messages to timeout when periodic timeout check happens.
client.GotError.WaitI();
}
}