aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/test/helper/example.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commitc2a1af049e9deca890e9923abe64fe6c59060348 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/test/helper/example.cpp
parent1f553f46fb4f3c5eec631352cdd900a0709016af (diff)
downloadydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/helper/example.cpp')
-rw-r--r--library/cpp/messagebus/test/helper/example.cpp464
1 files changed, 232 insertions, 232 deletions
diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp
index a1913b58c1..7c6d704042 100644
--- a/library/cpp/messagebus/test/helper/example.cpp
+++ b/library/cpp/messagebus/test/helper/example.cpp
@@ -1,281 +1,281 @@
#include <library/cpp/testing/unittest/registar.h>
-
+
#include "example.h"
-#include <util/generic/cast.h>
-
-using namespace NBus;
-using namespace NBus::NTest;
-
+#include <util/generic/cast.h>
+
+using namespace NBus;
+using namespace NBus::NTest;
+
static void FillWithJunk(TArrayRef<char> data) {
TStringBuf junk =
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789";
-
+
for (size_t i = 0; i < data.size(); i += junk.size()) {
memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i));
- }
-}
-
+ }
+}
+
static TString JunkString(size_t len) {
- TTempBuf temp(len);
+ TTempBuf temp(len);
TArrayRef<char> tempArrayRef(temp.Data(), len);
- FillWithJunk(tempArrayRef);
-
+ FillWithJunk(tempArrayRef);
+
return TString(tempArrayRef.data(), tempArrayRef.size());
-}
-
-TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
- : TBusMessage(77)
- , AllocCounter(counterPtr)
- , Data(JunkString(payloadSize))
-{
-}
-
-TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- , AllocCounter(counterPtr)
+}
+
+TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
+ : TBusMessage(77)
+ , AllocCounter(counterPtr)
+ , Data(JunkString(payloadSize))
{
}
-
-TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
- : TBusMessage(79)
- , AllocCounter(counterPtr)
- , Data(JunkString(payloadSize))
-{
-}
-
-TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- , AllocCounter(counterPtr)
+
+TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ , AllocCounter(counterPtr)
{
}
-
-TExampleProtocol::TExampleProtocol(int port)
- : TBusProtocol("Example", port)
- , RequestCount(0)
- , ResponseCount(0)
- , RequestCountDeserialized(0)
- , ResponseCountDeserialized(0)
- , StartCount(0)
+
+TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
+ : TBusMessage(79)
+ , AllocCounter(counterPtr)
+ , Data(JunkString(payloadSize))
{
}
-
-TExampleProtocol::~TExampleProtocol() {
- if (UncaughtException()) {
- // so it could be reported in test
- return;
- }
+
+TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
+ : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
+ , AllocCounter(counterPtr)
+{
+}
+
+TExampleProtocol::TExampleProtocol(int port)
+ : TBusProtocol("Example", port)
+ , RequestCount(0)
+ , ResponseCount(0)
+ , RequestCountDeserialized(0)
+ , ResponseCountDeserialized(0)
+ , StartCount(0)
+{
+}
+
+TExampleProtocol::~TExampleProtocol() {
+ if (UncaughtException()) {
+ // so it could be reported in test
+ return;
+ }
Y_VERIFY(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount));
Y_VERIFY(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount));
Y_VERIFY(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized));
Y_VERIFY(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized));
Y_VERIFY(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount));
-}
-
-void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
- // Messages have no data, we recreate them from scratch
- // instead of sending, so we don't need to serialize them.
- if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) {
+}
+
+void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
+ // Messages have no data, we recreate them from scratch
+ // instead of sending, so we don't need to serialize them.
+ if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) {
buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size());
- } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) {
+ } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) {
buffer.Append(exampleReply->Data.data(), exampleReply->Data.size());
- } else {
+ } else {
Y_FAIL("unknown message type");
- }
-}
-
+ }
+}
+
TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
- // TODO: check data
+ // TODO: check data
Y_UNUSED(payload);
-
- if (messageType == 77) {
- TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized);
- exampleMessage->Data.append(payload.data(), payload.size());
- return exampleMessage;
- } else if (messageType == 79) {
- TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized);
- exampleReply->Data.append(payload.data(), payload.size());
- return exampleReply;
- } else {
+
+ if (messageType == 77) {
+ TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized);
+ exampleMessage->Data.append(payload.data(), payload.size());
+ return exampleMessage;
+ } else if (messageType == 79) {
+ TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized);
+ exampleReply->Data.append(payload.data(), payload.size());
+ return exampleReply;
+ } else {
return nullptr;
- }
-}
-
-TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port)
- : Proto(port)
- , UseCompression(false)
- , CrashOnError(false)
- , DataSize(320)
- , MessageCount(0)
- , RepliesCount(0)
- , Errors(0)
- , LastError(MESSAGE_OK)
-{
- Bus = CreateMessageQueue("TExampleClient");
-
- Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus);
-
- Session->RegisterService("localhost");
-}
-
-TExampleClient::~TExampleClient() {
-}
-
+ }
+}
+
+TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port)
+ : Proto(port)
+ , UseCompression(false)
+ , CrashOnError(false)
+ , DataSize(320)
+ , MessageCount(0)
+ , RepliesCount(0)
+ , Errors(0)
+ , LastError(MESSAGE_OK)
+{
+ Bus = CreateMessageQueue("TExampleClient");
+
+ Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus);
+
+ Session->RegisterService("localhost");
+}
+
+TExampleClient::~TExampleClient() {
+}
+
EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) {
- TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
- message->SetCompressed(UseCompression);
- return Session->SendMessageAutoPtr(message, addr);
-}
-
+ TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
+ message->SetCompressed(UseCompression);
+ return Session->SendMessageAutoPtr(message, addr);
+}
+
void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) {
- UNIT_ASSERT(MessageCount == 0);
- UNIT_ASSERT(RepliesCount == 0);
- UNIT_ASSERT(Errors == 0);
-
- WorkDone.Reset();
- MessageCount = count;
- for (ssize_t i = 0; i < MessageCount; ++i) {
- EMessageStatus s = SendMessage(addr);
- UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s);
- }
-}
-
-void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) {
- SendMessages(count, &addr);
-}
-
-void TExampleClient::ResetCounters() {
- MessageCount = 0;
- RepliesCount = 0;
- Errors = 0;
- LastError = MESSAGE_OK;
-
- WorkDone.Reset();
-}
-
-void TExampleClient::WaitReplies() {
- WorkDone.WaitT(TDuration::Seconds(60));
-
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount);
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0);
- UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0);
-
- ResetCounters();
-}
-
+ UNIT_ASSERT(MessageCount == 0);
+ UNIT_ASSERT(RepliesCount == 0);
+ UNIT_ASSERT(Errors == 0);
+
+ WorkDone.Reset();
+ MessageCount = count;
+ for (ssize_t i = 0; i < MessageCount; ++i) {
+ EMessageStatus s = SendMessage(addr);
+ UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s);
+ }
+}
+
+void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) {
+ SendMessages(count, &addr);
+}
+
+void TExampleClient::ResetCounters() {
+ MessageCount = 0;
+ RepliesCount = 0;
+ Errors = 0;
+ LastError = MESSAGE_OK;
+
+ WorkDone.Reset();
+}
+
+void TExampleClient::WaitReplies() {
+ WorkDone.WaitT(TDuration::Seconds(60));
+
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount);
+ UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0);
+ UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0);
+
+ ResetCounters();
+}
+
EMessageStatus TExampleClient::WaitForError() {
- WorkDone.WaitT(TDuration::Seconds(60));
-
- UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
- UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
- UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
- UNIT_ASSERT_VALUES_EQUAL(1, Errors);
+ WorkDone.WaitT(TDuration::Seconds(60));
+
+ UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
+ UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
+ UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
+ UNIT_ASSERT_VALUES_EQUAL(1, Errors);
EMessageStatus result = LastError;
-
- ResetCounters();
+
+ ResetCounters();
return result;
-}
-
+}
+
void TExampleClient::WaitForError(EMessageStatus status) {
EMessageStatus error = WaitForError();
UNIT_ASSERT_VALUES_EQUAL(status, error);
}
void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
- SendMessages(count, addr);
- WaitReplies();
-}
-
-void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) {
- SendMessagesWaitReplies(count, &addr);
-}
-
-void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) {
+ SendMessages(count, addr);
+ WaitReplies();
+}
+
+void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) {
+ SendMessagesWaitReplies(count, &addr);
+}
+
+void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) {
Y_UNUSED(mess);
Y_UNUSED(reply);
-
- if (AtomicIncrement(RepliesCount) == MessageCount) {
- WorkDone.Signal();
- }
-}
-
-void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) {
- if (CrashOnError) {
+
+ if (AtomicIncrement(RepliesCount) == MessageCount) {
+ WorkDone.Signal();
+ }
+}
+
+void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) {
+ if (CrashOnError) {
Y_FAIL("client failed: %s", ToCString(status));
- }
-
+ }
+
Y_UNUSED(mess);
-
- AtomicIncrement(Errors);
- LastError = status;
- WorkDone.Signal();
-}
-
-TExampleServer::TExampleServer(
+
+ AtomicIncrement(Errors);
+ LastError = status;
+ WorkDone.Signal();
+}
+
+TExampleServer::TExampleServer(
const char* name,
const TBusServerSessionConfig& sessionConfig)
- : UseCompression(false)
- , AckMessageBeforeSendReply(false)
- , ForgetRequest(false)
-{
- Bus = CreateMessageQueue(name);
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
-}
-
-TExampleServer::TExampleServer(unsigned port, const char* name)
- : UseCompression(false)
- , AckMessageBeforeSendReply(false)
- , ForgetRequest(false)
-{
- Bus = CreateMessageQueue(name);
- TBusServerSessionConfig sessionConfig;
- sessionConfig.ListenPort = port;
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
-}
-
-TExampleServer::~TExampleServer() {
-}
-
-size_t TExampleServer::GetInFlight() const {
- return Session->GetInFlight();
-}
-
-unsigned TExampleServer::GetActualListenPort() const {
- return Session->GetActualListenPort();
-}
-
-TNetAddr TExampleServer::GetActualListenAddr() const {
- return TNetAddr("127.0.0.1", GetActualListenPort());
-}
-
-void TExampleServer::WaitForOnMessageCount(unsigned n) {
- TestSync.WaitFor(n);
-}
-
-void TExampleServer::OnMessage(TOnMessageContext& mess) {
- TestSync.Inc();
-
- TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage());
-
- if (ForgetRequest) {
- mess.ForgetRequest();
- return;
- }
-
- TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size())));
- reply->SetCompressed(UseCompression);
-
- EMessageStatus status;
- if (AckMessageBeforeSendReply) {
- TBusIdentity ident;
- mess.AckMessage(ident);
- status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error
- } else {
- status = mess.SendReplyMove(reply);
- }
-
+ : UseCompression(false)
+ , AckMessageBeforeSendReply(false)
+ , ForgetRequest(false)
+{
+ Bus = CreateMessageQueue(name);
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+}
+
+TExampleServer::TExampleServer(unsigned port, const char* name)
+ : UseCompression(false)
+ , AckMessageBeforeSendReply(false)
+ , ForgetRequest(false)
+{
+ Bus = CreateMessageQueue(name);
+ TBusServerSessionConfig sessionConfig;
+ sessionConfig.ListenPort = port;
+ Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
+}
+
+TExampleServer::~TExampleServer() {
+}
+
+size_t TExampleServer::GetInFlight() const {
+ return Session->GetInFlight();
+}
+
+unsigned TExampleServer::GetActualListenPort() const {
+ return Session->GetActualListenPort();
+}
+
+TNetAddr TExampleServer::GetActualListenAddr() const {
+ return TNetAddr("127.0.0.1", GetActualListenPort());
+}
+
+void TExampleServer::WaitForOnMessageCount(unsigned n) {
+ TestSync.WaitFor(n);
+}
+
+void TExampleServer::OnMessage(TOnMessageContext& mess) {
+ TestSync.Inc();
+
+ TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage());
+
+ if (ForgetRequest) {
+ mess.ForgetRequest();
+ return;
+ }
+
+ TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size())));
+ reply->SetCompressed(UseCompression);
+
+ EMessageStatus status;
+ if (AckMessageBeforeSendReply) {
+ TBusIdentity ident;
+ mess.AckMessage(ident);
+ status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error
+ } else {
+ status = mess.SendReplyMove(reply);
+ }
+
Y_VERIFY(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data());
-}
+}