aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/helper
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/test/helper
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/helper')
-rw-r--r--library/cpp/messagebus/test/helper/alloc_counter.h30
-rw-r--r--library/cpp/messagebus/test/helper/example.cpp464
-rw-r--r--library/cpp/messagebus/test/helper/example.h38
-rw-r--r--library/cpp/messagebus/test/helper/example_module.cpp74
-rw-r--r--library/cpp/messagebus/test/helper/example_module.h26
-rw-r--r--library/cpp/messagebus/test/helper/fixed_port.cpp8
-rw-r--r--library/cpp/messagebus/test/helper/fixed_port.h8
-rw-r--r--library/cpp/messagebus/test/helper/hanging_server.cpp18
-rw-r--r--library/cpp/messagebus/test/helper/hanging_server.h24
-rw-r--r--library/cpp/messagebus/test/helper/message_handler_error.cpp38
-rw-r--r--library/cpp/messagebus/test/helper/message_handler_error.h10
-rw-r--r--library/cpp/messagebus/test/helper/object_count_check.h98
-rw-r--r--library/cpp/messagebus/test/helper/wait_for.h20
-rw-r--r--library/cpp/messagebus/test/helper/ya.make30
14 files changed, 443 insertions, 443 deletions
diff --git a/library/cpp/messagebus/test/helper/alloc_counter.h b/library/cpp/messagebus/test/helper/alloc_counter.h
index ec9041cb15..88db651e69 100644
--- a/library/cpp/messagebus/test/helper/alloc_counter.h
+++ b/library/cpp/messagebus/test/helper/alloc_counter.h
@@ -1,21 +1,21 @@
-#pragma once
-
-#include <util/generic/noncopyable.h>
-#include <util/system/atomic.h>
-#include <util/system/yassert.h>
-
+#pragma once
+
+#include <util/generic/noncopyable.h>
+#include <util/system/atomic.h>
+#include <util/system/yassert.h>
+
class TAllocCounter : TNonCopyable {
-private:
- TAtomic* CountPtr;
+private:
+ TAtomic* CountPtr;
-public:
+public:
TAllocCounter(TAtomic* countPtr)
: CountPtr(countPtr)
{
- AtomicIncrement(*CountPtr);
- }
-
- ~TAllocCounter() {
+ AtomicIncrement(*CountPtr);
+ }
+
+ ~TAllocCounter() {
Y_VERIFY(AtomicDecrement(*CountPtr) >= 0, "released too many");
- }
-};
+ }
+};
diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp
index 7c6d704042..a1913b58c1 100644
--- a/library/cpp/messagebus/test/helper/example.cpp
+++ b/library/cpp/messagebus/test/helper/example.cpp
@@ -1,281 +1,281 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "example.h"
-#include <util/generic/cast.h>
-
-using namespace NBus;
-using namespace NBus::NTest;
-
+#include <util/generic/cast.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
static void FillWithJunk(TArrayRef<char> data) {
TStringBuf junk =
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789";
-
+
for (size_t i = 0; i < data.size(); i += junk.size()) {
memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i));
- }
-}
-
+ }
+}
+
static TString JunkString(size_t len) {
- TTempBuf temp(len);
+ TTempBuf temp(len);
TArrayRef<char> tempArrayRef(temp.Data(), len);
- FillWithJunk(tempArrayRef);
-
+ FillWithJunk(tempArrayRef);
+
return TString(tempArrayRef.data(), tempArrayRef.size());
-}
-
-TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
- : TBusMessage(77)
- , AllocCounter(counterPtr)
- , Data(JunkString(payloadSize))
+}
+
+TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
+ : TBusMessage(77)
+ , AllocCounter(counterPtr)
+ , Data(JunkString(payloadSize))
+{
+}
+
+TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ , AllocCounter(counterPtr)
{
}
-
-TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- , AllocCounter(counterPtr)
+
+TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
+ : TBusMessage(79)
+ , AllocCounter(counterPtr)
+ , Data(JunkString(payloadSize))
+{
+}
+
+TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ , AllocCounter(counterPtr)
{
}
-
-TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
- : TBusMessage(79)
- , AllocCounter(counterPtr)
- , Data(JunkString(payloadSize))
+
+TExampleProtocol::TExampleProtocol(int port)
+ : TBusProtocol("Example", port)
+ , RequestCount(0)
+ , ResponseCount(0)
+ , RequestCountDeserialized(0)
+ , ResponseCountDeserialized(0)
+ , StartCount(0)
{
}
-
-TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- , AllocCounter(counterPtr)
-{
-}
-
-TExampleProtocol::TExampleProtocol(int port)
- : TBusProtocol("Example", port)
- , RequestCount(0)
- , ResponseCount(0)
- , RequestCountDeserialized(0)
- , ResponseCountDeserialized(0)
- , StartCount(0)
-{
-}
-
-TExampleProtocol::~TExampleProtocol() {
- if (UncaughtException()) {
- // so it could be reported in test
- return;
- }
+
+TExampleProtocol::~TExampleProtocol() {
+ if (UncaughtException()) {
+ // so it could be reported in test
+ return;
+ }
Y_VERIFY(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount));
Y_VERIFY(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount));
Y_VERIFY(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized));
Y_VERIFY(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized));
Y_VERIFY(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount));
-}
-
-void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
- // Messages have no data, we recreate them from scratch
- // instead of sending, so we don't need to serialize them.
- if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) {
+}
+
+void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
+ // Messages have no data, we recreate them from scratch
+ // instead of sending, so we don't need to serialize them.
+ if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) {
buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size());
- } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) {
+ } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) {
buffer.Append(exampleReply->Data.data(), exampleReply->Data.size());
- } else {
+ } else {
Y_FAIL("unknown message type");
- }
-}
-
+ }
+}
+
TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
- // TODO: check data
+ // TODO: check data
Y_UNUSED(payload);
-
- if (messageType == 77) {
- TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized);
- exampleMessage->Data.append(payload.data(), payload.size());
- return exampleMessage;
- } else if (messageType == 79) {
- TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized);
- exampleReply->Data.append(payload.data(), payload.size());
- return exampleReply;
- } else {
+
+ if (messageType == 77) {
+ TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized);
+ exampleMessage->Data.append(payload.data(), payload.size());
+ return exampleMessage;
+ } else if (messageType == 79) {
+ TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized);
+ exampleReply->Data.append(payload.data(), payload.size());
+ return exampleReply;
+ } else {
return nullptr;
- }
-}
-
-TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port)
- : Proto(port)
- , UseCompression(false)
- , CrashOnError(false)
- , DataSize(320)
- , MessageCount(0)
- , RepliesCount(0)
- , Errors(0)
- , LastError(MESSAGE_OK)
-{
- Bus = CreateMessageQueue("TExampleClient");
-
- Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus);
-
- Session->RegisterService("localhost");
-}
-
-TExampleClient::~TExampleClient() {
-}
-
+ }
+}
+
+TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port)
+ : Proto(port)
+ , UseCompression(false)
+ , CrashOnError(false)
+ , DataSize(320)
+ , MessageCount(0)
+ , RepliesCount(0)
+ , Errors(0)
+ , LastError(MESSAGE_OK)
+{
+ Bus = CreateMessageQueue("TExampleClient");
+
+ Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus);
+
+ Session->RegisterService("localhost");
+}
+
+TExampleClient::~TExampleClient() {
+}
+
EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) {
- TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
- message->SetCompressed(UseCompression);
- return Session->SendMessageAutoPtr(message, addr);
-}
-
+ TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
+ message->SetCompressed(UseCompression);
+ return Session->SendMessageAutoPtr(message, addr);
+}
+
void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) {
- UNIT_ASSERT(MessageCount == 0);
- UNIT_ASSERT(RepliesCount == 0);
- UNIT_ASSERT(Errors == 0);
-
- WorkDone.Reset();
- MessageCount = count;
- for (ssize_t i = 0; i < MessageCount; ++i) {
- EMessageStatus s = SendMessage(addr);
- UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s);
- }
-}
-
-void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) {
- SendMessages(count, &addr);
-}
-
-void TExampleClient::ResetCounters() {
- MessageCount = 0;
- RepliesCount = 0;
- Errors = 0;
- LastError = MESSAGE_OK;
-
- WorkDone.Reset();
-}
-
-void TExampleClient::WaitReplies() {
- WorkDone.WaitT(TDuration::Seconds(60));
-
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount);
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0);
- UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0);
-
- ResetCounters();
-}
-
+ UNIT_ASSERT(MessageCount == 0);
+ UNIT_ASSERT(RepliesCount == 0);
+ UNIT_ASSERT(Errors == 0);
+
+ WorkDone.Reset();
+ MessageCount = count;
+ for (ssize_t i = 0; i < MessageCount; ++i) {
+ EMessageStatus s = SendMessage(addr);
+ UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s);
+ }
+}
+
+void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) {
+ SendMessages(count, &addr);
+}
+
+void TExampleClient::ResetCounters() {
+ MessageCount = 0;
+ RepliesCount = 0;
+ Errors = 0;
+ LastError = MESSAGE_OK;
+
+ WorkDone.Reset();
+}
+
+void TExampleClient::WaitReplies() {
+ WorkDone.WaitT(TDuration::Seconds(60));
+
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount);
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0);
+ UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0);
+
+ ResetCounters();
+}
+
EMessageStatus TExampleClient::WaitForError() {
- WorkDone.WaitT(TDuration::Seconds(60));
-
- UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
- UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
- UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
- UNIT_ASSERT_VALUES_EQUAL(1, Errors);
+ WorkDone.WaitT(TDuration::Seconds(60));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
+ UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
+ UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
+ UNIT_ASSERT_VALUES_EQUAL(1, Errors);
EMessageStatus result = LastError;
-
- ResetCounters();
+
+ ResetCounters();
return result;
-}
-
+}
+
void TExampleClient::WaitForError(EMessageStatus status) {
EMessageStatus error = WaitForError();
UNIT_ASSERT_VALUES_EQUAL(status, error);
}
void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
- SendMessages(count, addr);
- WaitReplies();
-}
-
-void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) {
- SendMessagesWaitReplies(count, &addr);
-}
-
-void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) {
+ SendMessages(count, addr);
+ WaitReplies();
+}
+
+void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) {
+ SendMessagesWaitReplies(count, &addr);
+}
+
+void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) {
Y_UNUSED(mess);
Y_UNUSED(reply);
-
- if (AtomicIncrement(RepliesCount) == MessageCount) {
- WorkDone.Signal();
- }
-}
-
-void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) {
- if (CrashOnError) {
+
+ if (AtomicIncrement(RepliesCount) == MessageCount) {
+ WorkDone.Signal();
+ }
+}
+
+void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) {
+ if (CrashOnError) {
Y_FAIL("client failed: %s", ToCString(status));
- }
-
+ }
+
Y_UNUSED(mess);
-
- AtomicIncrement(Errors);
- LastError = status;
- WorkDone.Signal();
-}
-
-TExampleServer::TExampleServer(
+
+ AtomicIncrement(Errors);
+ LastError = status;
+ WorkDone.Signal();
+}
+
+TExampleServer::TExampleServer(
const char* name,
const TBusServerSessionConfig& sessionConfig)
- : UseCompression(false)
- , AckMessageBeforeSendReply(false)
- , ForgetRequest(false)
-{
- Bus = CreateMessageQueue(name);
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
-}
-
-TExampleServer::TExampleServer(unsigned port, const char* name)
- : UseCompression(false)
- , AckMessageBeforeSendReply(false)
- , ForgetRequest(false)
-{
- Bus = CreateMessageQueue(name);
- TBusServerSessionConfig sessionConfig;
- sessionConfig.ListenPort = port;
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
-}
-
-TExampleServer::~TExampleServer() {
-}
-
-size_t TExampleServer::GetInFlight() const {
- return Session->GetInFlight();
-}
-
-unsigned TExampleServer::GetActualListenPort() const {
- return Session->GetActualListenPort();
-}
-
-TNetAddr TExampleServer::GetActualListenAddr() const {
- return TNetAddr("127.0.0.1", GetActualListenPort());
-}
-
-void TExampleServer::WaitForOnMessageCount(unsigned n) {
- TestSync.WaitFor(n);
-}
-
-void TExampleServer::OnMessage(TOnMessageContext& mess) {
- TestSync.Inc();
-
- TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage());
-
- if (ForgetRequest) {
- mess.ForgetRequest();
- return;
- }
-
- TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size())));
- reply->SetCompressed(UseCompression);
-
- EMessageStatus status;
- if (AckMessageBeforeSendReply) {
- TBusIdentity ident;
- mess.AckMessage(ident);
- status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error
- } else {
- status = mess.SendReplyMove(reply);
- }
-
+ : UseCompression(false)
+ , AckMessageBeforeSendReply(false)
+ , ForgetRequest(false)
+{
+ Bus = CreateMessageQueue(name);
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+}
+
+TExampleServer::TExampleServer(unsigned port, const char* name)
+ : UseCompression(false)
+ , AckMessageBeforeSendReply(false)
+ , ForgetRequest(false)
+{
+ Bus = CreateMessageQueue(name);
+ TBusServerSessionConfig sessionConfig;
+ sessionConfig.ListenPort = port;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+}
+
+TExampleServer::~TExampleServer() {
+}
+
+size_t TExampleServer::GetInFlight() const {
+ return Session->GetInFlight();
+}
+
+unsigned TExampleServer::GetActualListenPort() const {
+ return Session->GetActualListenPort();
+}
+
+TNetAddr TExampleServer::GetActualListenAddr() const {
+ return TNetAddr("127.0.0.1", GetActualListenPort());
+}
+
+void TExampleServer::WaitForOnMessageCount(unsigned n) {
+ TestSync.WaitFor(n);
+}
+
+void TExampleServer::OnMessage(TOnMessageContext& mess) {
+ TestSync.Inc();
+
+ TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage());
+
+ if (ForgetRequest) {
+ mess.ForgetRequest();
+ return;
+ }
+
+ TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size())));
+ reply->SetCompressed(UseCompression);
+
+ EMessageStatus status;
+ if (AckMessageBeforeSendReply) {
+ TBusIdentity ident;
+ mess.AckMessage(ident);
+ status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error
+ } else {
+ status = mess.SendReplyMove(reply);
+ }
+
Y_VERIFY(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data());
-}
+}
diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h
index 26b7475308..1ff7d16c1a 100644
--- a/library/cpp/messagebus/test/helper/example.h
+++ b/library/cpp/messagebus/test/helper/example.h
@@ -1,9 +1,9 @@
#pragma once
#include <library/cpp/testing/unittest/registar.h>
-
-#include "alloc_counter.h"
-#include "message_handler_error.h"
+
+#include "alloc_counter.h"
+#include "message_handler_error.h"
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/misc/test_sync.h>
@@ -25,7 +25,7 @@ namespace NBus {
TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320);
TExampleRequest(ECreateUninitialized, TAtomic* counterPtr);
};
-
+
class TExampleResponse: public TBusMessage {
friend class TExampleProtocol;
@@ -37,7 +37,7 @@ namespace NBus {
TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320);
TExampleResponse(ECreateUninitialized, TAtomic* counterPtr);
};
-
+
class TExampleProtocol: public TBusProtocol {
public:
TAtomic RequestCount;
@@ -45,7 +45,7 @@ namespace NBus {
TAtomic RequestCountDeserialized;
TAtomic ResponseCountDeserialized;
TAtomic StartCount;
-
+
TExampleProtocol(int port = 0);
~TExampleProtocol() override;
@@ -54,28 +54,28 @@ namespace NBus {
TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
};
-
+
class TExampleClient: private TBusClientHandlerError {
public:
TExampleProtocol Proto;
bool UseCompression;
bool CrashOnError;
size_t DataSize;
-
+
ssize_t MessageCount;
TAtomic RepliesCount;
TAtomic Errors;
EMessageStatus LastError;
-
+
TSystemEvent WorkDone;
-
+
TBusMessageQueuePtr Bus;
TBusClientSessionPtr Session;
-
+
public:
TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0);
~TExampleClient() override;
-
+
EMessageStatus SendMessage(const TNetAddr* addr = nullptr);
void SendMessages(size_t count, const TNetAddr* addr = nullptr);
@@ -85,15 +85,15 @@ namespace NBus {
void WaitReplies();
EMessageStatus WaitForError();
void WaitForError(EMessageStatus status);
-
+
void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr);
void SendMessagesWaitReplies(size_t count, const TNetAddr& addr);
-
+
void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override;
void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override;
};
-
+
class TExampleServer: private TBusServerHandlerError {
public:
TExampleProtocol Proto;
@@ -103,7 +103,7 @@ namespace NBus {
bool ForgetRequest;
TTestSync TestSync;
-
+
TBusMessageQueuePtr Bus;
TBusServerSessionPtr Session;
@@ -111,7 +111,7 @@ namespace NBus {
TExampleServer(
const char* name = "TExampleServer",
const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig());
-
+
TExampleServer(unsigned port, const char* name = "TExampleServer");
~TExampleServer() override;
@@ -121,9 +121,9 @@ namespace NBus {
unsigned GetActualListenPort() const;
// any of
TNetAddr GetActualListenAddr() const;
-
+
void WaitForOnMessageCount(unsigned n);
-
+
protected:
void OnMessage(TOnMessageContext& mess) override;
};
diff --git a/library/cpp/messagebus/test/helper/example_module.cpp b/library/cpp/messagebus/test/helper/example_module.cpp
index 65ecfcf73f..c907825924 100644
--- a/library/cpp/messagebus/test/helper/example_module.cpp
+++ b/library/cpp/messagebus/test/helper/example_module.cpp
@@ -1,43 +1,43 @@
-#include "example_module.h"
-
-using namespace NBus;
-using namespace NBus::NTest;
-
-TExampleModule::TExampleModule()
- : TBusModule("TExampleModule")
-{
- TBusQueueConfig queueConfig;
- queueConfig.NumWorkers = 5;
- Queue = CreateMessageQueue(queueConfig);
-}
-
-void TExampleModule::StartModule() {
- CreatePrivateSessions(Queue.Get());
- StartInput();
-}
-
-bool TExampleModule::Shutdown() {
- TBusModule::Shutdown();
- return true;
-}
-
-TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) {
+#include "example_module.h"
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+TExampleModule::TExampleModule()
+ : TBusModule("TExampleModule")
+{
+ TBusQueueConfig queueConfig;
+ queueConfig.NumWorkers = 5;
+ Queue = CreateMessageQueue(queueConfig);
+}
+
+void TExampleModule::StartModule() {
+ CreatePrivateSessions(Queue.Get());
+ StartInput();
+}
+
+bool TExampleModule::Shutdown() {
+ TBusModule::Shutdown();
+ return true;
+}
+
+TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) {
return nullptr;
-}
-
-TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) {
- TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig());
- ServerAddr = TNetAddr("localhost", r->GetActualListenPort());
- return r;
-}
-
+}
+
+TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) {
+ TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig());
+ ServerAddr = TNetAddr("localhost", r->GetActualListenPort());
+ return r;
+}
+
TExampleClientModule::TExampleClientModule()
: Source()
{
}
-
-TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) {
- Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
- Source->RegisterService("localhost");
+
+TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) {
+ Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig());
+ Source->RegisterService("localhost");
return nullptr;
-}
+}
diff --git a/library/cpp/messagebus/test/helper/example_module.h b/library/cpp/messagebus/test/helper/example_module.h
index a0b295f613..b1b0a6dd14 100644
--- a/library/cpp/messagebus/test/helper/example_module.h
+++ b/library/cpp/messagebus/test/helper/example_module.h
@@ -1,7 +1,7 @@
-#pragma once
-
-#include "example.h"
-
+#pragma once
+
+#include "example.h"
+
#include <library/cpp/messagebus/oldmodule/module.h>
namespace NBus {
@@ -9,29 +9,29 @@ namespace NBus {
struct TExampleModule: public TBusModule {
TExampleProtocol Proto;
TBusMessageQueuePtr Queue;
-
+
TExampleModule();
-
+
void StartModule();
-
+
bool Shutdown() override;
-
+
// nop by default
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
};
-
+
struct TExampleServerModule: public TExampleModule {
TNetAddr ServerAddr;
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
};
-
+
struct TExampleClientModule: public TExampleModule {
TBusClientSessionPtr Source;
-
+
TExampleClientModule();
-
+
TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override;
};
-
+
}
}
diff --git a/library/cpp/messagebus/test/helper/fixed_port.cpp b/library/cpp/messagebus/test/helper/fixed_port.cpp
index 258da0d1a5..f83ce3161a 100644
--- a/library/cpp/messagebus/test/helper/fixed_port.cpp
+++ b/library/cpp/messagebus/test/helper/fixed_port.cpp
@@ -1,10 +1,10 @@
#include "fixed_port.h"
#include <util/system/env.h>
-
+
#include <stdlib.h>
-
+
bool NBus::NTest::IsFixedPortTestAllowed() {
- // TODO: report skipped tests to test
+ // TODO: report skipped tests to test
return !GetEnv("MB_TESTS_SKIP_FIXED_PORT");
-}
+}
diff --git a/library/cpp/messagebus/test/helper/fixed_port.h b/library/cpp/messagebus/test/helper/fixed_port.h
index a9c61ebc63..39c8da4dfa 100644
--- a/library/cpp/messagebus/test/helper/fixed_port.h
+++ b/library/cpp/messagebus/test/helper/fixed_port.h
@@ -1,11 +1,11 @@
-#pragma once
-
+#pragma once
+
namespace NBus {
namespace NTest {
bool IsFixedPortTestAllowed();
-
+
// Must not be in range OS uses for bind on random port.
const unsigned FixedPort = 4927;
-
+
}
}
diff --git a/library/cpp/messagebus/test/helper/hanging_server.cpp b/library/cpp/messagebus/test/helper/hanging_server.cpp
index a35514b00d..3911ff10ba 100644
--- a/library/cpp/messagebus/test/helper/hanging_server.cpp
+++ b/library/cpp/messagebus/test/helper/hanging_server.cpp
@@ -1,13 +1,13 @@
#include "hanging_server.h"
-#include <util/system/yassert.h>
-
-using namespace NBus;
-
+#include <util/system/yassert.h>
+
+using namespace NBus;
+
THangingServer::THangingServer(int port) {
BindResult = BindOnPort(port, false);
-}
-
-int THangingServer::GetPort() const {
- return BindResult.first;
-}
+}
+
+int THangingServer::GetPort() const {
+ return BindResult.first;
+}
diff --git a/library/cpp/messagebus/test/helper/hanging_server.h b/library/cpp/messagebus/test/helper/hanging_server.h
index cc9fb274d8..384f07d7cf 100644
--- a/library/cpp/messagebus/test/helper/hanging_server.h
+++ b/library/cpp/messagebus/test/helper/hanging_server.h
@@ -1,16 +1,16 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/network.h>
-#include <util/network/sock.h>
-
-class THangingServer {
-private:
+#include <util/network/sock.h>
+
+class THangingServer {
+private:
std::pair<unsigned, TVector<NBus::TBindResult>> BindResult;
-public:
- // listen on given port, and nothing else
- THangingServer(int port = 0);
- // actual port
- int GetPort() const;
-};
+public:
+ // listen on given port, and nothing else
+ THangingServer(int port = 0);
+ // actual port
+ int GetPort() const;
+};
diff --git a/library/cpp/messagebus/test/helper/message_handler_error.cpp b/library/cpp/messagebus/test/helper/message_handler_error.cpp
index c09811ec67..40997adec8 100644
--- a/library/cpp/messagebus/test/helper/message_handler_error.cpp
+++ b/library/cpp/messagebus/test/helper/message_handler_error.cpp
@@ -1,26 +1,26 @@
#include "message_handler_error.h"
-#include <util/system/yassert.h>
-
-using namespace NBus;
-using namespace NBus::NTest;
-
-void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) {
+#include <util/system/yassert.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
+void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) {
Y_FAIL("must not be called, status: %s", ToString(status).data());
-}
-
-void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) {
+}
+
+void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) {
Y_FAIL("must not be called");
-}
-
-void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) {
+}
+
+void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) {
Y_FAIL("must not be called");
-}
-
-void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) {
+}
+
+void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) {
Y_FAIL("must not be called, status: %s", ToString(status).data());
-}
-
-void TBusServerHandlerError::OnMessage(TOnMessageContext&) {
+}
+
+void TBusServerHandlerError::OnMessage(TOnMessageContext&) {
Y_FAIL("must not be called");
-}
+}
diff --git a/library/cpp/messagebus/test/helper/message_handler_error.h b/library/cpp/messagebus/test/helper/message_handler_error.h
index a314b10761..bba0007a44 100644
--- a/library/cpp/messagebus/test/helper/message_handler_error.h
+++ b/library/cpp/messagebus/test/helper/message_handler_error.h
@@ -1,7 +1,7 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/messagebus/ybus.h>
-
+
namespace NBus {
namespace NTest {
struct TBusClientHandlerError: public IBusClientHandler {
@@ -9,11 +9,11 @@ namespace NBus {
void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override;
void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override;
};
-
+
struct TBusServerHandlerError: public IBusServerHandler {
void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override;
void OnMessage(TOnMessageContext& pMessage) override;
};
-
+
}
}
diff --git a/library/cpp/messagebus/test/helper/object_count_check.h b/library/cpp/messagebus/test/helper/object_count_check.h
index 1c4756e58c..d844469fb9 100644
--- a/library/cpp/messagebus/test/helper/object_count_check.h
+++ b/library/cpp/messagebus/test/helper/object_count_check.h
@@ -1,5 +1,5 @@
-#pragma once
-
+#pragma once
+
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/messagebus/remote_client_connection.h>
@@ -9,66 +9,66 @@
#include <library/cpp/messagebus/ybus.h>
#include <library/cpp/messagebus/oldmodule/module.h>
#include <library/cpp/messagebus/scheduler/scheduler.h>
-
+
#include <util/generic/object_counter.h>
#include <util/system/type_name.h>
#include <util/stream/output.h>
-
+
#include <typeinfo>
-struct TObjectCountCheck {
- bool Enabled;
-
- template <typename T>
- struct TReset {
- TObjectCountCheck* const Thiz;
-
+struct TObjectCountCheck {
+ bool Enabled;
+
+ template <typename T>
+ struct TReset {
+ TObjectCountCheck* const Thiz;
+
TReset(TObjectCountCheck* thiz)
: Thiz(thiz)
{
}
-
+
void operator()() {
long oldValue = TObjectCounter<T>::ResetObjectCount();
- if (oldValue != 0) {
+ if (oldValue != 0) {
Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl;
- Cerr << "won't check in this test" << Endl;
- Thiz->Enabled = false;
- }
- }
- };
-
- TObjectCountCheck() {
- Enabled = true;
- DoForAllCounters<TReset>();
- }
-
- template <typename T>
- struct TCheckZero {
+ Cerr << "won't check in this test" << Endl;
+ Thiz->Enabled = false;
+ }
+ }
+ };
+
+ TObjectCountCheck() {
+ Enabled = true;
+ DoForAllCounters<TReset>();
+ }
+
+ template <typename T>
+ struct TCheckZero {
TCheckZero(TObjectCountCheck*) {
}
-
+
void operator()() {
UNIT_ASSERT_VALUES_EQUAL_C(0L, TObjectCounter<T>::ObjectCount(), TypeName<T>());
- }
- };
-
- ~TObjectCountCheck() {
- if (Enabled) {
- DoForAllCounters<TCheckZero>();
- }
- }
-
- template <template <typename> class TOp>
- void DoForAllCounters() {
- TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)();
- TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)();
- TOp< ::NBus::NPrivate::TRemoteClientSession>(this)();
- TOp< ::NBus::NPrivate::TRemoteServerSession>(this)();
- TOp< ::NBus::NPrivate::TScheduler>(this)();
- TOp< ::NEventLoop::TEventLoop>(this)();
- TOp< ::NEventLoop::TChannel>(this)();
- TOp< ::NBus::TBusModule>(this)();
- TOp< ::NBus::TBusJob>(this)();
- }
-};
+ }
+ };
+
+ ~TObjectCountCheck() {
+ if (Enabled) {
+ DoForAllCounters<TCheckZero>();
+ }
+ }
+
+ template <template <typename> class TOp>
+ void DoForAllCounters() {
+ TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)();
+ TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)();
+ TOp< ::NBus::NPrivate::TRemoteClientSession>(this)();
+ TOp< ::NBus::NPrivate::TRemoteServerSession>(this)();
+ TOp< ::NBus::NPrivate::TScheduler>(this)();
+ TOp< ::NEventLoop::TEventLoop>(this)();
+ TOp< ::NEventLoop::TChannel>(this)();
+ TOp< ::NBus::TBusModule>(this)();
+ TOp< ::NBus::TBusJob>(this)();
+ }
+};
diff --git a/library/cpp/messagebus/test/helper/wait_for.h b/library/cpp/messagebus/test/helper/wait_for.h
index f09958d4c0..ebd0bfd6e2 100644
--- a/library/cpp/messagebus/test/helper/wait_for.h
+++ b/library/cpp/messagebus/test/helper/wait_for.h
@@ -1,14 +1,14 @@
-#pragma once
-
-#include <util/datetime/base.h>
-#include <util/system/yassert.h>
-
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/system/yassert.h>
+
#define UNIT_WAIT_FOR(condition) \
do { \
- TInstant start(TInstant::Now()); \
- while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \
+ TInstant start(TInstant::Now()); \
+ while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \
Sleep(TDuration::MilliSeconds(1)); \
- } \
- /* TODO: use UNIT_ASSERT if in unittest thread */ \
+ } \
+ /* TODO: use UNIT_ASSERT if in unittest thread */ \
Y_VERIFY(condition, "condition failed after 10 seconds wait"); \
- } while (0)
+ } while (0)
diff --git a/library/cpp/messagebus/test/helper/ya.make b/library/cpp/messagebus/test/helper/ya.make
index 97bd45f573..703f6b6953 100644
--- a/library/cpp/messagebus/test/helper/ya.make
+++ b/library/cpp/messagebus/test/helper/ya.make
@@ -1,17 +1,17 @@
-LIBRARY(messagebus_test_helper)
-
+LIBRARY(messagebus_test_helper)
+
OWNER(g:messagebus)
-
-SRCS(
- example.cpp
- example_module.cpp
- fixed_port.cpp
- message_handler_error.cpp
- hanging_server.cpp
-)
-
-PEERDIR(
+
+SRCS(
+ example.cpp
+ example_module.cpp
+ fixed_port.cpp
+ message_handler_error.cpp
+ hanging_server.cpp
+)
+
+PEERDIR(
library/cpp/messagebus/oldmodule
-)
-
-END()
+)
+
+END()