aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/ut
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/test/ut
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/ut')
-rw-r--r--library/cpp/messagebus/test/ut/count_down_latch.h6
-rw-r--r--library/cpp/messagebus/test/ut/locator_uniq_ut.cpp10
-rw-r--r--library/cpp/messagebus/test/ut/messagebus_ut.cpp102
-rw-r--r--library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp8
-rw-r--r--library/cpp/messagebus/test/ut/module_client_ut.cpp74
-rw-r--r--library/cpp/messagebus/test/ut/module_server_ut.cpp8
-rw-r--r--library/cpp/messagebus/test/ut/moduletest.h406
-rw-r--r--library/cpp/messagebus/test/ut/one_way_ut.cpp48
-rw-r--r--library/cpp/messagebus/test/ut/starter_ut.cpp6
-rw-r--r--library/cpp/messagebus/test/ut/sync_client_ut.cpp98
10 files changed, 383 insertions, 383 deletions
diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h
index 5117db5731..a4d6b72bfa 100644
--- a/library/cpp/messagebus/test/ut/count_down_latch.h
+++ b/library/cpp/messagebus/test/ut/count_down_latch.h
@@ -7,12 +7,12 @@ class TCountDownLatch {
private:
TAtomic Current;
TSystemEvent EventObject;
-
+
public:
TCountDownLatch(unsigned initial)
: Current(initial)
- {
- }
+ {
+ }
void CountDown() {
if (AtomicDecrement(Current) == 0) {
diff --git a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp
index 3fdd175d73..dd5dfc4cca 100644
--- a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp
+++ b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp
@@ -4,12 +4,12 @@
#include <library/cpp/messagebus/ybus.h>
class TLocatorRegisterUniqTest: public TTestBase {
- UNIT_TEST_SUITE(TLocatorRegisterUniqTest);
- UNIT_TEST(TestRegister);
- UNIT_TEST_SUITE_END();
+ UNIT_TEST_SUITE(TLocatorRegisterUniqTest);
+ UNIT_TEST(TestRegister);
+ UNIT_TEST_SUITE_END();
-protected:
- void TestRegister();
+protected:
+ void TestRegister();
};
UNIT_TEST_SUITE_REGISTRATION(TLocatorRegisterUniqTest);
diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
index 040f9b7702..92839e9cf9 100644
--- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp
+++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp
@@ -10,8 +10,8 @@
#include <util/network/sock.h>
-#include <utility>
-
+#include <utility>
+
using namespace NBus;
using namespace NBus::NTest;
@@ -23,10 +23,10 @@ namespace {
TExampleClientSlowOnMessageSent()
: SentCompleted(0)
- {
- }
+ {
+ }
- ~TExampleClientSlowOnMessageSent() override {
+ ~TExampleClientSlowOnMessageSent() override {
Session->Shutdown();
}
@@ -48,7 +48,7 @@ namespace {
Y_UNIT_TEST_SUITE(TMessageBusTests) {
void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply,
- const TBusServerSessionConfig& sessionConfig) {
+ const TBusServerSessionConfig& sessionConfig) {
TObjectCountCheck objectCountCheck;
TExampleServer server;
@@ -121,17 +121,17 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.SendMessagesWaitReplies(19, serverAddr);
}
- struct TestNoServerImplClient: public TExampleClient {
+ struct TestNoServerImplClient: public TExampleClient {
TTestSync TestSync;
int failures = 0;
- template <typename... Args>
- TestNoServerImplClient(Args&&... args)
- : TExampleClient(std::forward<Args>(args)...)
- {
- }
+ template <typename... Args>
+ TestNoServerImplClient(Args&&... args)
+ : TExampleClient(std::forward<Args>(args)...)
+ {
+ }
- ~TestNoServerImplClient() override {
+ ~TestNoServerImplClient() override {
Session->Shutdown();
}
@@ -155,7 +155,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
if (oneWay) {
status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr);
} else {
- TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
+ TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount));
status = client.Session->SendMessageAutoPtr(message, &noServerAddr);
}
@@ -168,7 +168,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.TestSync.WaitForAndIncrement(count * 2 + 1);
}
- client.TestSync.WaitForAndIncrement(count * 2);
+ client.TestSync.WaitForAndIncrement(count * 2);
}
void HangingServerImpl(unsigned port) {
@@ -241,7 +241,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.WaitReplies();
}
- struct TSendTimeoutCheckerExampleClient: public TExampleClient {
+ struct TSendTimeoutCheckerExampleClient: public TExampleClient {
static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) {
TBusClientSessionConfig sessionConfig;
if (periodLessThanConnectTimeout) {
@@ -256,8 +256,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout)
: TExampleClient(SessionConfig(periodLessThanConnectTimeout))
- {
- }
+ {
+ }
~TSendTimeoutCheckerExampleClient() override {
Session->Shutdown();
@@ -470,7 +470,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE);
}
- struct TServerForResponseTooLarge: public TExampleServer {
+ struct TServerForResponseTooLarge: public TExampleServer {
TTestSync TestSync;
static TBusServerSessionConfig Config() {
@@ -481,10 +481,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TServerForResponseTooLarge()
: TExampleServer("TServerForResponseTooLarge", Config())
- {
- }
+ {
+ }
- ~TServerForResponseTooLarge() override {
+ ~TServerForResponseTooLarge() override {
Session->Shutdown();
}
@@ -530,7 +530,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight());
}
- struct TServerForRequestTooLarge: public TExampleServer {
+ struct TServerForRequestTooLarge: public TExampleServer {
TTestSync TestSync;
static TBusServerSessionConfig Config() {
@@ -541,10 +541,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TServerForRequestTooLarge()
: TExampleServer("TServerForRequestTooLarge", Config())
- {
- }
+ {
+ }
- ~TServerForRequestTooLarge() override {
+ ~TServerForRequestTooLarge() override {
Session->Shutdown();
}
@@ -674,7 +674,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.WaitReplies();
}
- struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient {
+ struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient {
TTestSync TestSync;
static TBusClientSessionConfig SessionConfig() {
@@ -691,7 +691,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
{
}
- ~TResetAfterSendOneWayErrorInCallbackClient() override {
+ ~TResetAfterSendOneWayErrorInCallbackClient() override {
Session->Shutdown();
}
@@ -716,10 +716,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.TestSync.WaitForAndIncrement(2);
}
- struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient {
+ struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient {
TTestSync TestSync;
- ~TResetAfterSendMessageOneWayDuringShutdown() override {
+ ~TResetAfterSendMessageOneWayDuringShutdown() override {
Session->Shutdown();
}
@@ -770,10 +770,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TestNoServerImpl(17, true);
}
- struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
+ struct TResetAfterSendOneWaySuccessClient: public TExampleClient {
TTestSync TestSync;
- ~TResetAfterSendOneWaySuccessClient() override {
+ ~TResetAfterSendOneWaySuccessClient() override {
Session->Shutdown();
}
@@ -835,7 +835,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TExampleProtocol proto;
TBusServerHandlerError handler;
TBusServerSessionPtr session = TBusServerSession::Create(
- &proto, &handler, TBusServerSessionConfig(), queue);
+ &proto, &handler, TBusServerSessionConfig(), queue);
unsigned port = session->GetActualListenPort();
UNIT_ASSERT(port > 0);
@@ -873,7 +873,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
size_t pos = 0;
while (pos < sizeof(response)) {
- size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos);
+ size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos);
pos += count;
}
@@ -882,10 +882,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal());
}
- struct TOnConnectionEventClient: public TExampleClient {
+ struct TOnConnectionEventClient: public TExampleClient {
TTestSync Sync;
- ~TOnConnectionEventClient() override {
+ ~TOnConnectionEventClient() override {
Session->Shutdown();
}
@@ -913,13 +913,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
};
- struct TOnConnectionEventServer: public TExampleServer {
+ struct TOnConnectionEventServer: public TExampleServer {
TOnConnectionEventServer()
- : TExampleServer("TOnConnectionEventServer")
- {
- }
+ : TExampleServer("TOnConnectionEventServer")
+ {
+ }
- ~TOnConnectionEventServer() override {
+ ~TOnConnectionEventServer() override {
Session->Shutdown();
}
@@ -963,9 +963,9 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
client.Sync.WaitForAndIncrement(3);
}
- struct TServerForQuotaWake: public TExampleServer {
+ struct TServerForQuotaWake: public TExampleServer {
TSystemEvent GoOn;
- TMutex OneLock;
+ TMutex OneLock;
TOnMessageContext OneMessage;
@@ -981,16 +981,16 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
TServerForQuotaWake()
: TExampleServer("TServerForQuotaWake", Config())
- {
- }
+ {
+ }
- ~TServerForQuotaWake() override {
+ ~TServerForQuotaWake() override {
Session->Shutdown();
}
void OnMessage(TOnMessageContext& req) override {
if (!GoOn.Wait(0)) {
- TGuard<TMutex> guard(OneLock);
+ TGuard<TMutex> guard(OneLock);
UNIT_ASSERT(!OneMessage);
@@ -1000,7 +1000,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
}
void WakeOne() {
- TGuard<TMutex> guard(OneLock);
+ TGuard<TMutex> guard(OneLock);
UNIT_ASSERT(!!OneMessage);
@@ -1035,13 +1035,13 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) {
count++;
} else if (status == MESSAGE_BUSY) {
- if (count == test_msg_count) {
+ if (count == test_msg_count) {
TInstant now = TInstant::Now();
- if (start.GetValue() == 0) {
+ if (start.GetValue() == 0) {
start = now;
- // TODO: properly check that server is blocked
+ // TODO: properly check that server is blocked
} else if (start + TDuration::MilliSeconds(100) < now) {
break;
}
diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
index 4083cf3b7b..fd511e2dd9 100644
--- a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp
@@ -43,8 +43,8 @@ Y_UNIT_TEST_SUITE(ModuleClientOneWay) {
: TBusModule("m")
, TestSync(testSync)
, Port(port)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage*) override {
TestSync->WaitForAndIncrement(0);
@@ -94,8 +94,8 @@ Y_UNIT_TEST_SUITE(ModuleClientOneWay) {
TSendErrorModule(TTestSync* testSync)
: TBusModule("m")
, TestSync(testSync)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage*) override {
TestSync->WaitForAndIncrement(0);
diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp
index ebfe185cc6..84897ce5c4 100644
--- a/library/cpp/messagebus/test/ut/module_client_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp
@@ -20,34 +20,34 @@ using namespace NBus::NTest;
// helper class that cleans TBusJob instance, so job's destructor can
// be completed without assertion fail.
struct TJobGuard {
-public:
- TJobGuard(NBus::TBusJob* job)
- : Job(job)
- {
- }
-
- ~TJobGuard() {
- Job->ClearAllMessageStates();
- }
-
-private:
- NBus::TBusJob* Job;
+public:
+ TJobGuard(NBus::TBusJob* job)
+ : Job(job)
+ {
+ }
+
+ ~TJobGuard() {
+ Job->ClearAllMessageStates();
+ }
+
+private:
+ NBus::TBusJob* Job;
};
-class TMessageOk: public NBus::TBusMessage {
-public:
- TMessageOk()
- : NBus::TBusMessage(1)
- {
- }
+class TMessageOk: public NBus::TBusMessage {
+public:
+ TMessageOk()
+ : NBus::TBusMessage(1)
+ {
+ }
};
-class TMessageError: public NBus::TBusMessage {
-public:
- TMessageError()
- : NBus::TBusMessage(2)
- {
- }
+class TMessageError: public NBus::TBusMessage {
+public:
+ TMessageError()
+ : NBus::TBusMessage(2)
+ {
+ }
};
Y_UNIT_TEST_SUITE(BusJobTest) {
@@ -108,8 +108,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
TParallelOnReplyModule(const TNetAddr& serverAddr)
: ServerAddr(serverAddr)
, RepliesLatch(2)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
@@ -166,8 +166,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
: ServerAddr("localhost", 17)
, GotReplyLatch(2)
, SentMessage()
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
@@ -222,7 +222,7 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
module.Shutdown();
}
- struct TSlowReplyServer: public TBusServerHandlerError {
+ struct TSlowReplyServer: public TBusServerHandlerError {
TTestSync* const TestSync;
TBusMessageQueuePtr Bus;
TBusServerSessionPtr ServerSession;
@@ -248,7 +248,7 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
}
};
- struct TModuleThatSendsReplyEarly: public TExampleClientModule {
+ struct TModuleThatSendsReplyEarly: public TExampleClientModule {
TTestSync* const TestSync;
const unsigned ServerPort;
@@ -260,8 +260,8 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
, ServerPort(serverPort)
, ServerSession(nullptr)
, ReplyCount(0)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
Y_UNUSED(mess);
@@ -318,22 +318,22 @@ Y_UNIT_TEST_SUITE(BusJobTest) {
module.Shutdown();
}
- struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule {
+ struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule {
unsigned ServerPort;
TTestSync TestSync;
TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort)
: ServerPort(serverPort)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage*) override {
TestSync.CheckAndIncrement(0);
job->Send(new TExampleRequest(&Proto.RequestCount), Source,
- TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply),
- 0, TNetAddr("localhost", ServerPort));
+ TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply),
+ 0, TNetAddr("localhost", ServerPort));
return &TShutdownCalledBeforeReplyReceivedModule::End;
}
diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp
index 88fe1dd9b6..4258ae4bf7 100644
--- a/library/cpp/messagebus/test/ut/module_server_ut.cpp
+++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp
@@ -21,7 +21,7 @@ Y_UNIT_TEST_SUITE(ModuleServerTests) {
/// create or get instance of message queue, need one per application
TBusMessageQueuePtr bus(CreateMessageQueue());
- THostInfoHandler hostHandler(bus.Get());
+ THostInfoHandler hostHandler(bus.Get());
TDupDetectModule module(hostHandler.GetActualListenAddr());
bool success;
success = module.Init(bus.Get());
@@ -39,13 +39,13 @@ Y_UNIT_TEST_SUITE(ModuleServerTests) {
dupHandler.DupDetect->Shutdown();
}
- struct TParallelOnMessageModule: public TExampleServerModule {
+ struct TParallelOnMessageModule: public TExampleServerModule {
TCountDownLatch WaitTwoRequestsLatch;
TParallelOnMessageModule()
: WaitTwoRequestsLatch(2)
- {
- }
+ {
+ }
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
WaitTwoRequestsLatch.CountDown();
diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h
index d5da72c0cb..e67da02701 100644
--- a/library/cpp/messagebus/test/ut/moduletest.h
+++ b/library/cpp/messagebus/test/ut/moduletest.h
@@ -11,211 +11,211 @@
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/oldmodule/module.h>
-namespace NBus {
- namespace NTest {
- using namespace std;
+namespace NBus {
+ namespace NTest {
+ using namespace std;
-#define TYPE_HOSTINFOREQUEST 100
+#define TYPE_HOSTINFOREQUEST 100
#define TYPE_HOSTINFORESPONSE 101
- ////////////////////////////////////////////////////////////////////
- /// \brief DupDetect protocol that common between client and server
- ////////////////////////////////////////////////////////////////////
- /// \brief HostInfo request class
- class THostInfoMessage: public TBusMessage {
- public:
- THostInfoMessage()
- : TBusMessage(TYPE_HOSTINFOREQUEST)
- {
- }
- THostInfoMessage(ECreateUninitialized)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- {
- }
-
- ~THostInfoMessage() override {
- }
- };
-
- ////////////////////////////////////////////////////////////////////
- /// \brief HostInfo reply class
- class THostInfoReply: public TBusMessage {
- public:
- THostInfoReply()
- : TBusMessage(TYPE_HOSTINFORESPONSE)
- {
- }
- THostInfoReply(ECreateUninitialized)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- {
- }
-
- ~THostInfoReply() override {
- }
- };
-
- ////////////////////////////////////////////////////////////////////
- /// \brief HostInfo protocol that common between client and server
- class THostInfoProtocol: public TBusProtocol {
- public:
- THostInfoProtocol()
- : TBusProtocol("HOSTINFO", 0)
- {
- }
- /// serialized protocol specific data into TBusData
- void Serialize(const TBusMessage* mess, TBuffer& data) override {
- Y_UNUSED(data);
- Y_UNUSED(mess);
- }
-
- /// deserialized TBusData into new instance of the message
- TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
- Y_UNUSED(payload);
-
- if (messageType == TYPE_HOSTINFOREQUEST) {
- return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED);
- } else if (messageType == TYPE_HOSTINFORESPONSE) {
- return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED);
- } else {
- Y_FAIL("unknown");
- }
- }
- };
-
- //////////////////////////////////////////////////////////////
- /// \brief HostInfo handler (should convert it to module too)
- struct THostInfoHandler: public TBusServerHandlerError {
- TBusServerSessionPtr Session;
- TBusServerSessionConfig HostInfoConfig;
- THostInfoProtocol HostInfoProto;
-
- THostInfoHandler(TBusMessageQueue* queue) {
- Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue);
- }
-
- void OnMessage(TOnMessageContext& mess) override {
- usleep(10 * 1000); /// pretend we are doing something
-
- TAutoPtr<THostInfoReply> reply(new THostInfoReply());
-
- mess.SendReplyMove(reply);
- }
-
- TNetAddr GetActualListenAddr() {
- return TNetAddr("localhost", Session->GetActualListenPort());
- }
- };
-
- //////////////////////////////////////////////////////////////
- /// \brief DupDetect handler (should convert it to module too)
- struct TDupDetectHandler: public TBusClientHandlerError {
- TNetAddr ServerAddr;
-
- TBusClientSessionPtr DupDetect;
- TBusClientSessionConfig DupDetectConfig;
- TExampleProtocol DupDetectProto;
-
- int NumMessages;
- int NumReplies;
-
- TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue)
- : ServerAddr(serverAddr)
- {
- DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue);
- DupDetect->RegisterService("localhost");
- }
-
- void Work() {
- NumMessages = 10;
- NumReplies = 0;
-
- for (int i = 0; i < NumMessages; i++) {
- TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount);
- DupDetect->SendMessage(mess, &ServerAddr);
- }
- }
-
- void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
- Y_UNUSED(mess);
- Y_UNUSED(reply);
- NumReplies++;
- }
- };
-
- /////////////////////////////////////////////////////////////////
- /// \brief DupDetect module
-
- struct TDupDetectModule: public TBusModule {
- TNetAddr HostInfoAddr;
-
- TBusClientSessionPtr HostInfoClientSession;
- TBusClientSessionConfig HostInfoConfig;
- THostInfoProtocol HostInfoProto;
-
- TExampleProtocol DupDetectProto;
- TBusServerSessionConfig DupDetectConfig;
-
- TNetAddr ListenAddr;
-
- TDupDetectModule(const TNetAddr& hostInfoAddr)
- : TBusModule("DUPDETECTMODULE")
- , HostInfoAddr(hostInfoAddr)
- {
- }
-
- bool Init(TBusMessageQueue* queue) {
- HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
- HostInfoClientSession->RegisterService("localhost");
-
- return TBusModule::CreatePrivateSessions(queue);
- }
-
- TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
- TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);
-
- ListenAddr = TNetAddr("localhost", session->GetActualListenPort());
-
- return session;
- }
-
- /// entry point into module, first function to call
- TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
- TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
- Y_UNUSED(dmess);
-
- THostInfoMessage* hmess = new THostInfoMessage();
-
- /// send message to imaginary hostinfo server
- job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr);
-
- return TJobHandler(&TDupDetectModule::ProcessHostInfo);
- }
-
- /// next handler is executed when all outstanding requests from previous handler is completed
- TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) {
- TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
- Y_UNUSED(dmess);
-
- THostInfoMessage* hmess = job->Get<THostInfoMessage>();
- THostInfoReply* hreply = job->Get<THostInfoReply>();
- EMessageStatus hstatus = job->GetStatus<THostInfoMessage>();
- Y_ASSERT(hmess != nullptr);
- Y_ASSERT(hreply != nullptr);
- Y_ASSERT(hstatus == MESSAGE_OK);
-
- return TJobHandler(&TDupDetectModule::Finish);
- }
-
- /// last handler sends reply and returns NULL
- TJobHandler Finish(TBusJob* job, TBusMessage* mess) {
- Y_UNUSED(mess);
-
- TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount);
- job->SendReply(reply);
-
- return nullptr;
- }
- };
+ ////////////////////////////////////////////////////////////////////
+ /// \brief DupDetect protocol that common between client and server
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo request class
+ class THostInfoMessage: public TBusMessage {
+ public:
+ THostInfoMessage()
+ : TBusMessage(TYPE_HOSTINFOREQUEST)
+ {
+ }
+ THostInfoMessage(ECreateUninitialized)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ {
+ }
+
+ ~THostInfoMessage() override {
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo reply class
+ class THostInfoReply: public TBusMessage {
+ public:
+ THostInfoReply()
+ : TBusMessage(TYPE_HOSTINFORESPONSE)
+ {
+ }
+ THostInfoReply(ECreateUninitialized)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ {
+ }
+
+ ~THostInfoReply() override {
+ }
+ };
+
+ ////////////////////////////////////////////////////////////////////
+ /// \brief HostInfo protocol that common between client and server
+ class THostInfoProtocol: public TBusProtocol {
+ public:
+ THostInfoProtocol()
+ : TBusProtocol("HOSTINFO", 0)
+ {
+ }
+ /// serialized protocol specific data into TBusData
+ void Serialize(const TBusMessage* mess, TBuffer& data) override {
+ Y_UNUSED(data);
+ Y_UNUSED(mess);
+ }
+
+ /// deserialized TBusData into new instance of the message
+ TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override {
+ Y_UNUSED(payload);
+
+ if (messageType == TYPE_HOSTINFOREQUEST) {
+ return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED);
+ } else if (messageType == TYPE_HOSTINFORESPONSE) {
+ return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED);
+ } else {
+ Y_FAIL("unknown");
+ }
+ }
+ };
+
+ //////////////////////////////////////////////////////////////
+ /// \brief HostInfo handler (should convert it to module too)
+ struct THostInfoHandler: public TBusServerHandlerError {
+ TBusServerSessionPtr Session;
+ TBusServerSessionConfig HostInfoConfig;
+ THostInfoProtocol HostInfoProto;
+
+ THostInfoHandler(TBusMessageQueue* queue) {
+ Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue);
+ }
+
+ void OnMessage(TOnMessageContext& mess) override {
+ usleep(10 * 1000); /// pretend we are doing something
+
+ TAutoPtr<THostInfoReply> reply(new THostInfoReply());
+
+ mess.SendReplyMove(reply);
+ }
+
+ TNetAddr GetActualListenAddr() {
+ return TNetAddr("localhost", Session->GetActualListenPort());
+ }
+ };
+
+ //////////////////////////////////////////////////////////////
+ /// \brief DupDetect handler (should convert it to module too)
+ struct TDupDetectHandler: public TBusClientHandlerError {
+ TNetAddr ServerAddr;
+
+ TBusClientSessionPtr DupDetect;
+ TBusClientSessionConfig DupDetectConfig;
+ TExampleProtocol DupDetectProto;
+
+ int NumMessages;
+ int NumReplies;
+
+ TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue)
+ : ServerAddr(serverAddr)
+ {
+ DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue);
+ DupDetect->RegisterService("localhost");
+ }
+
+ void Work() {
+ NumMessages = 10;
+ NumReplies = 0;
+
+ for (int i = 0; i < NumMessages; i++) {
+ TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount);
+ DupDetect->SendMessage(mess, &ServerAddr);
+ }
+ }
+
+ void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override {
+ Y_UNUSED(mess);
+ Y_UNUSED(reply);
+ NumReplies++;
+ }
+ };
+
+ /////////////////////////////////////////////////////////////////
+ /// \brief DupDetect module
+
+ struct TDupDetectModule: public TBusModule {
+ TNetAddr HostInfoAddr;
+
+ TBusClientSessionPtr HostInfoClientSession;
+ TBusClientSessionConfig HostInfoConfig;
+ THostInfoProtocol HostInfoProto;
+
+ TExampleProtocol DupDetectProto;
+ TBusServerSessionConfig DupDetectConfig;
+
+ TNetAddr ListenAddr;
+
+ TDupDetectModule(const TNetAddr& hostInfoAddr)
+ : TBusModule("DUPDETECTMODULE")
+ , HostInfoAddr(hostInfoAddr)
+ {
+ }
+
+ bool Init(TBusMessageQueue* queue) {
+ HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig);
+ HostInfoClientSession->RegisterService("localhost");
+
+ return TBusModule::CreatePrivateSessions(queue);
+ }
+
+ TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override {
+ TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig);
+
+ ListenAddr = TNetAddr("localhost", session->GetActualListenPort());
+
+ return session;
+ }
+
+ /// entry point into module, first function to call
+ TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
+ TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
+ Y_UNUSED(dmess);
+
+ THostInfoMessage* hmess = new THostInfoMessage();
+
+ /// send message to imaginary hostinfo server
+ job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr);
+
+ return TJobHandler(&TDupDetectModule::ProcessHostInfo);
+ }
+
+ /// next handler is executed when all outstanding requests from previous handler is completed
+ TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) {
+ TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess);
+ Y_UNUSED(dmess);
+
+ THostInfoMessage* hmess = job->Get<THostInfoMessage>();
+ THostInfoReply* hreply = job->Get<THostInfoReply>();
+ EMessageStatus hstatus = job->GetStatus<THostInfoMessage>();
+ Y_ASSERT(hmess != nullptr);
+ Y_ASSERT(hreply != nullptr);
+ Y_ASSERT(hstatus == MESSAGE_OK);
+
+ return TJobHandler(&TDupDetectModule::Finish);
+ }
+
+ /// last handler sends reply and returns NULL
+ TJobHandler Finish(TBusJob* job, TBusMessage* mess) {
+ Y_UNUSED(mess);
+
+ TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount);
+ job->SendReply(reply);
+
+ return nullptr;
+ }
+ };
}
-}
+}
diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp
index 9c21227e2b..a8e0cb960b 100644
--- a/library/cpp/messagebus/test/ut/one_way_ut.cpp
+++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp
@@ -1,7 +1,7 @@
///////////////////////////////////////////////////////////////////
/// \file
-/// \brief Example of reply-less communication
-
+/// \brief Example of reply-less communication
+
/// This example demostrates how asynchronous message passing library
/// can be used to send message and do not wait for reply back.
/// The usage of reply-less communication should be restricted to
@@ -9,19 +9,19 @@
/// utility. Removing replies from the communication removes any restriction
/// on how many message can be send to server and rougue clients may overwelm
/// server without thoughtput control.
-
+
/// 1) To implement reply-less client \n
-
-/// Call NBus::TBusSession::AckMessage()
-/// from within NBus::IMessageHandler::OnSent() handler when message has
-/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
+
+/// Call NBus::TBusSession::AckMessage()
+/// from within NBus::IMessageHandler::OnSent() handler when message has
+/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent().
/// Discard identity for reply message.
-
+
/// 2) To implement reply-less server \n
-
+
/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage()
-/// handler when message has been received on server end.
-/// See example in NBus::NullServer::OnMessage().
+/// handler when message has been received on server end.
+/// See example in NBus::NullServer::OnMessage().
/// Discard identity for reply message.
#include <library/cpp/messagebus/test/helper/alloc_counter.h>
@@ -41,7 +41,7 @@ using namespace NBus::NTest;
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
/// \brief Reply-less client and handler
-struct NullClient : TBusClientHandlerError {
+struct NullClient : TBusClientHandlerError {
TNetAddr ServerAddr;
TBusMessageQueuePtr Queue;
@@ -69,11 +69,11 @@ struct NullClient : TBusClientHandlerError {
}
/// dispatch of requests is done here
- void Work() {
+ void Work() {
int batch = 10;
- for (int i = 0; i < batch; i++) {
- TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
+ for (int i = 0; i < batch; i++) {
+ TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount);
mess->Data = "TADA";
Session->SendMessageOneWay(mess, &ServerAddr);
}
@@ -112,7 +112,7 @@ public:
/// when message comes do not send reply, just acknowledge
void OnMessage(TOnMessageContext& mess) override {
- TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
+ TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage());
Y_ASSERT(fmess->Data == "TADA");
@@ -126,7 +126,7 @@ public:
void OnSent(TAutoPtr<TBusMessage> mess) override {
Y_UNUSED(mess);
Y_FAIL("This server does not sent replies");
- }
+ }
};
Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
@@ -158,8 +158,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
TMessageTooLargeClient(unsigned port)
: NullClient(TNetAddr("localhost", port), Config())
- {
- }
+ {
+ }
~TMessageTooLargeClient() override {
Session->Shutdown();
@@ -187,7 +187,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
client.GotTooLarge.WaitI();
}
- struct TCheckTimeoutClient: public NullClient {
+ struct TCheckTimeoutClient: public NullClient {
~TCheckTimeoutClient() override {
Session->Shutdown();
}
@@ -200,10 +200,10 @@ Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) {
return sessionConfig;
}
- TCheckTimeoutClient(const TNetAddr& serverAddr)
- : NullClient(serverAddr, SessionConfig())
- {
- }
+ TCheckTimeoutClient(const TNetAddr& serverAddr)
+ : NullClient(serverAddr, SessionConfig())
+ {
+ }
TSystemEvent GotError;
diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp
index dd4d3aaa5e..b9ff9a449d 100644
--- a/library/cpp/messagebus/test/ut/starter_ut.cpp
+++ b/library/cpp/messagebus/test/ut/starter_ut.cpp
@@ -8,7 +8,7 @@ using namespace NBus;
using namespace NBus::NTest;
Y_UNIT_TEST_SUITE(TBusStarterTest) {
- struct TStartJobTestModule: public TExampleModule {
+ struct TStartJobTestModule: public TExampleModule {
using TBusModule::CreateDefaultStarter;
TAtomic StartCount;
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(TBusStarterTest) {
module.Shutdown();
}
- struct TSleepModule: public TExampleServerModule {
+ struct TSleepModule: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
@@ -110,7 +110,7 @@ Y_UNIT_TEST_SUITE(TBusStarterTest) {
module.Shutdown();
}
- struct TSendReplyModule: public TExampleServerModule {
+ struct TSendReplyModule: public TExampleServerModule {
TSystemEvent MessageReceivedEvent;
TJobHandler Start(TBusJob* job, TBusMessage* mess) override {
diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
index 400128193f..7a7189dbec 100644
--- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp
+++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp
@@ -2,67 +2,67 @@
#include <library/cpp/messagebus/test/helper/object_count_check.h>
namespace NBus {
- namespace NTest {
- using namespace std;
+ namespace NTest {
+ using namespace std;
- ////////////////////////////////////////////////////////////////////
- /// \brief Client for sending synchronous message to local server
- struct TSyncClient {
- TNetAddr ServerAddr;
+ ////////////////////////////////////////////////////////////////////
+ /// \brief Client for sending synchronous message to local server
+ struct TSyncClient {
+ TNetAddr ServerAddr;
- TExampleProtocol Proto;
- TBusMessageQueuePtr Bus;
- TBusSyncClientSessionPtr Session;
+ TExampleProtocol Proto;
+ TBusMessageQueuePtr Bus;
+ TBusSyncClientSessionPtr Session;
- int NumReplies;
- int NumMessages;
+ int NumReplies;
+ int NumMessages;
- /// constructor creates instances of queue, protocol and session
- TSyncClient(const TNetAddr& serverAddr)
- : ServerAddr(serverAddr)
- {
- /// create or get instance of message queue, need one per application
- Bus = CreateMessageQueue();
+ /// constructor creates instances of queue, protocol and session
+ TSyncClient(const TNetAddr& serverAddr)
+ : ServerAddr(serverAddr)
+ {
+ /// create or get instance of message queue, need one per application
+ Bus = CreateMessageQueue();
- NumReplies = 0;
- NumMessages = 10;
+ NumReplies = 0;
+ NumMessages = 10;
- /// register source/client session
- TBusClientSessionConfig sessionConfig;
- Session = Bus->CreateSyncSource(&Proto, sessionConfig);
- Session->RegisterService("localhost");
- }
+ /// register source/client session
+ TBusClientSessionConfig sessionConfig;
+ Session = Bus->CreateSyncSource(&Proto, sessionConfig);
+ Session->RegisterService("localhost");
+ }
- ~TSyncClient() {
- Session->Shutdown();
- }
+ ~TSyncClient() {
+ Session->Shutdown();
+ }
- /// dispatch of requests is done here
- void Work() {
- for (int i = 0; i < NumMessages; i++) {
- THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount));
- EMessageStatus status;
- THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr));
- if (!!reply) {
- NumReplies++;
- }
- }
- }
- };
+ /// dispatch of requests is done here
+ void Work() {
+ for (int i = 0; i < NumMessages; i++) {
+ THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount));
+ EMessageStatus status;
+ THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr));
+ if (!!reply) {
+ NumReplies++;
+ }
+ }
+ }
+ };
Y_UNIT_TEST_SUITE(SyncClientTest) {
Y_UNIT_TEST(TestSync) {
- TObjectCountCheck objectCountCheck;
+ TObjectCountCheck objectCountCheck;
- TExampleServer server;
- TSyncClient client(server.GetActualListenAddr());
- client.Work();
- // assert correct number of replies
- UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages);
- // assert that there is no message left in flight
- UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
- UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
- }
+ TExampleServer server;
+ TSyncClient client(server.GetActualListenAddr());
+ client.Work();
+ // assert correct number of replies
+ UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages);
+ // assert that there is no message left in flight
+ UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0);
+ UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0);
+ }
}
}