diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/test/helper | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/test/helper')
-rw-r--r-- | library/cpp/messagebus/test/helper/alloc_counter.h | 30 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/example.cpp | 464 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/example.h | 38 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/example_module.cpp | 74 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/example_module.h | 26 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/fixed_port.cpp | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/fixed_port.h | 8 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/hanging_server.cpp | 18 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/hanging_server.h | 24 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/message_handler_error.cpp | 38 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/message_handler_error.h | 10 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/object_count_check.h | 98 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/wait_for.h | 20 | ||||
-rw-r--r-- | library/cpp/messagebus/test/helper/ya.make | 30 |
14 files changed, 443 insertions, 443 deletions
diff --git a/library/cpp/messagebus/test/helper/alloc_counter.h b/library/cpp/messagebus/test/helper/alloc_counter.h index ec9041cb15..88db651e69 100644 --- a/library/cpp/messagebus/test/helper/alloc_counter.h +++ b/library/cpp/messagebus/test/helper/alloc_counter.h @@ -1,21 +1,21 @@ -#pragma once - -#include <util/generic/noncopyable.h> -#include <util/system/atomic.h> -#include <util/system/yassert.h> - +#pragma once + +#include <util/generic/noncopyable.h> +#include <util/system/atomic.h> +#include <util/system/yassert.h> + class TAllocCounter : TNonCopyable { -private: - TAtomic* CountPtr; +private: + TAtomic* CountPtr; -public: +public: TAllocCounter(TAtomic* countPtr) : CountPtr(countPtr) { - AtomicIncrement(*CountPtr); - } - - ~TAllocCounter() { + AtomicIncrement(*CountPtr); + } + + ~TAllocCounter() { Y_VERIFY(AtomicDecrement(*CountPtr) >= 0, "released too many"); - } -}; + } +}; diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp index 7c6d704042..a1913b58c1 100644 --- a/library/cpp/messagebus/test/helper/example.cpp +++ b/library/cpp/messagebus/test/helper/example.cpp @@ -1,281 +1,281 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "example.h" -#include <util/generic/cast.h> - -using namespace NBus; -using namespace NBus::NTest; - +#include <util/generic/cast.h> + +using namespace NBus; +using namespace NBus::NTest; + static void FillWithJunk(TArrayRef<char> data) { TStringBuf junk = "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789"; - + for (size_t i = 0; i < data.size(); i += junk.size()) { memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i)); - } -} - + } +} + static TString JunkString(size_t len) { - TTempBuf temp(len); + TTempBuf temp(len); TArrayRef<char> tempArrayRef(temp.Data(), len); - FillWithJunk(tempArrayRef); - + FillWithJunk(tempArrayRef); + return TString(tempArrayRef.data(), tempArrayRef.size()); -} - -TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize) - : TBusMessage(77) - , AllocCounter(counterPtr) - , Data(JunkString(payloadSize)) +} + +TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize) + : TBusMessage(77) + , AllocCounter(counterPtr) + , Data(JunkString(payloadSize)) +{ +} + +TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + , AllocCounter(counterPtr) { } - -TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - , AllocCounter(counterPtr) + +TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize) + : TBusMessage(79) + , AllocCounter(counterPtr) + , Data(JunkString(payloadSize)) +{ +} + +TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + , AllocCounter(counterPtr) { } - -TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize) - : TBusMessage(79) - , AllocCounter(counterPtr) - , Data(JunkString(payloadSize)) + +TExampleProtocol::TExampleProtocol(int port) + : TBusProtocol("Example", port) + , RequestCount(0) + , ResponseCount(0) + , RequestCountDeserialized(0) + , ResponseCountDeserialized(0) + , StartCount(0) { } - -TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - , AllocCounter(counterPtr) -{ -} - -TExampleProtocol::TExampleProtocol(int port) - : TBusProtocol("Example", port) - , RequestCount(0) - , ResponseCount(0) - , RequestCountDeserialized(0) - , ResponseCountDeserialized(0) - , StartCount(0) -{ -} - -TExampleProtocol::~TExampleProtocol() { - if (UncaughtException()) { - // so it could be reported in test - return; - } + +TExampleProtocol::~TExampleProtocol() { + if (UncaughtException()) { + // so it could be reported in test + return; + } Y_VERIFY(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount)); Y_VERIFY(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount)); Y_VERIFY(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized)); Y_VERIFY(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized)); Y_VERIFY(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount)); -} - -void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) { - // Messages have no data, we recreate them from scratch - // instead of sending, so we don't need to serialize them. - if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) { +} + +void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) { + // Messages have no data, we recreate them from scratch + // instead of sending, so we don't need to serialize them. + if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) { buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size()); - } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) { + } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) { buffer.Append(exampleReply->Data.data(), exampleReply->Data.size()); - } else { + } else { Y_FAIL("unknown message type"); - } -} - + } +} + TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) { - // TODO: check data + // TODO: check data Y_UNUSED(payload); - - if (messageType == 77) { - TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized); - exampleMessage->Data.append(payload.data(), payload.size()); - return exampleMessage; - } else if (messageType == 79) { - TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized); - exampleReply->Data.append(payload.data(), payload.size()); - return exampleReply; - } else { + + if (messageType == 77) { + TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized); + exampleMessage->Data.append(payload.data(), payload.size()); + return exampleMessage; + } else if (messageType == 79) { + TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized); + exampleReply->Data.append(payload.data(), payload.size()); + return exampleReply; + } else { return nullptr; - } -} - -TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port) - : Proto(port) - , UseCompression(false) - , CrashOnError(false) - , DataSize(320) - , MessageCount(0) - , RepliesCount(0) - , Errors(0) - , LastError(MESSAGE_OK) -{ - Bus = CreateMessageQueue("TExampleClient"); - - Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus); - - Session->RegisterService("localhost"); -} - -TExampleClient::~TExampleClient() { -} - + } +} + +TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port) + : Proto(port) + , UseCompression(false) + , CrashOnError(false) + , DataSize(320) + , MessageCount(0) + , RepliesCount(0) + , Errors(0) + , LastError(MESSAGE_OK) +{ + Bus = CreateMessageQueue("TExampleClient"); + + Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus); + + Session->RegisterService("localhost"); +} + +TExampleClient::~TExampleClient() { +} + EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) { - TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize)); - message->SetCompressed(UseCompression); - return Session->SendMessageAutoPtr(message, addr); -} - + TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize)); + message->SetCompressed(UseCompression); + return Session->SendMessageAutoPtr(message, addr); +} + void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) { - UNIT_ASSERT(MessageCount == 0); - UNIT_ASSERT(RepliesCount == 0); - UNIT_ASSERT(Errors == 0); - - WorkDone.Reset(); - MessageCount = count; - for (ssize_t i = 0; i < MessageCount; ++i) { - EMessageStatus s = SendMessage(addr); - UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s); - } -} - -void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) { - SendMessages(count, &addr); -} - -void TExampleClient::ResetCounters() { - MessageCount = 0; - RepliesCount = 0; - Errors = 0; - LastError = MESSAGE_OK; - - WorkDone.Reset(); -} - -void TExampleClient::WaitReplies() { - WorkDone.WaitT(TDuration::Seconds(60)); - - UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount); - UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0); - UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0); - - ResetCounters(); -} - + UNIT_ASSERT(MessageCount == 0); + UNIT_ASSERT(RepliesCount == 0); + UNIT_ASSERT(Errors == 0); + + WorkDone.Reset(); + MessageCount = count; + for (ssize_t i = 0; i < MessageCount; ++i) { + EMessageStatus s = SendMessage(addr); + UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s); + } +} + +void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) { + SendMessages(count, &addr); +} + +void TExampleClient::ResetCounters() { + MessageCount = 0; + RepliesCount = 0; + Errors = 0; + LastError = MESSAGE_OK; + + WorkDone.Reset(); +} + +void TExampleClient::WaitReplies() { + WorkDone.WaitT(TDuration::Seconds(60)); + + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount); + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0); + UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0); + + ResetCounters(); +} + EMessageStatus TExampleClient::WaitForError() { - WorkDone.WaitT(TDuration::Seconds(60)); - - UNIT_ASSERT_VALUES_EQUAL(1, MessageCount); - UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount)); - UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight()); - UNIT_ASSERT_VALUES_EQUAL(1, Errors); + WorkDone.WaitT(TDuration::Seconds(60)); + + UNIT_ASSERT_VALUES_EQUAL(1, MessageCount); + UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount)); + UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight()); + UNIT_ASSERT_VALUES_EQUAL(1, Errors); EMessageStatus result = LastError; - - ResetCounters(); + + ResetCounters(); return result; -} - +} + void TExampleClient::WaitForError(EMessageStatus status) { EMessageStatus error = WaitForError(); UNIT_ASSERT_VALUES_EQUAL(status, error); } void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) { - SendMessages(count, addr); - WaitReplies(); -} - -void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) { - SendMessagesWaitReplies(count, &addr); -} - -void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) { + SendMessages(count, addr); + WaitReplies(); +} + +void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) { + SendMessagesWaitReplies(count, &addr); +} + +void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) { Y_UNUSED(mess); Y_UNUSED(reply); - - if (AtomicIncrement(RepliesCount) == MessageCount) { - WorkDone.Signal(); - } -} - -void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) { - if (CrashOnError) { + + if (AtomicIncrement(RepliesCount) == MessageCount) { + WorkDone.Signal(); + } +} + +void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) { + if (CrashOnError) { Y_FAIL("client failed: %s", ToCString(status)); - } - + } + Y_UNUSED(mess); - - AtomicIncrement(Errors); - LastError = status; - WorkDone.Signal(); -} - -TExampleServer::TExampleServer( + + AtomicIncrement(Errors); + LastError = status; + WorkDone.Signal(); +} + +TExampleServer::TExampleServer( const char* name, const TBusServerSessionConfig& sessionConfig) - : UseCompression(false) - , AckMessageBeforeSendReply(false) - , ForgetRequest(false) -{ - Bus = CreateMessageQueue(name); - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); -} - -TExampleServer::TExampleServer(unsigned port, const char* name) - : UseCompression(false) - , AckMessageBeforeSendReply(false) - , ForgetRequest(false) -{ - Bus = CreateMessageQueue(name); - TBusServerSessionConfig sessionConfig; - sessionConfig.ListenPort = port; - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); -} - -TExampleServer::~TExampleServer() { -} - -size_t TExampleServer::GetInFlight() const { - return Session->GetInFlight(); -} - -unsigned TExampleServer::GetActualListenPort() const { - return Session->GetActualListenPort(); -} - -TNetAddr TExampleServer::GetActualListenAddr() const { - return TNetAddr("127.0.0.1", GetActualListenPort()); -} - -void TExampleServer::WaitForOnMessageCount(unsigned n) { - TestSync.WaitFor(n); -} - -void TExampleServer::OnMessage(TOnMessageContext& mess) { - TestSync.Inc(); - - TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage()); - - if (ForgetRequest) { - mess.ForgetRequest(); - return; - } - - TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size()))); - reply->SetCompressed(UseCompression); - - EMessageStatus status; - if (AckMessageBeforeSendReply) { - TBusIdentity ident; - mess.AckMessage(ident); - status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error - } else { - status = mess.SendReplyMove(reply); - } - + : UseCompression(false) + , AckMessageBeforeSendReply(false) + , ForgetRequest(false) +{ + Bus = CreateMessageQueue(name); + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); +} + +TExampleServer::TExampleServer(unsigned port, const char* name) + : UseCompression(false) + , AckMessageBeforeSendReply(false) + , ForgetRequest(false) +{ + Bus = CreateMessageQueue(name); + TBusServerSessionConfig sessionConfig; + sessionConfig.ListenPort = port; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); +} + +TExampleServer::~TExampleServer() { +} + +size_t TExampleServer::GetInFlight() const { + return Session->GetInFlight(); +} + +unsigned TExampleServer::GetActualListenPort() const { + return Session->GetActualListenPort(); +} + +TNetAddr TExampleServer::GetActualListenAddr() const { + return TNetAddr("127.0.0.1", GetActualListenPort()); +} + +void TExampleServer::WaitForOnMessageCount(unsigned n) { + TestSync.WaitFor(n); +} + +void TExampleServer::OnMessage(TOnMessageContext& mess) { + TestSync.Inc(); + + TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage()); + + if (ForgetRequest) { + mess.ForgetRequest(); + return; + } + + TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size()))); + reply->SetCompressed(UseCompression); + + EMessageStatus status; + if (AckMessageBeforeSendReply) { + TBusIdentity ident; + mess.AckMessage(ident); + status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error + } else { + status = mess.SendReplyMove(reply); + } + Y_VERIFY(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data()); -} +} diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h index 26b7475308..1ff7d16c1a 100644 --- a/library/cpp/messagebus/test/helper/example.h +++ b/library/cpp/messagebus/test/helper/example.h @@ -1,9 +1,9 @@ #pragma once #include <library/cpp/testing/unittest/registar.h> - -#include "alloc_counter.h" -#include "message_handler_error.h" + +#include "alloc_counter.h" +#include "message_handler_error.h" #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/misc/test_sync.h> @@ -25,7 +25,7 @@ namespace NBus { TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320); TExampleRequest(ECreateUninitialized, TAtomic* counterPtr); }; - + class TExampleResponse: public TBusMessage { friend class TExampleProtocol; @@ -37,7 +37,7 @@ namespace NBus { TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320); TExampleResponse(ECreateUninitialized, TAtomic* counterPtr); }; - + class TExampleProtocol: public TBusProtocol { public: TAtomic RequestCount; @@ -45,7 +45,7 @@ namespace NBus { TAtomic RequestCountDeserialized; TAtomic ResponseCountDeserialized; TAtomic StartCount; - + TExampleProtocol(int port = 0); ~TExampleProtocol() override; @@ -54,28 +54,28 @@ namespace NBus { TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; }; - + class TExampleClient: private TBusClientHandlerError { public: TExampleProtocol Proto; bool UseCompression; bool CrashOnError; size_t DataSize; - + ssize_t MessageCount; TAtomic RepliesCount; TAtomic Errors; EMessageStatus LastError; - + TSystemEvent WorkDone; - + TBusMessageQueuePtr Bus; TBusClientSessionPtr Session; - + public: TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0); ~TExampleClient() override; - + EMessageStatus SendMessage(const TNetAddr* addr = nullptr); void SendMessages(size_t count, const TNetAddr* addr = nullptr); @@ -85,15 +85,15 @@ namespace NBus { void WaitReplies(); EMessageStatus WaitForError(); void WaitForError(EMessageStatus status); - + void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr); void SendMessagesWaitReplies(size_t count, const TNetAddr& addr); - + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override; void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override; }; - + class TExampleServer: private TBusServerHandlerError { public: TExampleProtocol Proto; @@ -103,7 +103,7 @@ namespace NBus { bool ForgetRequest; TTestSync TestSync; - + TBusMessageQueuePtr Bus; TBusServerSessionPtr Session; @@ -111,7 +111,7 @@ namespace NBus { TExampleServer( const char* name = "TExampleServer", const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig()); - + TExampleServer(unsigned port, const char* name = "TExampleServer"); ~TExampleServer() override; @@ -121,9 +121,9 @@ namespace NBus { unsigned GetActualListenPort() const; // any of TNetAddr GetActualListenAddr() const; - + void WaitForOnMessageCount(unsigned n); - + protected: void OnMessage(TOnMessageContext& mess) override; }; diff --git a/library/cpp/messagebus/test/helper/example_module.cpp b/library/cpp/messagebus/test/helper/example_module.cpp index 65ecfcf73f..c907825924 100644 --- a/library/cpp/messagebus/test/helper/example_module.cpp +++ b/library/cpp/messagebus/test/helper/example_module.cpp @@ -1,43 +1,43 @@ -#include "example_module.h" - -using namespace NBus; -using namespace NBus::NTest; - -TExampleModule::TExampleModule() - : TBusModule("TExampleModule") -{ - TBusQueueConfig queueConfig; - queueConfig.NumWorkers = 5; - Queue = CreateMessageQueue(queueConfig); -} - -void TExampleModule::StartModule() { - CreatePrivateSessions(Queue.Get()); - StartInput(); -} - -bool TExampleModule::Shutdown() { - TBusModule::Shutdown(); - return true; -} - -TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) { +#include "example_module.h" + +using namespace NBus; +using namespace NBus::NTest; + +TExampleModule::TExampleModule() + : TBusModule("TExampleModule") +{ + TBusQueueConfig queueConfig; + queueConfig.NumWorkers = 5; + Queue = CreateMessageQueue(queueConfig); +} + +void TExampleModule::StartModule() { + CreatePrivateSessions(Queue.Get()); + StartInput(); +} + +bool TExampleModule::Shutdown() { + TBusModule::Shutdown(); + return true; +} + +TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) { return nullptr; -} - -TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) { - TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig()); - ServerAddr = TNetAddr("localhost", r->GetActualListenPort()); - return r; -} - +} + +TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) { + TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig()); + ServerAddr = TNetAddr("localhost", r->GetActualListenPort()); + return r; +} + TExampleClientModule::TExampleClientModule() : Source() { } - -TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) { - Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); - Source->RegisterService("localhost"); + +TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) { + Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); + Source->RegisterService("localhost"); return nullptr; -} +} diff --git a/library/cpp/messagebus/test/helper/example_module.h b/library/cpp/messagebus/test/helper/example_module.h index a0b295f613..b1b0a6dd14 100644 --- a/library/cpp/messagebus/test/helper/example_module.h +++ b/library/cpp/messagebus/test/helper/example_module.h @@ -1,7 +1,7 @@ -#pragma once - -#include "example.h" - +#pragma once + +#include "example.h" + #include <library/cpp/messagebus/oldmodule/module.h> namespace NBus { @@ -9,29 +9,29 @@ namespace NBus { struct TExampleModule: public TBusModule { TExampleProtocol Proto; TBusMessageQueuePtr Queue; - + TExampleModule(); - + void StartModule(); - + bool Shutdown() override; - + // nop by default TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; }; - + struct TExampleServerModule: public TExampleModule { TNetAddr ServerAddr; TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; }; - + struct TExampleClientModule: public TExampleModule { TBusClientSessionPtr Source; - + TExampleClientModule(); - + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; }; - + } } diff --git a/library/cpp/messagebus/test/helper/fixed_port.cpp b/library/cpp/messagebus/test/helper/fixed_port.cpp index 258da0d1a5..f83ce3161a 100644 --- a/library/cpp/messagebus/test/helper/fixed_port.cpp +++ b/library/cpp/messagebus/test/helper/fixed_port.cpp @@ -1,10 +1,10 @@ #include "fixed_port.h" #include <util/system/env.h> - + #include <stdlib.h> - + bool NBus::NTest::IsFixedPortTestAllowed() { - // TODO: report skipped tests to test + // TODO: report skipped tests to test return !GetEnv("MB_TESTS_SKIP_FIXED_PORT"); -} +} diff --git a/library/cpp/messagebus/test/helper/fixed_port.h b/library/cpp/messagebus/test/helper/fixed_port.h index a9c61ebc63..39c8da4dfa 100644 --- a/library/cpp/messagebus/test/helper/fixed_port.h +++ b/library/cpp/messagebus/test/helper/fixed_port.h @@ -1,11 +1,11 @@ -#pragma once - +#pragma once + namespace NBus { namespace NTest { bool IsFixedPortTestAllowed(); - + // Must not be in range OS uses for bind on random port. const unsigned FixedPort = 4927; - + } } diff --git a/library/cpp/messagebus/test/helper/hanging_server.cpp b/library/cpp/messagebus/test/helper/hanging_server.cpp index a35514b00d..3911ff10ba 100644 --- a/library/cpp/messagebus/test/helper/hanging_server.cpp +++ b/library/cpp/messagebus/test/helper/hanging_server.cpp @@ -1,13 +1,13 @@ #include "hanging_server.h" -#include <util/system/yassert.h> - -using namespace NBus; - +#include <util/system/yassert.h> + +using namespace NBus; + THangingServer::THangingServer(int port) { BindResult = BindOnPort(port, false); -} - -int THangingServer::GetPort() const { - return BindResult.first; -} +} + +int THangingServer::GetPort() const { + return BindResult.first; +} diff --git a/library/cpp/messagebus/test/helper/hanging_server.h b/library/cpp/messagebus/test/helper/hanging_server.h index cc9fb274d8..384f07d7cf 100644 --- a/library/cpp/messagebus/test/helper/hanging_server.h +++ b/library/cpp/messagebus/test/helper/hanging_server.h @@ -1,16 +1,16 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/network.h> -#include <util/network/sock.h> - -class THangingServer { -private: +#include <util/network/sock.h> + +class THangingServer { +private: std::pair<unsigned, TVector<NBus::TBindResult>> BindResult; -public: - // listen on given port, and nothing else - THangingServer(int port = 0); - // actual port - int GetPort() const; -}; +public: + // listen on given port, and nothing else + THangingServer(int port = 0); + // actual port + int GetPort() const; +}; diff --git a/library/cpp/messagebus/test/helper/message_handler_error.cpp b/library/cpp/messagebus/test/helper/message_handler_error.cpp index c09811ec67..40997adec8 100644 --- a/library/cpp/messagebus/test/helper/message_handler_error.cpp +++ b/library/cpp/messagebus/test/helper/message_handler_error.cpp @@ -1,26 +1,26 @@ #include "message_handler_error.h" -#include <util/system/yassert.h> - -using namespace NBus; -using namespace NBus::NTest; - -void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { +#include <util/system/yassert.h> + +using namespace NBus; +using namespace NBus::NTest; + +void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { Y_FAIL("must not be called, status: %s", ToString(status).data()); -} - -void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) { +} + +void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) { Y_FAIL("must not be called"); -} - -void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) { +} + +void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) { Y_FAIL("must not be called"); -} - -void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { +} + +void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { Y_FAIL("must not be called, status: %s", ToString(status).data()); -} - -void TBusServerHandlerError::OnMessage(TOnMessageContext&) { +} + +void TBusServerHandlerError::OnMessage(TOnMessageContext&) { Y_FAIL("must not be called"); -} +} diff --git a/library/cpp/messagebus/test/helper/message_handler_error.h b/library/cpp/messagebus/test/helper/message_handler_error.h index a314b10761..bba0007a44 100644 --- a/library/cpp/messagebus/test/helper/message_handler_error.h +++ b/library/cpp/messagebus/test/helper/message_handler_error.h @@ -1,7 +1,7 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/ybus.h> - + namespace NBus { namespace NTest { struct TBusClientHandlerError: public IBusClientHandler { @@ -9,11 +9,11 @@ namespace NBus { void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override; }; - + struct TBusServerHandlerError: public IBusServerHandler { void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override; void OnMessage(TOnMessageContext& pMessage) override; }; - + } } diff --git a/library/cpp/messagebus/test/helper/object_count_check.h b/library/cpp/messagebus/test/helper/object_count_check.h index 1c4756e58c..d844469fb9 100644 --- a/library/cpp/messagebus/test/helper/object_count_check.h +++ b/library/cpp/messagebus/test/helper/object_count_check.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/messagebus/remote_client_connection.h> @@ -9,66 +9,66 @@ #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/oldmodule/module.h> #include <library/cpp/messagebus/scheduler/scheduler.h> - + #include <util/generic/object_counter.h> #include <util/system/type_name.h> #include <util/stream/output.h> - + #include <typeinfo> -struct TObjectCountCheck { - bool Enabled; - - template <typename T> - struct TReset { - TObjectCountCheck* const Thiz; - +struct TObjectCountCheck { + bool Enabled; + + template <typename T> + struct TReset { + TObjectCountCheck* const Thiz; + TReset(TObjectCountCheck* thiz) : Thiz(thiz) { } - + void operator()() { long oldValue = TObjectCounter<T>::ResetObjectCount(); - if (oldValue != 0) { + if (oldValue != 0) { Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl; - Cerr << "won't check in this test" << Endl; - Thiz->Enabled = false; - } - } - }; - - TObjectCountCheck() { - Enabled = true; - DoForAllCounters<TReset>(); - } - - template <typename T> - struct TCheckZero { + Cerr << "won't check in this test" << Endl; + Thiz->Enabled = false; + } + } + }; + + TObjectCountCheck() { + Enabled = true; + DoForAllCounters<TReset>(); + } + + template <typename T> + struct TCheckZero { TCheckZero(TObjectCountCheck*) { } - + void operator()() { UNIT_ASSERT_VALUES_EQUAL_C(0L, TObjectCounter<T>::ObjectCount(), TypeName<T>()); - } - }; - - ~TObjectCountCheck() { - if (Enabled) { - DoForAllCounters<TCheckZero>(); - } - } - - template <template <typename> class TOp> - void DoForAllCounters() { - TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)(); - TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)(); - TOp< ::NBus::NPrivate::TRemoteClientSession>(this)(); - TOp< ::NBus::NPrivate::TRemoteServerSession>(this)(); - TOp< ::NBus::NPrivate::TScheduler>(this)(); - TOp< ::NEventLoop::TEventLoop>(this)(); - TOp< ::NEventLoop::TChannel>(this)(); - TOp< ::NBus::TBusModule>(this)(); - TOp< ::NBus::TBusJob>(this)(); - } -}; + } + }; + + ~TObjectCountCheck() { + if (Enabled) { + DoForAllCounters<TCheckZero>(); + } + } + + template <template <typename> class TOp> + void DoForAllCounters() { + TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)(); + TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)(); + TOp< ::NBus::NPrivate::TRemoteClientSession>(this)(); + TOp< ::NBus::NPrivate::TRemoteServerSession>(this)(); + TOp< ::NBus::NPrivate::TScheduler>(this)(); + TOp< ::NEventLoop::TEventLoop>(this)(); + TOp< ::NEventLoop::TChannel>(this)(); + TOp< ::NBus::TBusModule>(this)(); + TOp< ::NBus::TBusJob>(this)(); + } +}; diff --git a/library/cpp/messagebus/test/helper/wait_for.h b/library/cpp/messagebus/test/helper/wait_for.h index f09958d4c0..ebd0bfd6e2 100644 --- a/library/cpp/messagebus/test/helper/wait_for.h +++ b/library/cpp/messagebus/test/helper/wait_for.h @@ -1,14 +1,14 @@ -#pragma once - -#include <util/datetime/base.h> -#include <util/system/yassert.h> - +#pragma once + +#include <util/datetime/base.h> +#include <util/system/yassert.h> + #define UNIT_WAIT_FOR(condition) \ do { \ - TInstant start(TInstant::Now()); \ - while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \ + TInstant start(TInstant::Now()); \ + while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \ Sleep(TDuration::MilliSeconds(1)); \ - } \ - /* TODO: use UNIT_ASSERT if in unittest thread */ \ + } \ + /* TODO: use UNIT_ASSERT if in unittest thread */ \ Y_VERIFY(condition, "condition failed after 10 seconds wait"); \ - } while (0) + } while (0) diff --git a/library/cpp/messagebus/test/helper/ya.make b/library/cpp/messagebus/test/helper/ya.make index 97bd45f573..703f6b6953 100644 --- a/library/cpp/messagebus/test/helper/ya.make +++ b/library/cpp/messagebus/test/helper/ya.make @@ -1,17 +1,17 @@ -LIBRARY(messagebus_test_helper) - +LIBRARY(messagebus_test_helper) + OWNER(g:messagebus) - -SRCS( - example.cpp - example_module.cpp - fixed_port.cpp - message_handler_error.cpp - hanging_server.cpp -) - -PEERDIR( + +SRCS( + example.cpp + example_module.cpp + fixed_port.cpp + message_handler_error.cpp + hanging_server.cpp +) + +PEERDIR( library/cpp/messagebus/oldmodule -) - -END() +) + +END() |