diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/test | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/test')
42 files changed, 4360 insertions, 0 deletions
diff --git a/library/cpp/messagebus/test/TestMessageBus.py b/library/cpp/messagebus/test/TestMessageBus.py new file mode 100644 index 0000000000..0bbaa0a313 --- /dev/null +++ b/library/cpp/messagebus/test/TestMessageBus.py @@ -0,0 +1,8 @@ +from devtools.fleur.ytest import group, constraint +from devtools.fleur.ytest.integration import UnitTestGroup + +@group +@constraint('library.messagebus') +class TestMessageBus(UnitTestGroup): + def __init__(self, context): + UnitTestGroup.__init__(self, context, 'MessageBus', 'library-messagebus-test-ut') diff --git a/library/cpp/messagebus/test/example/client/client.cpp b/library/cpp/messagebus/test/example/client/client.cpp new file mode 100644 index 0000000000..89b5f2c9be --- /dev/null +++ b/library/cpp/messagebus/test/example/client/client.cpp @@ -0,0 +1,81 @@ +#include <library/cpp/messagebus/test/example/common/proto.h> + +#include <util/random/random.h> + +using namespace NBus; +using namespace NCalculator; + +namespace NCalculator { + struct TCalculatorClient: public IBusClientHandler { + TCalculatorProtocol Proto; + TBusMessageQueuePtr MessageQueue; + TBusClientSessionPtr ClientSession; + + TCalculatorClient() { + MessageQueue = CreateMessageQueue(); + TBusClientSessionConfig config; + config.TotalTimeout = 2 * 1000; + ClientSession = TBusClientSession::Create(&Proto, this, config, MessageQueue); + } + + ~TCalculatorClient() override { + MessageQueue->Stop(); + } + + void OnReply(TAutoPtr<TBusMessage> request, TAutoPtr<TBusMessage> response0) override { + Y_VERIFY(response0->GetHeader()->Type == TResponse::MessageType, "wrong response"); + TResponse* response = VerifyDynamicCast<TResponse*>(response0.Get()); + if (request->GetHeader()->Type == TRequestSum::MessageType) { + TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.Get()); + int a = requestSum->Record.GetA(); + int b = requestSum->Record.GetB(); + Cerr << a << " + " << b << " = " << response->Record.GetResult() << "\n"; + } else if (request->GetHeader()->Type == TRequestMul::MessageType) { + TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.Get()); + int a = requestMul->Record.GetA(); + int b = requestMul->Record.GetB(); + Cerr << a << " * " << b << " = " << response->Record.GetResult() << "\n"; + } else { + Y_FAIL("unknown request"); + } + } + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { + Cerr << "got error " << status << "\n"; + } + }; + +} + +int main(int, char**) { + TCalculatorClient client; + + for (;;) { + TNetAddr addr(TNetAddr("127.0.0.1", TCalculatorProtocol().GetPort())); + + int a = RandomNumber<unsigned>(10); + int b = RandomNumber<unsigned>(10); + EMessageStatus ok; + if (RandomNumber<bool>()) { + TAutoPtr<TRequestSum> request(new TRequestSum); + request->Record.SetA(a); + request->Record.SetB(b); + Cerr << "sending " << a << " + " << b << "\n"; + ok = client.ClientSession->SendMessageAutoPtr(request, &addr); + } else { + TAutoPtr<TRequestMul> request(new TRequestMul); + request->Record.SetA(a); + request->Record.SetB(b); + Cerr << "sending " << a << " * " << b << "\n"; + ok = client.ClientSession->SendMessageAutoPtr(request, &addr); + } + + if (ok != MESSAGE_OK) { + Cerr << "failed to send message " << ok << "\n"; + } + + Sleep(TDuration::Seconds(1)); + } + + return 0; +} diff --git a/library/cpp/messagebus/test/example/client/ya.make b/library/cpp/messagebus/test/example/client/ya.make new file mode 100644 index 0000000000..a660a01698 --- /dev/null +++ b/library/cpp/messagebus/test/example/client/ya.make @@ -0,0 +1,13 @@ +PROGRAM(messagebus_example_client) + +OWNER(g:messagebus) + +PEERDIR( + library/cpp/messagebus/test/example/common +) + +SRCS( + client.cpp +) + +END() diff --git a/library/cpp/messagebus/test/example/common/messages.proto b/library/cpp/messagebus/test/example/common/messages.proto new file mode 100644 index 0000000000..16b858fc77 --- /dev/null +++ b/library/cpp/messagebus/test/example/common/messages.proto @@ -0,0 +1,15 @@ +package NCalculator; + +message TRequestSumRecord { + required int32 A = 1; + required int32 B = 2; +} + +message TRequestMulRecord { + required int32 A = 1; + required int32 B = 2; +} + +message TResponseRecord { + required int32 Result = 1; +} diff --git a/library/cpp/messagebus/test/example/common/proto.cpp b/library/cpp/messagebus/test/example/common/proto.cpp new file mode 100644 index 0000000000..1d18aa77ea --- /dev/null +++ b/library/cpp/messagebus/test/example/common/proto.cpp @@ -0,0 +1,12 @@ +#include "proto.h" + +using namespace NCalculator; +using namespace NBus; + +TCalculatorProtocol::TCalculatorProtocol() + : TBusBufferProtocol("Calculator", 34567) +{ + RegisterType(new TRequestSum); + RegisterType(new TRequestMul); + RegisterType(new TResponse); +} diff --git a/library/cpp/messagebus/test/example/common/proto.h b/library/cpp/messagebus/test/example/common/proto.h new file mode 100644 index 0000000000..a151aac468 --- /dev/null +++ b/library/cpp/messagebus/test/example/common/proto.h @@ -0,0 +1,17 @@ +#pragma once + +#include <library/cpp/messagebus/test/example/common/messages.pb.h> + +#include <library/cpp/messagebus/ybus.h> +#include <library/cpp/messagebus/protobuf/ybusbuf.h> + +namespace NCalculator { + typedef ::NBus::TBusBufferMessage<TRequestSumRecord, 1> TRequestSum; + typedef ::NBus::TBusBufferMessage<TRequestMulRecord, 2> TRequestMul; + typedef ::NBus::TBusBufferMessage<TResponseRecord, 3> TResponse; + + struct TCalculatorProtocol: public ::NBus::TBusBufferProtocol { + TCalculatorProtocol(); + }; + +} diff --git a/library/cpp/messagebus/test/example/common/ya.make b/library/cpp/messagebus/test/example/common/ya.make new file mode 100644 index 0000000000..4da16608fc --- /dev/null +++ b/library/cpp/messagebus/test/example/common/ya.make @@ -0,0 +1,15 @@ +LIBRARY(messagebus_test_example_common) + +OWNER(g:messagebus) + +PEERDIR( + library/cpp/messagebus + library/cpp/messagebus/protobuf +) + +SRCS( + proto.cpp + messages.proto +) + +END() diff --git a/library/cpp/messagebus/test/example/server/server.cpp b/library/cpp/messagebus/test/example/server/server.cpp new file mode 100644 index 0000000000..13e52d75f5 --- /dev/null +++ b/library/cpp/messagebus/test/example/server/server.cpp @@ -0,0 +1,58 @@ +#include <library/cpp/messagebus/test/example/common/proto.h> + +using namespace NBus; +using namespace NCalculator; + +namespace NCalculator { + struct TCalculatorServer: public IBusServerHandler { + TCalculatorProtocol Proto; + TBusMessageQueuePtr MessageQueue; + TBusServerSessionPtr ServerSession; + + TCalculatorServer() { + MessageQueue = CreateMessageQueue(); + TBusServerSessionConfig config; + ServerSession = TBusServerSession::Create(&Proto, this, config, MessageQueue); + } + + ~TCalculatorServer() override { + MessageQueue->Stop(); + } + + void OnMessage(TOnMessageContext& request) override { + if (request.GetMessage()->GetHeader()->Type == TRequestSum::MessageType) { + TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.GetMessage()); + int a = requestSum->Record.GetA(); + int b = requestSum->Record.GetB(); + int result = a + b; + Cerr << "requested " << a << " + " << b << ", sending " << result << "\n"; + TAutoPtr<TResponse> response(new TResponse); + response->Record.SetResult(result); + request.SendReplyMove(response); + } else if (request.GetMessage()->GetHeader()->Type == TRequestMul::MessageType) { + TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.GetMessage()); + int a = requestMul->Record.GetA(); + int b = requestMul->Record.GetB(); + int result = a * b; + Cerr << "requested " << a << " * " << b << ", sending " << result << "\n"; + TAutoPtr<TResponse> response(new TResponse); + response->Record.SetResult(result); + request.SendReplyMove(response); + } else { + Y_FAIL("unknown request"); + } + } + }; +} + +int main(int, char**) { + TCalculatorServer server; + + Cerr << "listening on port " << server.ServerSession->GetActualListenPort() << "\n"; + + for (;;) { + Sleep(TDuration::Seconds(1)); + } + + return 0; +} diff --git a/library/cpp/messagebus/test/example/server/ya.make b/library/cpp/messagebus/test/example/server/ya.make new file mode 100644 index 0000000000..8cdd97cb12 --- /dev/null +++ b/library/cpp/messagebus/test/example/server/ya.make @@ -0,0 +1,13 @@ +PROGRAM(messagebus_example_server) + +OWNER(g:messagebus) + +PEERDIR( + library/cpp/messagebus/test/example/common +) + +SRCS( + server.cpp +) + +END() diff --git a/library/cpp/messagebus/test/example/ya.make b/library/cpp/messagebus/test/example/ya.make new file mode 100644 index 0000000000..f275351c29 --- /dev/null +++ b/library/cpp/messagebus/test/example/ya.make @@ -0,0 +1,7 @@ +OWNER(g:messagebus) + +RECURSE( + client + common + server +) diff --git a/library/cpp/messagebus/test/helper/alloc_counter.h b/library/cpp/messagebus/test/helper/alloc_counter.h new file mode 100644 index 0000000000..ec9041cb15 --- /dev/null +++ b/library/cpp/messagebus/test/helper/alloc_counter.h @@ -0,0 +1,21 @@ +#pragma once + +#include <util/generic/noncopyable.h> +#include <util/system/atomic.h> +#include <util/system/yassert.h> + +class TAllocCounter : TNonCopyable { +private: + TAtomic* CountPtr; + +public: + TAllocCounter(TAtomic* countPtr) + : CountPtr(countPtr) + { + AtomicIncrement(*CountPtr); + } + + ~TAllocCounter() { + Y_VERIFY(AtomicDecrement(*CountPtr) >= 0, "released too many"); + } +}; diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp new file mode 100644 index 0000000000..7c6d704042 --- /dev/null +++ b/library/cpp/messagebus/test/helper/example.cpp @@ -0,0 +1,281 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "example.h" + +#include <util/generic/cast.h> + +using namespace NBus; +using namespace NBus::NTest; + +static void FillWithJunk(TArrayRef<char> data) { + TStringBuf junk = + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + "01234567890123456789012345678901234567890123456789012345678901234567890123456789" + "01234567890123456789012345678901234567890123456789012345678901234567890123456789"; + + for (size_t i = 0; i < data.size(); i += junk.size()) { + memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i)); + } +} + +static TString JunkString(size_t len) { + TTempBuf temp(len); + TArrayRef<char> tempArrayRef(temp.Data(), len); + FillWithJunk(tempArrayRef); + + return TString(tempArrayRef.data(), tempArrayRef.size()); +} + +TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize) + : TBusMessage(77) + , AllocCounter(counterPtr) + , Data(JunkString(payloadSize)) +{ +} + +TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + , AllocCounter(counterPtr) +{ +} + +TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize) + : TBusMessage(79) + , AllocCounter(counterPtr) + , Data(JunkString(payloadSize)) +{ +} + +TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + , AllocCounter(counterPtr) +{ +} + +TExampleProtocol::TExampleProtocol(int port) + : TBusProtocol("Example", port) + , RequestCount(0) + , ResponseCount(0) + , RequestCountDeserialized(0) + , ResponseCountDeserialized(0) + , StartCount(0) +{ +} + +TExampleProtocol::~TExampleProtocol() { + if (UncaughtException()) { + // so it could be reported in test + return; + } + Y_VERIFY(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount)); + Y_VERIFY(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount)); + Y_VERIFY(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized)); + Y_VERIFY(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized)); + Y_VERIFY(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount)); +} + +void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) { + // Messages have no data, we recreate them from scratch + // instead of sending, so we don't need to serialize them. + if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) { + buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size()); + } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) { + buffer.Append(exampleReply->Data.data(), exampleReply->Data.size()); + } else { + Y_FAIL("unknown message type"); + } +} + +TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) { + // TODO: check data + Y_UNUSED(payload); + + if (messageType == 77) { + TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized); + exampleMessage->Data.append(payload.data(), payload.size()); + return exampleMessage; + } else if (messageType == 79) { + TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized); + exampleReply->Data.append(payload.data(), payload.size()); + return exampleReply; + } else { + return nullptr; + } +} + +TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port) + : Proto(port) + , UseCompression(false) + , CrashOnError(false) + , DataSize(320) + , MessageCount(0) + , RepliesCount(0) + , Errors(0) + , LastError(MESSAGE_OK) +{ + Bus = CreateMessageQueue("TExampleClient"); + + Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus); + + Session->RegisterService("localhost"); +} + +TExampleClient::~TExampleClient() { +} + +EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) { + TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize)); + message->SetCompressed(UseCompression); + return Session->SendMessageAutoPtr(message, addr); +} + +void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) { + UNIT_ASSERT(MessageCount == 0); + UNIT_ASSERT(RepliesCount == 0); + UNIT_ASSERT(Errors == 0); + + WorkDone.Reset(); + MessageCount = count; + for (ssize_t i = 0; i < MessageCount; ++i) { + EMessageStatus s = SendMessage(addr); + UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s); + } +} + +void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) { + SendMessages(count, &addr); +} + +void TExampleClient::ResetCounters() { + MessageCount = 0; + RepliesCount = 0; + Errors = 0; + LastError = MESSAGE_OK; + + WorkDone.Reset(); +} + +void TExampleClient::WaitReplies() { + WorkDone.WaitT(TDuration::Seconds(60)); + + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount); + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0); + UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0); + + ResetCounters(); +} + +EMessageStatus TExampleClient::WaitForError() { + WorkDone.WaitT(TDuration::Seconds(60)); + + UNIT_ASSERT_VALUES_EQUAL(1, MessageCount); + UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount)); + UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight()); + UNIT_ASSERT_VALUES_EQUAL(1, Errors); + EMessageStatus result = LastError; + + ResetCounters(); + return result; +} + +void TExampleClient::WaitForError(EMessageStatus status) { + EMessageStatus error = WaitForError(); + UNIT_ASSERT_VALUES_EQUAL(status, error); +} + +void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) { + SendMessages(count, addr); + WaitReplies(); +} + +void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) { + SendMessagesWaitReplies(count, &addr); +} + +void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) { + Y_UNUSED(mess); + Y_UNUSED(reply); + + if (AtomicIncrement(RepliesCount) == MessageCount) { + WorkDone.Signal(); + } +} + +void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) { + if (CrashOnError) { + Y_FAIL("client failed: %s", ToCString(status)); + } + + Y_UNUSED(mess); + + AtomicIncrement(Errors); + LastError = status; + WorkDone.Signal(); +} + +TExampleServer::TExampleServer( + const char* name, + const TBusServerSessionConfig& sessionConfig) + : UseCompression(false) + , AckMessageBeforeSendReply(false) + , ForgetRequest(false) +{ + Bus = CreateMessageQueue(name); + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); +} + +TExampleServer::TExampleServer(unsigned port, const char* name) + : UseCompression(false) + , AckMessageBeforeSendReply(false) + , ForgetRequest(false) +{ + Bus = CreateMessageQueue(name); + TBusServerSessionConfig sessionConfig; + sessionConfig.ListenPort = port; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); +} + +TExampleServer::~TExampleServer() { +} + +size_t TExampleServer::GetInFlight() const { + return Session->GetInFlight(); +} + +unsigned TExampleServer::GetActualListenPort() const { + return Session->GetActualListenPort(); +} + +TNetAddr TExampleServer::GetActualListenAddr() const { + return TNetAddr("127.0.0.1", GetActualListenPort()); +} + +void TExampleServer::WaitForOnMessageCount(unsigned n) { + TestSync.WaitFor(n); +} + +void TExampleServer::OnMessage(TOnMessageContext& mess) { + TestSync.Inc(); + + TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage()); + + if (ForgetRequest) { + mess.ForgetRequest(); + return; + } + + TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size()))); + reply->SetCompressed(UseCompression); + + EMessageStatus status; + if (AckMessageBeforeSendReply) { + TBusIdentity ident; + mess.AckMessage(ident); + status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error + } else { + status = mess.SendReplyMove(reply); + } + + Y_VERIFY(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data()); +} diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h new file mode 100644 index 0000000000..26b7475308 --- /dev/null +++ b/library/cpp/messagebus/test/helper/example.h @@ -0,0 +1,132 @@ +#pragma once + +#include <library/cpp/testing/unittest/registar.h> + +#include "alloc_counter.h" +#include "message_handler_error.h" + +#include <library/cpp/messagebus/ybus.h> +#include <library/cpp/messagebus/misc/test_sync.h> + +#include <util/system/event.h> + +namespace NBus { + namespace NTest { + class TExampleRequest: public TBusMessage { + friend class TExampleProtocol; + + private: + TAllocCounter AllocCounter; + + public: + TString Data; + + public: + TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320); + TExampleRequest(ECreateUninitialized, TAtomic* counterPtr); + }; + + class TExampleResponse: public TBusMessage { + friend class TExampleProtocol; + + private: + TAllocCounter AllocCounter; + + public: + TString Data; + TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320); + TExampleResponse(ECreateUninitialized, TAtomic* counterPtr); + }; + + class TExampleProtocol: public TBusProtocol { + public: + TAtomic RequestCount; + TAtomic ResponseCount; + TAtomic RequestCountDeserialized; + TAtomic ResponseCountDeserialized; + TAtomic StartCount; + + TExampleProtocol(int port = 0); + + ~TExampleProtocol() override; + + void Serialize(const TBusMessage* message, TBuffer& buffer) override; + + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; + }; + + class TExampleClient: private TBusClientHandlerError { + public: + TExampleProtocol Proto; + bool UseCompression; + bool CrashOnError; + size_t DataSize; + + ssize_t MessageCount; + TAtomic RepliesCount; + TAtomic Errors; + EMessageStatus LastError; + + TSystemEvent WorkDone; + + TBusMessageQueuePtr Bus; + TBusClientSessionPtr Session; + + public: + TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0); + ~TExampleClient() override; + + EMessageStatus SendMessage(const TNetAddr* addr = nullptr); + + void SendMessages(size_t count, const TNetAddr* addr = nullptr); + void SendMessages(size_t count, const TNetAddr& addr); + + void ResetCounters(); + void WaitReplies(); + EMessageStatus WaitForError(); + void WaitForError(EMessageStatus status); + + void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr); + void SendMessagesWaitReplies(size_t count, const TNetAddr& addr); + + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override; + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override; + }; + + class TExampleServer: private TBusServerHandlerError { + public: + TExampleProtocol Proto; + bool UseCompression; + bool AckMessageBeforeSendReply; + TMaybe<size_t> DataSize; // Nothing means use request size + bool ForgetRequest; + + TTestSync TestSync; + + TBusMessageQueuePtr Bus; + TBusServerSessionPtr Session; + + public: + TExampleServer( + const char* name = "TExampleServer", + const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig()); + + TExampleServer(unsigned port, const char* name = "TExampleServer"); + + ~TExampleServer() override; + + public: + size_t GetInFlight() const; + unsigned GetActualListenPort() const; + // any of + TNetAddr GetActualListenAddr() const; + + void WaitForOnMessageCount(unsigned n); + + protected: + void OnMessage(TOnMessageContext& mess) override; + }; + + } +} diff --git a/library/cpp/messagebus/test/helper/example_module.cpp b/library/cpp/messagebus/test/helper/example_module.cpp new file mode 100644 index 0000000000..65ecfcf73f --- /dev/null +++ b/library/cpp/messagebus/test/helper/example_module.cpp @@ -0,0 +1,43 @@ +#include "example_module.h" + +using namespace NBus; +using namespace NBus::NTest; + +TExampleModule::TExampleModule() + : TBusModule("TExampleModule") +{ + TBusQueueConfig queueConfig; + queueConfig.NumWorkers = 5; + Queue = CreateMessageQueue(queueConfig); +} + +void TExampleModule::StartModule() { + CreatePrivateSessions(Queue.Get()); + StartInput(); +} + +bool TExampleModule::Shutdown() { + TBusModule::Shutdown(); + return true; +} + +TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) { + return nullptr; +} + +TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) { + TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig()); + ServerAddr = TNetAddr("localhost", r->GetActualListenPort()); + return r; +} + +TExampleClientModule::TExampleClientModule() + : Source() +{ +} + +TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) { + Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); + Source->RegisterService("localhost"); + return nullptr; +} diff --git a/library/cpp/messagebus/test/helper/example_module.h b/library/cpp/messagebus/test/helper/example_module.h new file mode 100644 index 0000000000..a0b295f613 --- /dev/null +++ b/library/cpp/messagebus/test/helper/example_module.h @@ -0,0 +1,37 @@ +#pragma once + +#include "example.h" + +#include <library/cpp/messagebus/oldmodule/module.h> + +namespace NBus { + namespace NTest { + struct TExampleModule: public TBusModule { + TExampleProtocol Proto; + TBusMessageQueuePtr Queue; + + TExampleModule(); + + void StartModule(); + + bool Shutdown() override; + + // nop by default + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; + }; + + struct TExampleServerModule: public TExampleModule { + TNetAddr ServerAddr; + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; + }; + + struct TExampleClientModule: public TExampleModule { + TBusClientSessionPtr Source; + + TExampleClientModule(); + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; + }; + + } +} diff --git a/library/cpp/messagebus/test/helper/fixed_port.cpp b/library/cpp/messagebus/test/helper/fixed_port.cpp new file mode 100644 index 0000000000..258da0d1a5 --- /dev/null +++ b/library/cpp/messagebus/test/helper/fixed_port.cpp @@ -0,0 +1,10 @@ +#include "fixed_port.h" + +#include <util/system/env.h> + +#include <stdlib.h> + +bool NBus::NTest::IsFixedPortTestAllowed() { + // TODO: report skipped tests to test + return !GetEnv("MB_TESTS_SKIP_FIXED_PORT"); +} diff --git a/library/cpp/messagebus/test/helper/fixed_port.h b/library/cpp/messagebus/test/helper/fixed_port.h new file mode 100644 index 0000000000..a9c61ebc63 --- /dev/null +++ b/library/cpp/messagebus/test/helper/fixed_port.h @@ -0,0 +1,11 @@ +#pragma once + +namespace NBus { + namespace NTest { + bool IsFixedPortTestAllowed(); + + // Must not be in range OS uses for bind on random port. + const unsigned FixedPort = 4927; + + } +} diff --git a/library/cpp/messagebus/test/helper/hanging_server.cpp b/library/cpp/messagebus/test/helper/hanging_server.cpp new file mode 100644 index 0000000000..a35514b00d --- /dev/null +++ b/library/cpp/messagebus/test/helper/hanging_server.cpp @@ -0,0 +1,13 @@ +#include "hanging_server.h" + +#include <util/system/yassert.h> + +using namespace NBus; + +THangingServer::THangingServer(int port) { + BindResult = BindOnPort(port, false); +} + +int THangingServer::GetPort() const { + return BindResult.first; +} diff --git a/library/cpp/messagebus/test/helper/hanging_server.h b/library/cpp/messagebus/test/helper/hanging_server.h new file mode 100644 index 0000000000..cc9fb274d8 --- /dev/null +++ b/library/cpp/messagebus/test/helper/hanging_server.h @@ -0,0 +1,16 @@ +#pragma once + +#include <library/cpp/messagebus/network.h> + +#include <util/network/sock.h> + +class THangingServer { +private: + std::pair<unsigned, TVector<NBus::TBindResult>> BindResult; + +public: + // listen on given port, and nothing else + THangingServer(int port = 0); + // actual port + int GetPort() const; +}; diff --git a/library/cpp/messagebus/test/helper/message_handler_error.cpp b/library/cpp/messagebus/test/helper/message_handler_error.cpp new file mode 100644 index 0000000000..c09811ec67 --- /dev/null +++ b/library/cpp/messagebus/test/helper/message_handler_error.cpp @@ -0,0 +1,26 @@ +#include "message_handler_error.h" + +#include <util/system/yassert.h> + +using namespace NBus; +using namespace NBus::NTest; + +void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { + Y_FAIL("must not be called, status: %s", ToString(status).data()); +} + +void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) { + Y_FAIL("must not be called"); +} + +void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) { + Y_FAIL("must not be called"); +} + +void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { + Y_FAIL("must not be called, status: %s", ToString(status).data()); +} + +void TBusServerHandlerError::OnMessage(TOnMessageContext&) { + Y_FAIL("must not be called"); +} diff --git a/library/cpp/messagebus/test/helper/message_handler_error.h b/library/cpp/messagebus/test/helper/message_handler_error.h new file mode 100644 index 0000000000..a314b10761 --- /dev/null +++ b/library/cpp/messagebus/test/helper/message_handler_error.h @@ -0,0 +1,19 @@ +#pragma once + +#include <library/cpp/messagebus/ybus.h> + +namespace NBus { + namespace NTest { + struct TBusClientHandlerError: public IBusClientHandler { + void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override; + void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; + void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override; + }; + + struct TBusServerHandlerError: public IBusServerHandler { + void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override; + void OnMessage(TOnMessageContext& pMessage) override; + }; + + } +} diff --git a/library/cpp/messagebus/test/helper/object_count_check.h b/library/cpp/messagebus/test/helper/object_count_check.h new file mode 100644 index 0000000000..1c4756e58c --- /dev/null +++ b/library/cpp/messagebus/test/helper/object_count_check.h @@ -0,0 +1,74 @@ +#pragma once + +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/remote_client_connection.h> +#include <library/cpp/messagebus/remote_client_session.h> +#include <library/cpp/messagebus/remote_server_connection.h> +#include <library/cpp/messagebus/remote_server_session.h> +#include <library/cpp/messagebus/ybus.h> +#include <library/cpp/messagebus/oldmodule/module.h> +#include <library/cpp/messagebus/scheduler/scheduler.h> + +#include <util/generic/object_counter.h> +#include <util/system/type_name.h> +#include <util/stream/output.h> + +#include <typeinfo> + +struct TObjectCountCheck { + bool Enabled; + + template <typename T> + struct TReset { + TObjectCountCheck* const Thiz; + + TReset(TObjectCountCheck* thiz) + : Thiz(thiz) + { + } + + void operator()() { + long oldValue = TObjectCounter<T>::ResetObjectCount(); + if (oldValue != 0) { + Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl; + Cerr << "won't check in this test" << Endl; + Thiz->Enabled = false; + } + } + }; + + TObjectCountCheck() { + Enabled = true; + DoForAllCounters<TReset>(); + } + + template <typename T> + struct TCheckZero { + TCheckZero(TObjectCountCheck*) { + } + + void operator()() { + UNIT_ASSERT_VALUES_EQUAL_C(0L, TObjectCounter<T>::ObjectCount(), TypeName<T>()); + } + }; + + ~TObjectCountCheck() { + if (Enabled) { + DoForAllCounters<TCheckZero>(); + } + } + + template <template <typename> class TOp> + void DoForAllCounters() { + TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)(); + TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)(); + TOp< ::NBus::NPrivate::TRemoteClientSession>(this)(); + TOp< ::NBus::NPrivate::TRemoteServerSession>(this)(); + TOp< ::NBus::NPrivate::TScheduler>(this)(); + TOp< ::NEventLoop::TEventLoop>(this)(); + TOp< ::NEventLoop::TChannel>(this)(); + TOp< ::NBus::TBusModule>(this)(); + TOp< ::NBus::TBusJob>(this)(); + } +}; diff --git a/library/cpp/messagebus/test/helper/wait_for.h b/library/cpp/messagebus/test/helper/wait_for.h new file mode 100644 index 0000000000..f09958d4c0 --- /dev/null +++ b/library/cpp/messagebus/test/helper/wait_for.h @@ -0,0 +1,14 @@ +#pragma once + +#include <util/datetime/base.h> +#include <util/system/yassert.h> + +#define UNIT_WAIT_FOR(condition) \ + do { \ + TInstant start(TInstant::Now()); \ + while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \ + Sleep(TDuration::MilliSeconds(1)); \ + } \ + /* TODO: use UNIT_ASSERT if in unittest thread */ \ + Y_VERIFY(condition, "condition failed after 10 seconds wait"); \ + } while (0) diff --git a/library/cpp/messagebus/test/helper/ya.make b/library/cpp/messagebus/test/helper/ya.make new file mode 100644 index 0000000000..97bd45f573 --- /dev/null +++ b/library/cpp/messagebus/test/helper/ya.make @@ -0,0 +1,17 @@ +LIBRARY(messagebus_test_helper) + +OWNER(g:messagebus) + +SRCS( + example.cpp + example_module.cpp + fixed_port.cpp + message_handler_error.cpp + hanging_server.cpp +) + +PEERDIR( + library/cpp/messagebus/oldmodule +) + +END() diff --git a/library/cpp/messagebus/test/perftest/messages.proto b/library/cpp/messagebus/test/perftest/messages.proto new file mode 100644 index 0000000000..8919034e7a --- /dev/null +++ b/library/cpp/messagebus/test/perftest/messages.proto @@ -0,0 +1,7 @@ +message TPerftestRequestRecord { + required string Data = 1; +} + +message TPerftestResponseRecord { + required string Data = 1; +} diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp new file mode 100644 index 0000000000..8489319278 --- /dev/null +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -0,0 +1,713 @@ +#include "simple_proto.h" + +#include <library/cpp/messagebus/test/perftest/messages.pb.h> + +#include <library/cpp/messagebus/text_utils.h> +#include <library/cpp/messagebus/thread_extra.h> +#include <library/cpp/messagebus/ybus.h> +#include <library/cpp/messagebus/oldmodule/module.h> +#include <library/cpp/messagebus/protobuf/ybusbuf.h> +#include <library/cpp/messagebus/www/www.h> + +#include <library/cpp/deprecated/threadable/threadable.h> +#include <library/cpp/execprofile/profile.h> +#include <library/cpp/getopt/opt.h> +#include <library/cpp/lwtrace/start.h> +#include <library/cpp/sighandler/async_signals_handler.h> +#include <library/cpp/threading/future/legacy_future.h> + +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/generic/yexception.h> +#include <util/random/random.h> +#include <util/stream/file.h> +#include <util/stream/output.h> +#include <util/stream/str.h> +#include <util/string/split.h> +#include <util/system/event.h> +#include <util/system/sysstat.h> +#include <util/system/thread.h> +#include <util/thread/lfqueue.h> + +#include <signal.h> +#include <stdlib.h> + +using namespace NBus; + +/////////////////////////////////////////////////////// +/// \brief Configuration parameters of the test + +const int DEFAULT_PORT = 55666; + +struct TPerftestConfig { + TString Nodes; ///< node1:port1,node2:port2 + int ClientCount; + int MessageSize; ///< size of message to send + int Delay; ///< server delay (milliseconds) + float Failure; ///< simulated failure rate + int ServerPort; + int Run; + bool ServerUseModules; + bool ExecuteOnMessageInWorkerPool; + bool ExecuteOnReplyInWorkerPool; + bool UseCompression; + bool Profile; + unsigned WwwPort; + + TPerftestConfig(); + + void Print() { + fprintf(stderr, "ClientCount=%d\n", ClientCount); + fprintf(stderr, "ServerPort=%d\n", ServerPort); + fprintf(stderr, "Delay=%d usecs\n", Delay); + fprintf(stderr, "MessageSize=%d bytes\n", MessageSize); + fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0); + fprintf(stderr, "Runtime=%d seconds\n", Run); + fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false"); + fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false"); + fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false"); + fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false"); + fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false"); + fprintf(stderr, "WwwPort=%u\n", WwwPort); + } +}; + +extern TPerftestConfig* TheConfig; +extern bool TheExit; + +TVector<TNetAddr> ServerAddresses; + +struct TConfig { + TBusQueueConfig ServerQueueConfig; + TBusQueueConfig ClientQueueConfig; + TBusServerSessionConfig ServerSessionConfig; + TBusClientSessionConfig ClientSessionConfig; + bool SimpleProtocol; + +private: + void ConfigureDefaults(TBusQueueConfig& config) { + config.NumWorkers = 4; + } + + void ConfigureDefaults(TBusSessionConfig& config) { + config.MaxInFlight = 10000; + config.SendTimeout = TDuration::Seconds(20).MilliSeconds(); + config.TotalTimeout = TDuration::Seconds(60).MilliSeconds(); + } + +public: + TConfig() + : SimpleProtocol(false) + { + ConfigureDefaults(ServerQueueConfig); + ConfigureDefaults(ClientQueueConfig); + ConfigureDefaults(ServerSessionConfig); + ConfigureDefaults(ClientSessionConfig); + } + + void Print() { + // TODO: do not print server if only client and vice verse + Cerr << "server queue config:\n"; + Cerr << IndentText(ServerQueueConfig.PrintToString()); + Cerr << "server session config:" << Endl; + Cerr << IndentText(ServerSessionConfig.PrintToString()); + Cerr << "client queue config:\n"; + Cerr << IndentText(ClientQueueConfig.PrintToString()); + Cerr << "client session config:" << Endl; + Cerr << IndentText(ClientSessionConfig.PrintToString()); + Cerr << "simple protocol: " << SimpleProtocol << "\n"; + } +}; + +TConfig Config; + +//////////////////////////////////////////////////////////////// +/// \brief Fast message + +using TPerftestRequest = TBusBufferMessage<TPerftestRequestRecord, 77>; +using TPerftestResponse = TBusBufferMessage<TPerftestResponseRecord, 79>; + +static size_t RequestSize() { + return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1); +} + +TAutoPtr<TBusMessage> NewRequest() { + if (Config.SimpleProtocol) { + TAutoPtr<TSimpleMessage> r(new TSimpleMessage); + r->SetCompressed(TheConfig->UseCompression); + r->Payload = 10; + return r.Release(); + } else { + TAutoPtr<TPerftestRequest> r(new TPerftestRequest); + r->SetCompressed(TheConfig->UseCompression); + // TODO: use random content for better compression test + r->Record.SetData(TString(RequestSize(), '?')); + return r.Release(); + } +} + +void CheckRequest(TPerftestRequest* request) { + const TString& data = request->Record.GetData(); + for (size_t i = 0; i != data.size(); ++i) { + Y_VERIFY(data.at(i) == '?', "must be question mark"); + } +} + +TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) { + TAutoPtr<TPerftestResponse> r(new TPerftestResponse); + r->SetCompressed(TheConfig->UseCompression); + r->Record.SetData(TString(request->Record.GetData().size(), '.')); + return r; +} + +void CheckResponse(TPerftestResponse* response) { + const TString& data = response->Record.GetData(); + for (size_t i = 0; i != data.size(); ++i) { + Y_VERIFY(data.at(i) == '.', "must be dot"); + } +} + +//////////////////////////////////////////////////////////////////// +/// \brief Fast protocol that common between client and server +class TPerftestProtocol: public TBusBufferProtocol { +public: + TPerftestProtocol() + : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort) + { + RegisterType(new TPerftestRequest); + RegisterType(new TPerftestResponse); + } +}; + +class TPerftestServer; +class TPerftestUsingModule; +class TPerftestClient; + +struct TTestStats { + TInstant Start; + + TAtomic Messages; + TAtomic Errors; + TAtomic Replies; + + void IncMessage() { + AtomicIncrement(Messages); + } + void IncReplies() { + AtomicDecrement(Messages); + AtomicIncrement(Replies); + } + int NumMessage() { + return AtomicGet(Messages); + } + void IncErrors() { + AtomicDecrement(Messages); + AtomicIncrement(Errors); + } + int NumErrors() { + return AtomicGet(Errors); + } + int NumReplies() { + return AtomicGet(Replies); + } + + double GetThroughput() { + return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds(); + } + +public: + TTestStats() + : Start(TInstant::Now()) + , Messages(0) + , Errors(0) + , Replies(0) + { + } + + void PeriodicallyPrint(); +}; + +TTestStats Stats; + +//////////////////////////////////////////////////////////////////// +/// \brief Fast of the client session +class TPerftestClient : IBusClientHandler { +public: + TBusClientSessionPtr Session; + THolder<TBusProtocol> Proto; + TBusMessageQueuePtr Bus; + TVector<TBusClientConnectionPtr> Connections; + +public: + /// constructor creates instances of protocol and session + TPerftestClient() { + /// create or get instance of message queue, need one per application + Bus = CreateMessageQueue(Config.ClientQueueConfig, "client"); + + if (Config.SimpleProtocol) { + Proto.Reset(new TSimpleProtocol); + } else { + Proto.Reset(new TPerftestProtocol); + } + + Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus); + + for (unsigned i = 0; i < ServerAddresses.size(); ++i) { + Connections.push_back(Session->GetConnection(ServerAddresses[i])); + } + } + + /// dispatch of requests is done here + void Work() { + SetCurrentThreadName("FastClient::Work"); + + while (!TheExit) { + TBusClientConnection* connection; + if (Connections.size() == 1) { + connection = Connections.front().Get(); + } else { + connection = Connections.at(RandomNumber<size_t>()).Get(); + } + + TBusMessage* message = NewRequest().Release(); + int ret = connection->SendMessage(message, true); + + if (ret == MESSAGE_OK) { + Stats.IncMessage(); + } else if (ret == MESSAGE_BUSY) { + //delete message; + //Sleep(TDuration::MilliSeconds(1)); + //continue; + Y_FAIL("unreachable"); + } else if (ret == MESSAGE_SHUTDOWN) { + delete message; + } else { + delete message; + Stats.IncErrors(); + } + } + } + + void Stop() { + Session->Shutdown(); + } + + /// actual work is being done here + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { + Y_UNUSED(mess); + + if (Config.SimpleProtocol) { + VerifyDynamicCast<TSimpleMessage*>(reply.Get()); + } else { + TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get()); + + CheckResponse(typed); + } + + Stats.IncReplies(); + } + + /// message that could not be delivered + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + Y_UNUSED(status); + + if (TheExit) { + return; + } + + Stats.IncErrors(); + + // Y_ASSERT(TheConfig->Failure > 0.0); + } +}; + +class TPerftestServerCommon { +public: + THolder<TBusProtocol> Proto; + + TBusMessageQueuePtr Bus; + + TBusServerSessionPtr Session; + +protected: + TPerftestServerCommon(const char* name) + : Session() + { + if (Config.SimpleProtocol) { + Proto.Reset(new TSimpleProtocol); + } else { + Proto.Reset(new TPerftestProtocol); + } + + /// create or get instance of single message queue, need one for application + Bus = CreateMessageQueue(Config.ServerQueueConfig, name); + } + +public: + void Stop() { + Session->Shutdown(); + } +}; + +struct TAsyncRequest { + TBusMessage* Request; + TInstant ReceivedTime; +}; + +///////////////////////////////////////////////////////////////////// +/// \brief Fast of the server session +class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler { +public: + TLockFreeQueue<TAsyncRequest> AsyncRequests; + +public: + TPerftestServer() + : TPerftestServerCommon("server") + { + /// register destination session + Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus); + Y_ASSERT(Session && "probably somebody is listening on the same port"); + } + + /// when message comes, send reply + void OnMessage(TOnMessageContext& mess) override { + if (Config.SimpleProtocol) { + TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage()); + TAutoPtr<TSimpleMessage> response(new TSimpleMessage); + response->Payload = typed->Payload; + mess.SendReplyMove(response); + return; + } + + TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage()); + + CheckRequest(typed); + + /// forget replies for few messages, see what happends + if (TheConfig->Failure > RandomNumber<double>()) { + return; + } + + /// sleep requested time + if (TheConfig->Delay) { + TAsyncRequest request; + request.Request = mess.ReleaseMessage(); + request.ReceivedTime = TInstant::Now(); + AsyncRequests.Enqueue(request); + return; + } + + TAutoPtr<TPerftestResponse> reply(NewResponse(typed)); + /// sent empty reply for each message + mess.SendReplyMove(reply); + // TODO: count results + } + + void Stop() { + TPerftestServerCommon::Stop(); + } +}; + +class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule { +public: + TPerftestUsingModule() + : TPerftestServerCommon("server") + , TBusModule("fast") + { + Y_VERIFY(CreatePrivateSessions(Bus.Get()), "failed to initialize dupdetect module"); + Y_VERIFY(StartInput(), "failed to start input"); + } + + ~TPerftestUsingModule() override { + Shutdown(); + } + +private: + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess); + CheckRequest(typed); + + /// sleep requested time + if (TheConfig->Delay) { + usleep(TheConfig->Delay); + } + + /// forget replies for few messages, see what happends + if (TheConfig->Failure > RandomNumber<double>()) { + return nullptr; + } + + job->SendReply(NewResponse(typed).Release()); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig); + } +}; + +// ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000 + +using namespace std; +using namespace NBus; + +static TNetworkAddress ParseNetworkAddress(const char* string) { + TString Name; + int Port; + + const char* port = strchr(string, ':'); + + if (port != nullptr) { + Name.append(string, port - string); + Port = atoi(port + 1); + } else { + Name.append(string); + Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; + } + + return TNetworkAddress(Name, Port); +} + +TVector<TNetAddr> ParseNodes(const TString nodes) { + TVector<TNetAddr> r; + + TVector<TString> hosts; + + size_t numh = Split(nodes.data(), ",", hosts); + + for (int i = 0; i < int(numh); i++) { + const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data()); + Y_VERIFY(networkAddress.Begin() != networkAddress.End(), "no addresses"); + r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin())); + } + + return r; +} + +TPerftestConfig::TPerftestConfig() { + TBusSessionConfig defaultConfig; + + ServerPort = DEFAULT_PORT; + Delay = 0; // artificial delay inside server OnMessage() + MessageSize = 200; + Failure = 0.00; + Run = 60; // in seconds + Nodes = "localhost"; + ServerUseModules = false; + ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool; + ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool; + UseCompression = false; + Profile = false; + WwwPort = 0; +} + +TPerftestConfig* TheConfig = new TPerftestConfig(); +bool TheExit = false; + +TSystemEvent StopEvent; + +TSimpleSharedPtr<TPerftestServer> Server; +TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; + +TVector<TSimpleSharedPtr<TPerftestClient>> Clients; +TMutex ClientsLock; + +void stopsignal(int /*sig*/) { + fprintf(stderr, "\n-------------------- exiting ------------------\n"); + TheExit = true; + StopEvent.Signal(); +} + +// -s <num> - start server on port <num> +// -c <node:port,node:port> - start client + +void TTestStats::PeriodicallyPrint() { + SetCurrentThreadName("print-stats"); + + for (;;) { + StopEvent.WaitT(TDuration::Seconds(1)); + if (TheExit) + break; + + TVector<TSimpleSharedPtr<TPerftestClient>> clients; + { + TGuard<TMutex> guard(ClientsLock); + clients = Clients; + } + + fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n", + NumReplies(), NumErrors(), GetThroughput()); + if (!!Server) { + fprintf(stderr, "server: q: %u %s\n", + (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(), + Server->Session->GetStatusSingleLine().data()); + } + if (!!ServerUsingModule) { + fprintf(stderr, "server: q: %u %s\n", + (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(), + ServerUsingModule->Session->GetStatusSingleLine().data()); + } + for (const auto& client : clients) { + fprintf(stderr, "client: q: %u %s\n", + (unsigned)client->Bus->GetExecutor()->GetWorkQueueSize(), + client->Session->GetStatusSingleLine().data()); + } + + TStringStream stats; + + bool first = true; + if (!!Server) { + if (!first) { + stats << "\n"; + } + first = false; + stats << "server:\n"; + stats << IndentText(Server->Bus->GetStatus()); + } + if (!!ServerUsingModule) { + if (!first) { + stats << "\n"; + } + first = false; + stats << "server using modules:\n"; + stats << IndentText(ServerUsingModule->Bus->GetStatus()); + } + for (const auto& client : clients) { + if (!first) { + stats << "\n"; + } + first = false; + stats << "client:\n"; + stats << IndentText(client->Bus->GetStatus()); + } + + TUnbufferedFileOutput("stats").Write(stats.Str()); + } +} + +int main(int argc, char* argv[]) { + NLWTrace::StartLwtraceFromEnv(); + + /* unix foo */ + setvbuf(stdout, nullptr, _IONBF, 0); + setvbuf(stderr, nullptr, _IONBF, 0); + Umask(0); + SetAsyncSignalHandler(SIGINT, stopsignal); + SetAsyncSignalHandler(SIGTERM, stopsignal); +#ifndef _win_ + SetAsyncSignalHandler(SIGUSR1, stopsignal); +#endif + signal(SIGPIPE, SIG_IGN); + + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); + opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); + opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes); + opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure); + opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay); + opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run); + opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); + opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); + opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") + .RequiredArgument("BOOL") + .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool); + opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool") + .RequiredArgument("BOOL") + .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool); + opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression); + opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol); + opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); + opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort); + opts.AddHelpOption(); + + Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-"); + Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-"); + Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-"); + Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-"); + + opts.SetFreeArgsMax(0); + + NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); + + TheConfig->Print(); + Config.Print(); + + if (TheConfig->Profile) { + BeginProfiling(); + } + + TIntrusivePtr<TBusWww> www(new TBusWww); + + ServerAddresses = ParseNodes(TheConfig->Nodes); + + if (TheConfig->ServerPort) { + if (TheConfig->ServerUseModules) { + ServerUsingModule = new TPerftestUsingModule(); + www->RegisterModule(ServerUsingModule.Get()); + } else { + Server = new TPerftestServer(); + www->RegisterServerSession(Server->Session); + } + } + + TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; + + if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) { + for (int i = 0; i < TheConfig->ClientCount; ++i) { + TGuard<TMutex> guard(ClientsLock); + Clients.push_back(new TPerftestClient); + futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back()))); + www->RegisterClientSession(Clients.back()->Session); + } + } + + futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats)))); + + THolder<TBusWwwHttpServer> wwwServer; + if (TheConfig->WwwPort != 0) { + wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort)); + } + + /* sit here until signal terminate our process */ + StopEvent.WaitT(TDuration::Seconds(TheConfig->Run)); + TheExit = true; + StopEvent.Signal(); + + if (!!Server) { + Cerr << "Stopping server\n"; + Server->Stop(); + } + if (!!ServerUsingModule) { + Cerr << "Stopping server (using modules)\n"; + ServerUsingModule->Stop(); + } + + TVector<TSimpleSharedPtr<TPerftestClient>> clients; + { + TGuard<TMutex> guard(ClientsLock); + clients = Clients; + } + + if (!clients.empty()) { + Cerr << "Stopping clients\n"; + + for (auto& client : clients) { + client->Stop(); + } + } + + wwwServer.Destroy(); + + for (const auto& future : futures) { + future->Get(); + } + + if (TheConfig->Profile) { + EndProfiling(); + } + + Cerr << "***SUCCESS***\n"; + return 0; +} diff --git a/library/cpp/messagebus/test/perftest/simple_proto.cpp b/library/cpp/messagebus/test/perftest/simple_proto.cpp new file mode 100644 index 0000000000..19d6c15b9d --- /dev/null +++ b/library/cpp/messagebus/test/perftest/simple_proto.cpp @@ -0,0 +1,22 @@ +#include "simple_proto.h" + +#include <util/generic/cast.h> + +#include <typeinfo> + +using namespace NBus; + +void TSimpleProtocol::Serialize(const TBusMessage* mess, TBuffer& data) { + Y_VERIFY(typeid(TSimpleMessage) == typeid(*mess)); + const TSimpleMessage* typed = static_cast<const TSimpleMessage*>(mess); + data.Append((const char*)&typed->Payload, 4); +} + +TAutoPtr<TBusMessage> TSimpleProtocol::Deserialize(ui16, TArrayRef<const char> payload) { + if (payload.size() != 4) { + return nullptr; + } + TAutoPtr<TSimpleMessage> r(new TSimpleMessage); + memcpy(&r->Payload, payload.data(), 4); + return r.Release(); +} diff --git a/library/cpp/messagebus/test/perftest/simple_proto.h b/library/cpp/messagebus/test/perftest/simple_proto.h new file mode 100644 index 0000000000..4a0cc08db3 --- /dev/null +++ b/library/cpp/messagebus/test/perftest/simple_proto.h @@ -0,0 +1,29 @@ +#pragma once + +#include <library/cpp/messagebus/ybus.h> + +struct TSimpleMessage: public NBus::TBusMessage { + ui32 Payload; + + TSimpleMessage() + : TBusMessage(1) + , Payload(0) + { + } + + TSimpleMessage(NBus::ECreateUninitialized) + : TBusMessage(NBus::ECreateUninitialized()) + { + } +}; + +struct TSimpleProtocol: public NBus::TBusProtocol { + TSimpleProtocol() + : NBus::TBusProtocol("simple", 55666) + { + } + + void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override; + + TAutoPtr<NBus::TBusMessage> Deserialize(ui16 ty, TArrayRef<const char> payload) override; +}; diff --git a/library/cpp/messagebus/test/perftest/stackcollect.diff b/library/cpp/messagebus/test/perftest/stackcollect.diff new file mode 100644 index 0000000000..658f0141b3 --- /dev/null +++ b/library/cpp/messagebus/test/perftest/stackcollect.diff @@ -0,0 +1,13 @@ +Index: test/perftest/CMakeLists.txt +=================================================================== +--- test/perftest/CMakeLists.txt (revision 1088840) ++++ test/perftest/CMakeLists.txt (working copy) +@@ -3,7 +3,7 @@ PROGRAM(messagebus_perftest) + OWNER(nga) + + PEERDIR( +- library/cpp/execprofile ++ junk/davenger/stackcollect + library/cpp/messagebus + library/cpp/messagebus/protobuf + library/cpp/sighandler diff --git a/library/cpp/messagebus/test/perftest/ya.make b/library/cpp/messagebus/test/perftest/ya.make new file mode 100644 index 0000000000..24c2848ed5 --- /dev/null +++ b/library/cpp/messagebus/test/perftest/ya.make @@ -0,0 +1,24 @@ +PROGRAM(messagebus_perftest) + +OWNER(g:messagebus) + +PEERDIR( + library/cpp/deprecated/threadable + library/cpp/execprofile + library/cpp/getopt + library/cpp/lwtrace + library/cpp/messagebus + library/cpp/messagebus/oldmodule + library/cpp/messagebus/protobuf + library/cpp/messagebus/www + library/cpp/sighandler + library/cpp/threading/future +) + +SRCS( + messages.proto + perftest.cpp + simple_proto.cpp +) + +END() diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h new file mode 100644 index 0000000000..5117db5731 --- /dev/null +++ b/library/cpp/messagebus/test/ut/count_down_latch.h @@ -0,0 +1,30 @@ +#pragma once + +#include <util/system/atomic.h> +#include <util/system/event.h> + +class TCountDownLatch { +private: + TAtomic Current; + TSystemEvent EventObject; + +public: + TCountDownLatch(unsigned initial) + : Current(initial) + { + } + + void CountDown() { + if (AtomicDecrement(Current) == 0) { + EventObject.Signal(); + } + } + + void Await() { + EventObject.Wait(); + } + + bool Await(TDuration timeout) { + return EventObject.WaitT(timeout); + } +}; diff --git a/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp new file mode 100644 index 0000000000..3fdd175d73 --- /dev/null +++ b/library/cpp/messagebus/test/ut/locator_uniq_ut.cpp @@ -0,0 +1,40 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/test_utils.h> +#include <library/cpp/messagebus/ybus.h> + +class TLocatorRegisterUniqTest: public TTestBase { + UNIT_TEST_SUITE(TLocatorRegisterUniqTest); + UNIT_TEST(TestRegister); + UNIT_TEST_SUITE_END(); + +protected: + void TestRegister(); +}; + +UNIT_TEST_SUITE_REGISTRATION(TLocatorRegisterUniqTest); + +void TLocatorRegisterUniqTest::TestRegister() { + ASSUME_IP_V4_ENABLED; + + NBus::TBusLocator locator; + const char* serviceName = "TestService"; + const char* hostName = "192.168.0.42"; + int port = 31337; + + NBus::TBusKeyVec keys; + locator.LocateKeys(serviceName, keys); + UNIT_ASSERT(keys.size() == 0); + + locator.Register(serviceName, hostName, port); + locator.LocateKeys(serviceName, keys); + /// YBUS_KEYMIN YBUS_KEYMAX range + UNIT_ASSERT(keys.size() == 1); + + TVector<NBus::TNetAddr> hosts; + UNIT_ASSERT(locator.LocateAll(serviceName, NBus::YBUS_KEYMIN, hosts) == 1); + + locator.Register(serviceName, hostName, port); + hosts.clear(); + UNIT_ASSERT(locator.LocateAll(serviceName, NBus::YBUS_KEYMIN, hosts) == 1); +} diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp new file mode 100644 index 0000000000..040f9b7702 --- /dev/null +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -0,0 +1,1151 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/fixed_port.h> +#include <library/cpp/messagebus/test/helper/hanging_server.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/misc/test_sync.h> + +#include <util/network/sock.h> + +#include <utility> + +using namespace NBus; +using namespace NBus::NTest; + +namespace { + struct TExampleClientSlowOnMessageSent: public TExampleClient { + TAtomic SentCompleted; + + TSystemEvent ReplyReceived; + + TExampleClientSlowOnMessageSent() + : SentCompleted(0) + { + } + + ~TExampleClientSlowOnMessageSent() override { + Session->Shutdown(); + } + + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { + Y_VERIFY(AtomicGet(SentCompleted), "must be completed"); + + TExampleClient::OnReply(mess, reply); + + ReplyReceived.Signal(); + } + + void OnMessageSent(TBusMessage*) override { + Sleep(TDuration::MilliSeconds(100)); + AtomicSet(SentCompleted, 1); + } + }; + +} + +Y_UNIT_TEST_SUITE(TMessageBusTests) { + void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply, + const TBusServerSessionConfig& sessionConfig) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleClient client(sessionConfig); + client.CrashOnError = true; + + server.UseCompression = useCompression; + client.UseCompression = useCompression; + + server.AckMessageBeforeSendReply = ackMessageBeforeReply; + + client.SendMessagesWaitReplies(100, server.GetActualListenAddr()); + UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); + } + + Y_UNIT_TEST(TestDestination) { + TestDestinationTemplate(false, false, TBusServerSessionConfig()); + } + + Y_UNIT_TEST(TestDestinationUsingAck) { + TestDestinationTemplate(false, true, TBusServerSessionConfig()); + } + + Y_UNIT_TEST(TestDestinationWithCompression) { + TestDestinationTemplate(true, false, TBusServerSessionConfig()); + } + + Y_UNIT_TEST(TestCork) { + TBusServerSessionConfig config; + config.SendThreshold = 1000000000000; + config.Cork = TDuration::MilliSeconds(10); + TestDestinationTemplate(false, false, config); + // TODO: test for cork hanging + } + + Y_UNIT_TEST(TestReconnect) { + if (!IsFixedPortTestAllowed()) { + return; + } + + TObjectCountCheck objectCountCheck; + + unsigned port = FixedPort; + TNetAddr serverAddr("localhost", port); + THolder<TExampleServer> server; + + TBusClientSessionConfig clientConfig; + clientConfig.RetryInterval = 0; + TExampleClient client(clientConfig); + + server.Reset(new TExampleServer(port, "TExampleServer 1")); + + client.SendMessagesWaitReplies(17, serverAddr); + + server.Destroy(); + + // Making the client to detect disconnection. + client.SendMessages(1, serverAddr); + EMessageStatus error = client.WaitForError(); + if (error == MESSAGE_DELIVERY_FAILED) { + client.SendMessages(1, serverAddr); + error = client.WaitForError(); + } + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); + + server.Reset(new TExampleServer(port, "TExampleServer 2")); + + client.SendMessagesWaitReplies(19, serverAddr); + } + + struct TestNoServerImplClient: public TExampleClient { + TTestSync TestSync; + int failures = 0; + + template <typename... Args> + TestNoServerImplClient(Args&&... args) + : TExampleClient(std::forward<Args>(args)...) + { + } + + ~TestNoServerImplClient() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override { + Y_UNUSED(message); + + Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data()); + + TestSync.CheckAndIncrement((failures++) * 2); + } + }; + + void TestNoServerImpl(unsigned port, bool oneWay) { + TNetAddr noServerAddr("localhost", port); + + TestNoServerImplClient client; + + int count = 0; + for (; count < 200; ++count) { + EMessageStatus status; + if (oneWay) { + status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); + } else { + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + status = client.Session->SendMessageAutoPtr(message, &noServerAddr); + } + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + + if (count == 0) { + // lame way to wait until it is connected + Sleep(TDuration::MilliSeconds(10)); + } + client.TestSync.WaitForAndIncrement(count * 2 + 1); + } + + client.TestSync.WaitForAndIncrement(count * 2); + } + + void HangingServerImpl(unsigned port) { + TNetAddr noServerAddr("localhost", port); + + TExampleClient client; + + int count = 0; + for (;; ++count) { + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr); + if (status == MESSAGE_BUSY) { + break; + } + UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status)); + + if (count == 0) { + // lame way to wait until it is connected + Sleep(TDuration::MilliSeconds(10)); + } + } + + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count); + } + + Y_UNIT_TEST(TestHangindServer) { + TObjectCountCheck objectCountCheck; + + THangingServer server(0); + + HangingServerImpl(server.GetPort()); + } + + Y_UNIT_TEST(TestNoServer) { + TObjectCountCheck objectCountCheck; + + TestNoServerImpl(17, false); + } + + Y_UNIT_TEST(PauseInput) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + server.Session->PauseInput(true); + + TBusClientSessionConfig clientConfig; + clientConfig.MaxInFlight = 1000; + TExampleClient client(clientConfig); + + client.SendMessages(100, server.GetActualListenAddr()); + + server.TestSync.Check(0); + + server.Session->PauseInput(false); + + server.TestSync.WaitFor(100); + + client.WaitReplies(); + + server.Session->PauseInput(true); + + client.SendMessages(200, server.GetActualListenAddr()); + + server.TestSync.Check(100); + + server.Session->PauseInput(false); + + server.TestSync.WaitFor(300); + + client.WaitReplies(); + } + + struct TSendTimeoutCheckerExampleClient: public TExampleClient { + static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) { + TBusClientSessionConfig sessionConfig; + if (periodLessThanConnectTimeout) { + sessionConfig.SendTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50); + } else { + sessionConfig.SendTimeout = 50; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + } + return sessionConfig; + } + + TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout) + : TExampleClient(SessionConfig(periodLessThanConnectTimeout)) + { + } + + ~TSendTimeoutCheckerExampleClient() override { + Session->Shutdown(); + } + + TSystemEvent ErrorHappened; + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { + Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got status: %s", ToString(status).data()); + ErrorHappened.Signal(); + } + }; + + void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) { + TObjectCountCheck objectCountCheck; + + TNetAddr serverAddr("localhost", 17); + + TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout); + + client.SendMessages(1, serverAddr); + + client.ErrorHappened.WaitI(); + } + + Y_UNIT_TEST(NoServer_SendTimeout_Callback_PeriodLess) { + NoServer_SendTimeout_Callback_Impl(true); + } + + Y_UNIT_TEST(NoServer_SendTimeout_Callback_TimeoutLess) { + NoServer_SendTimeout_Callback_Impl(false); + } + + Y_UNIT_TEST(TestOnReplyCalledAfterOnMessageSent) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + TExampleClientSlowOnMessageSent client; + + TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr); + UNIT_ASSERT_EQUAL(s, MESSAGE_OK); + + UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5))); + } + + struct TDelayReplyServer: public TBusServerHandlerError { + TBusMessageQueuePtr Bus; + TExampleProtocol Proto; + TSystemEvent MessageReceivedEvent; // 1 wait for 1 message + TBusServerSessionPtr Session; + TMutex Lock_; + TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages; + + TDelayReplyServer() + : MessageReceivedEvent(TEventResetType::rAuto) + { + Bus = CreateMessageQueue("TDelayReplyServer"); + TBusServerSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1000; + sessionConfig.TotalTimeout = 2001; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + if (!Session) { + ythrow yexception() << "Failed to create destination session"; + } + } + + void OnMessage(TOnMessageContext& mess) override { + Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here"); + TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext); + delayedMsg->Swap(mess); + auto g(Guard(Lock_)); + DelayedMessages.push_back(delayedMsg); + MessageReceivedEvent.Signal(); + } + + bool CheckClientIsAlive() { + auto g(Guard(Lock_)); + for (auto& delayedMessage : DelayedMessages) { + if (!delayedMessage->IsConnectionAlive()) { + return false; + } + } + return true; + } + + bool CheckClientIsDead() const { + auto g(Guard(Lock_)); + for (const auto& delayedMessage : DelayedMessages) { + if (delayedMessage->IsConnectionAlive()) { + return false; + } + } + return true; + } + + void ReplyToDelayedMessages() { + while (true) { + TOnMessageContext msg; + { + auto g(Guard(Lock_)); + if (DelayedMessages.empty()) { + break; + } + DelayedMessages.front()->Swap(msg); + DelayedMessages.pop_front(); + } + TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount)); + msg.SendReplyMove(reply); + } + } + + size_t GetDelayedMessageCount() const { + auto g(Guard(Lock_)); + return DelayedMessages.size(); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed, got %s", ToString(status).data()); + } + }; + + Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { + TObjectCountCheck objectCountCheck; + + TDelayReplyServer server; + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort())); + + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + + UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight()); + + client.Destroy(); + + UNIT_WAIT_FOR(server.CheckClientIsDead()); + + server.ReplyToDelayedMessages(); + + // wait until all server message are delivered + UNIT_WAIT_FOR(0 == server.Session->GetInFlight()); + } + + struct TPackUnpackServer: public TBusServerHandlerError { + TBusMessageQueuePtr Bus; + TExampleProtocol Proto; + TSystemEvent MessageReceivedEvent; + TSystemEvent ClientDiedEvent; + TBusServerSessionPtr Session; + + TPackUnpackServer() { + Bus = CreateMessageQueue("TPackUnpackServer"); + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + } + + void OnMessage(TOnMessageContext& mess) override { + TBusIdentity ident; + mess.AckMessage(ident); + + char packed[BUS_IDENTITY_PACKED_SIZE]; + ident.Pack(packed); + TBusIdentity resurrected; + resurrected.Unpack(packed); + + mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount)); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed"); + } + }; + + Y_UNIT_TEST(PackUnpack) { + TObjectCountCheck objectCountCheck; + + TPackUnpackServer server; + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort())); + } + + Y_UNIT_TEST(ClientRequestTooLarge) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TBusClientSessionConfig clientConfig; + clientConfig.MaxMessageSize = 100; + TExampleClient client(clientConfig); + + client.DataSize = 10; + client.SendMessagesWaitReplies(1, server.GetActualListenAddr()); + + client.DataSize = 1000; + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); + + client.DataSize = 20; + client.SendMessagesWaitReplies(10, server.GetActualListenAddr()); + + client.DataSize = 10000; + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); + } + + struct TServerForResponseTooLarge: public TExampleServer { + TTestSync TestSync; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + config.MaxMessageSize = 100; + return config; + } + + TServerForResponseTooLarge() + : TExampleServer("TServerForResponseTooLarge", Config()) + { + } + + ~TServerForResponseTooLarge() override { + Session->Shutdown(); + } + + void OnMessage(TOnMessageContext& mess) override { + TAutoPtr<TBusMessage> response; + + if (TestSync.Get() == 0) { + TestSync.CheckAndIncrement(0); + response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000)); + } else { + TestSync.WaitForAndIncrement(3); + response.Reset(new TExampleResponse(&Proto.ResponseCount, 10)); + } + + mess.SendReplyMove(response); + } + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { + TestSync.WaitForAndIncrement(1); + + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "status"); + } + }; + + Y_UNIT_TEST(ServerResponseTooLarge) { + TObjectCountCheck objectCountCheck; + + TServerForResponseTooLarge server; + + TExampleClient client; + client.DataSize = 10; + + client.SendMessages(1, server.GetActualListenAddr()); + server.TestSync.WaitForAndIncrement(2); + client.ResetCounters(); + + client.SendMessages(1, server.GetActualListenAddr()); + + client.WorkDone.WaitI(); + + server.TestSync.CheckAndIncrement(4); + + UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight()); + } + + struct TServerForRequestTooLarge: public TExampleServer { + TTestSync TestSync; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + config.MaxMessageSize = 100; + return config; + } + + TServerForRequestTooLarge() + : TExampleServer("TServerForRequestTooLarge", Config()) + { + } + + ~TServerForRequestTooLarge() override { + Session->Shutdown(); + } + + void OnMessage(TOnMessageContext& req) override { + unsigned n = TestSync.Get(); + if (n < 2) { + TestSync.CheckAndIncrement(n); + TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10)); + req.SendReplyMove(resp); + } else { + Y_FAIL("wrong"); + } + } + }; + + Y_UNIT_TEST(ServerRequestTooLarge) { + TObjectCountCheck objectCountCheck; + + TServerForRequestTooLarge server; + + TExampleClient client; + client.DataSize = 10; + + client.SendMessagesWaitReplies(2, server.GetActualListenAddr()); + + server.TestSync.CheckAndIncrement(2); + + client.DataSize = 200; + client.SendMessages(1, server.GetActualListenAddr()); + // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + + Y_UNIT_TEST(ClientResponseTooLarge) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + server.DataSize = 10; + + TBusClientSessionConfig clientSessionConfig; + clientSessionConfig.MaxMessageSize = 100; + TExampleClient client(clientSessionConfig); + client.DataSize = 10; + + client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); + + server.DataSize = 1000; + + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + + Y_UNIT_TEST(ServerUnknownMessage) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, serverAddr); + + TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Type = 11; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + + Y_UNIT_TEST(ServerMessageReservedIds) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, serverAddr); + + // This test doens't check 0, 1, YBUS_KEYINVALID because there are asserts() on sending side + + TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Id = 2; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + client.WaitForError(MESSAGE_DELIVERY_FAILED); + + req.Reset(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Id = YBUS_KEYLOCAL; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + + Y_UNIT_TEST(TestGetInFlightForDestination) { + TObjectCountCheck objectCountCheck; + + TDelayReplyServer server; + + TExampleClient client; + + TNetAddr addr("localhost", server.Session->GetActualListenPort()); + + UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr)); + + client.SendMessages(2, &addr); + + for (size_t i = 0; i < 5; ++i) { + // One MessageReceivedEvent indicates one message, we need to wait for two + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + if (server.GetDelayedMessageCount() == 2) { + break; + } + } + UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2); + + size_t inFlight = client.Session->GetInFlight(addr); + // 4 is for messagebus1 that adds inFlight counter twice for some reason + UNIT_ASSERT(inFlight == 2 || inFlight == 4); + + UNIT_ASSERT(server.CheckClientIsAlive()); + + server.ReplyToDelayedMessages(); + + client.WaitReplies(); + } + + struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient { + TTestSync TestSync; + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig config; + // 1 ms is not enough when test is running under valgrind + config.ConnectTimeout = 10; + config.SendTimeout = 10; + config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + return config; + } + + TResetAfterSendOneWayErrorInCallbackClient() + : TExampleClient(SessionConfig()) + { + } + + ~TResetAfterSendOneWayErrorInCallbackClient() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + TestSync.WaitForAndIncrement(0); + Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "must be connection failed, got %s", ToString(status).data()); + mess.Destroy(); + TestSync.CheckAndIncrement(1); + } + }; + + Y_UNIT_TEST(ResetAfterSendOneWayErrorInCallback) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + + TResetAfterSendOneWayErrorInCallbackClient client; + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.TestSync.WaitForAndIncrement(2); + } + + struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient { + TTestSync TestSync; + + ~TResetAfterSendMessageOneWayDuringShutdown() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override { + TestSync.CheckAndIncrement(0); + + Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data()); + + // check reset is possible here + message->Reset(); + + // intentionally don't destroy the message + // we will try to resend it + Y_UNUSED(message.Release()); + + TestSync.CheckAndIncrement(1); + } + }; + + Y_UNIT_TEST(ResetAfterSendMessageOneWayDuringShutdown) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + + TResetAfterSendMessageOneWayDuringShutdown client; + + TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); + EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.TestSync.WaitForAndIncrement(2); + + client.Session->Shutdown(); + + ok = client.Session->SendMessageOneWay(message); + Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data()); + + // check reset is possible here + message->Reset(); + client.TestSync.CheckAndIncrement(3); + + delete message; + } + + Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) { + TObjectCountCheck objectCountCheck; + + TestNoServerImpl(17, true); + } + + struct TResetAfterSendOneWaySuccessClient: public TExampleClient { + TTestSync TestSync; + + ~TResetAfterSendOneWaySuccessClient() override { + Session->Shutdown(); + } + + void OnMessageSentOneWay(TAutoPtr<TBusMessage> sent) override { + TestSync.WaitForAndIncrement(0); + sent->Reset(); + TestSync.CheckAndIncrement(1); + } + }; + + Y_UNIT_TEST(ResetAfterSendOneWaySuccess) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TResetAfterSendOneWaySuccessClient client; + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + // otherwize message might go to OnError(MESSAGE_SHUTDOWN) + server.WaitForOnMessageCount(1); + + client.TestSync.WaitForAndIncrement(2); + } + + Y_UNIT_TEST(GetStatus) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleClient client; + // make sure connected + client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); + + server.Bus->GetStatus(); + server.Bus->GetStatus(); + server.Bus->GetStatus(); + + client.Bus->GetStatus(); + client.Bus->GetStatus(); + client.Bus->GetStatus(); + } + + Y_UNIT_TEST(BindOnRandomPort) { + TObjectCountCheck objectCountCheck; + + TBusServerSessionConfig serverConfig; + TExampleServer server; + + TExampleClient client; + TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort())); + client.SendMessagesWaitReplies(3, &addr); + } + + Y_UNIT_TEST(UnbindOnShutdown) { + TBusMessageQueuePtr queue(CreateMessageQueue()); + + TExampleProtocol proto; + TBusServerHandlerError handler; + TBusServerSessionPtr session = TBusServerSession::Create( + &proto, &handler, TBusServerSessionConfig(), queue); + + unsigned port = session->GetActualListenPort(); + UNIT_ASSERT(port > 0); + + session->Shutdown(); + + // fails is Shutdown() didn't unbind + THangingServer hangingServer(port); + } + + Y_UNIT_TEST(VersionNegotiation) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort()); + + TInetStreamSocket socket; + int r1 = socket.Connect(&addr); + UNIT_ASSERT(r1 >= 0); + + TStreamSocketOutput output(&socket); + + TBusHeader request; + Zero(request); + request.Size = sizeof(request); + request.SetVersionInternal(0xF); // max + output.Write(&request, sizeof(request)); + + UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true); + + TStreamSocketInput input(&socket); + + TBusHeader response; + size_t pos = 0; + + while (pos < sizeof(response)) { + size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos); + pos += count; + } + + UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos); + + UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal()); + } + + struct TOnConnectionEventClient: public TExampleClient { + TTestSync Sync; + + ~TOnConnectionEventClient() override { + Session->Shutdown(); + } + + void OnClientConnectionEvent(const TClientConnectionEvent& event) override { + if (Sync.Get() > 2) { + // Test OnClientConnectionEvent_Disconnect is broken. + // Sometimes reconnect happens during server shutdown + // when acceptor connections is still alive, and + // server connection is already closed + return; + } + + if (event.GetType() == TClientConnectionEvent::CONNECTED) { + Sync.WaitForAndIncrement(0); + } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) { + Sync.WaitForAndIncrement(2); + } + } + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { + // We do not check for message errors in this test. + } + + void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { + } + }; + + struct TOnConnectionEventServer: public TExampleServer { + TOnConnectionEventServer() + : TExampleServer("TOnConnectionEventServer") + { + } + + ~TOnConnectionEventServer() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { + // We do not check for server message errors in this test. + } + }; + + Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) { + TObjectCountCheck objectCountCheck; + + TOnConnectionEventServer server; + + TOnConnectionEventClient client; + + TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort()); + + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); + + client.Sync.WaitForAndIncrement(1); + + client.Session->Shutdown(); + + client.Sync.WaitForAndIncrement(3); + } + + Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) { + TObjectCountCheck objectCountCheck; + + THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); + + TOnConnectionEventClient client; + TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort()); + + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); + + client.Sync.WaitForAndIncrement(1); + + server.Destroy(); + + client.Sync.WaitForAndIncrement(3); + } + + struct TServerForQuotaWake: public TExampleServer { + TSystemEvent GoOn; + TMutex OneLock; + + TOnMessageContext OneMessage; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + + config.PerConnectionMaxInFlight = 1; + config.PerConnectionMaxInFlightBySize = 1500; + config.MaxMessageSize = 1024; + + return config; + } + + TServerForQuotaWake() + : TExampleServer("TServerForQuotaWake", Config()) + { + } + + ~TServerForQuotaWake() override { + Session->Shutdown(); + } + + void OnMessage(TOnMessageContext& req) override { + if (!GoOn.Wait(0)) { + TGuard<TMutex> guard(OneLock); + + UNIT_ASSERT(!OneMessage); + + OneMessage.Swap(req); + } else + TExampleServer::OnMessage(req); + } + + void WakeOne() { + TGuard<TMutex> guard(OneLock); + + UNIT_ASSERT(!!OneMessage); + + TExampleServer::OnMessage(OneMessage); + + TOnMessageContext().Swap(OneMessage); + } + }; + + Y_UNIT_TEST(WakeReaderOnQuota) { + const size_t test_msg_count = 64; + + TBusClientSessionConfig clientConfig; + + clientConfig.MaxInFlight = test_msg_count; + + TExampleClient client(clientConfig); + TServerForQuotaWake server; + TInstant start; + + client.MessageCount = test_msg_count; + + const NBus::TNetAddr addr = server.GetActualListenAddr(); + + for (unsigned count = 0;;) { + UNIT_ASSERT(count <= test_msg_count); + + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &addr); + + if (status == MESSAGE_OK) { + count++; + + } else if (status == MESSAGE_BUSY) { + if (count == test_msg_count) { + TInstant now = TInstant::Now(); + + if (start.GetValue() == 0) { + start = now; + + // TODO: properly check that server is blocked + } else if (start + TDuration::MilliSeconds(100) < now) { + break; + } + } + + Sleep(TDuration::MilliSeconds(10)); + + } else + UNIT_ASSERT(false); + } + + server.GoOn.Signal(); + server.WakeOne(); + + client.WaitReplies(); + + server.WaitForOnMessageCount(test_msg_count); + }; + + Y_UNIT_TEST(TestConnectionAttempts) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + TBusClientSessionConfig clientConfig; + clientConfig.RetryInterval = 100; + TestNoServerImplClient client(clientConfig); + + int count = 0; + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + } + Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval)); + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 4); + } + }; + + Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndNotReconnectWhenIdle) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + TBusClientSessionConfig clientConfig; + clientConfig.RetryInterval = 100; + clientConfig.ReconnectWhenIdle = false; + TestNoServerImplClient client(clientConfig); + + int count = 0; + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + } + + Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2)); + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval)); + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + }; + + Y_UNIT_TEST(TestConnectionAttemptsOnNoMessagesAndReconnectWhenIdle) { + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + TBusClientSessionConfig clientConfig; + clientConfig.ReconnectWhenIdle = true; + clientConfig.RetryInterval = 100; + TestNoServerImplClient client(clientConfig); + + int count = 0; + for (; count < 10; ++count) { + EMessageStatus status = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), + &noServerAddr); + + Y_VERIFY(status == MESSAGE_OK, "must be MESSAGE_OK, got %s", ToString(status).data()); + client.TestSync.WaitForAndIncrement(count * 2 + 1); + + // First connection attempt is for connect call; second one is to get connect result. + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + } + + Sleep(TDuration::MilliSeconds(clientConfig.RetryInterval / 2)); + UNIT_ASSERT_EQUAL(client.Session->GetConnectSyscallsNumForTest(noServerAddr), 2); + Sleep(TDuration::MilliSeconds(10 * clientConfig.RetryInterval)); + // it is undeterministic how many reconnects will be during that amount of time + // but it should occur at least once + UNIT_ASSERT(client.Session->GetConnectSyscallsNumForTest(noServerAddr) > 2); + }; +}; diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp new file mode 100644 index 0000000000..4083cf3b7b --- /dev/null +++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp @@ -0,0 +1,143 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/message_handler_error.h> + +#include <library/cpp/messagebus/misc/test_sync.h> +#include <library/cpp/messagebus/oldmodule/module.h> + +using namespace NBus; +using namespace NBus::NTest; + +Y_UNIT_TEST_SUITE(ModuleClientOneWay) { + struct TTestServer: public TBusServerHandlerError { + TExampleProtocol Proto; + + TTestSync* const TestSync; + + TBusMessageQueuePtr Queue; + TBusServerSessionPtr ServerSession; + + TTestServer(TTestSync* testSync) + : TestSync(testSync) + { + Queue = CreateMessageQueue(); + ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue); + } + + void OnMessage(TOnMessageContext& context) override { + TestSync->WaitForAndIncrement(1); + context.ForgetRequest(); + } + }; + + struct TClientModule: public TBusModule { + TExampleProtocol Proto; + + TTestSync* const TestSync; + unsigned const Port; + + TBusClientSessionPtr ClientSession; + + TClientModule(TTestSync* testSync, unsigned port) + : TBusModule("m") + , TestSync(testSync) + , Port(port) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage*) override { + TestSync->WaitForAndIncrement(0); + + job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port)); + + return &TClientModule::Sent; + } + + TJobHandler Sent(TBusJob* job, TBusMessage*) { + TestSync->WaitForAndIncrement(2); + job->Cancel(MESSAGE_DONT_ASK); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); + return nullptr; + } + }; + + Y_UNIT_TEST(Simple) { + TTestSync testSync; + + TTestServer server(&testSync); + + TBusMessageQueuePtr queue = CreateMessageQueue(); + TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort()); + + clientModule.CreatePrivateSessions(queue.Get()); + clientModule.StartInput(); + + clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); + + testSync.WaitForAndIncrement(3); + + clientModule.Shutdown(); + } + + struct TSendErrorModule: public TBusModule { + TExampleProtocol Proto; + + TTestSync* const TestSync; + + TBusClientSessionPtr ClientSession; + + TSendErrorModule(TTestSync* testSync) + : TBusModule("m") + , TestSync(testSync) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage*) override { + TestSync->WaitForAndIncrement(0); + + job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1)); + + return &TSendErrorModule::Sent; + } + + TJobHandler Sent(TBusJob* job, TBusMessage*) { + TestSync->WaitForAndIncrement(1); + job->Cancel(MESSAGE_DONT_ASK); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusServerSessionConfig sessionConfig; + sessionConfig.ConnectTimeout = 1; + sessionConfig.SendTimeout = 1; + sessionConfig.TotalTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig); + return nullptr; + } + }; + + Y_UNIT_TEST(SendError) { + TTestSync testSync; + + TBusQueueConfig queueConfig; + queueConfig.NumWorkers = 5; + + TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig); + TSendErrorModule clientModule(&testSync); + + clientModule.CreatePrivateSessions(queue.Get()); + clientModule.StartInput(); + + clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); + + testSync.WaitForAndIncrement(2); + + clientModule.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp new file mode 100644 index 0000000000..ebfe185cc6 --- /dev/null +++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp @@ -0,0 +1,368 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "count_down_latch.h" +#include "moduletest.h" + +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/example_module.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/misc/test_sync.h> +#include <library/cpp/messagebus/oldmodule/module.h> + +#include <util/generic/cast.h> +#include <util/system/event.h> + +using namespace NBus; +using namespace NBus::NTest; + +// helper class that cleans TBusJob instance, so job's destructor can +// be completed without assertion fail. +struct TJobGuard { +public: + TJobGuard(NBus::TBusJob* job) + : Job(job) + { + } + + ~TJobGuard() { + Job->ClearAllMessageStates(); + } + +private: + NBus::TBusJob* Job; +}; + +class TMessageOk: public NBus::TBusMessage { +public: + TMessageOk() + : NBus::TBusMessage(1) + { + } +}; + +class TMessageError: public NBus::TBusMessage { +public: + TMessageError() + : NBus::TBusMessage(2) + { + } +}; + +Y_UNIT_TEST_SUITE(BusJobTest) { +#if 0 + Y_UNIT_TEST(TestPending) { + TObjectCountCheck objectCountCheck; + + TDupDetectModule module; + TBusJob job(&module, new TBusMessage(0)); + // Guard will clear the job if unit-assertion fails. + TJobGuard g(&job); + + NBus::TBusMessage* msg = new NBus::TBusMessage(1); + job.Send(msg, NULL); + NBus::TJobStateVec pending; + job.GetPending(&pending); + + UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u); + UNIT_ASSERT_EQUAL(msg, pending[0].Message); + } + + Y_UNIT_TEST(TestCallReplyHandler) { + TObjectCountCheck objectCountCheck; + + TDupDetectModule module; + NBus::TBusJob job(&module, new NBus::TBusMessage(0)); + // Guard will clear the job if unit-assertion fails. + TJobGuard g(&job); + + NBus::TBusMessage* msgOk = new TMessageOk; + NBus::TBusMessage* msgError = new TMessageError; + job.Send(msgOk, NULL); + job.Send(msgError, NULL); + + UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL); + UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL); + + NBus::TBusMessage* reply = new NBus::TBusMessage(0); + job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply); + job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL); + + UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL); + UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL); + + UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT); + UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT); + + UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK); + UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply); + } +#endif + + struct TParallelOnReplyModule : TExampleClientModule { + TNetAddr ServerAddr; + + TCountDownLatch RepliesLatch; + + TParallelOnReplyModule(const TNetAddr& serverAddr) + : ServerAddr(serverAddr) + , RepliesLatch(2) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr); + return &TParallelOnReplyModule::HandleReplies; + } + + void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + Y_UNUSED(mess); + Y_UNUSED(reply); + Y_VERIFY(status == MESSAGE_OK, "failed to get reply: %s", ToCString(status)); + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + RepliesLatch.CountDown(); + Y_VERIFY(RepliesLatch.Await(TDuration::Seconds(10)), "failed to get answers"); + job->Cancel(MESSAGE_UNKNOWN); + return nullptr; + } + }; + + Y_UNIT_TEST(TestReplyHandlerCalledInParallel) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleProtocol proto; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TParallelOnReplyModule module(server.GetActualListenAddr()); + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + module.StartJob(new TExampleRequest(&proto.StartCount)); + + UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10))); + + module.Shutdown(); + } + + struct TErrorHandlerCheckerModule : TExampleModule { + TNetAddr ServerAddr; + + TBusClientSessionPtr Source; + + TCountDownLatch GotReplyLatch; + + TBusMessage* SentMessage; + + TErrorHandlerCheckerModule() + : ServerAddr("localhost", 17) + , GotReplyLatch(2) + , SentMessage() + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + TExampleRequest* message = new TExampleRequest(&Proto.RequestCount); + job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr); + SentMessage = message; + return &TErrorHandlerCheckerModule::HandleReplies; + } + + void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) { + Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got wrong status: %s", ToString(status).data()); + Y_VERIFY(req == SentMessage, "checking request"); + Y_VERIFY(resp == nullptr, "checking response"); + GotReplyLatch.CountDown(); + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + job->Cancel(MESSAGE_UNKNOWN); + GotReplyLatch.CountDown(); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; // TODO: allow 0 + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); + Source = CreateDefaultSource(queue, &Proto, sessionConfig); + Source->RegisterService("localhost"); + return nullptr; + } + }; + + Y_UNIT_TEST(ErrorHandler) { + TExampleProtocol proto; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TErrorHandlerCheckerModule module; + + TBusModuleConfig moduleConfig; + moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); + module.SetConfig(moduleConfig); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.GotReplyLatch.Await(); + + module.Shutdown(); + } + + struct TSlowReplyServer: public TBusServerHandlerError { + TTestSync* const TestSync; + TBusMessageQueuePtr Bus; + TBusServerSessionPtr ServerSession; + TExampleProtocol Proto; + + TAtomic OnMessageCount; + + TSlowReplyServer(TTestSync* testSync) + : TestSync(testSync) + , OnMessageCount(0) + { + Bus = CreateMessageQueue("TSlowReplyServer"); + TBusServerSessionConfig sessionConfig; + ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + } + + void OnMessage(TOnMessageContext& req) override { + if (AtomicIncrement(OnMessageCount) == 1) { + TestSync->WaitForAndIncrement(0); + } + TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount)); + req.SendReplyMove(response); + } + }; + + struct TModuleThatSendsReplyEarly: public TExampleClientModule { + TTestSync* const TestSync; + const unsigned ServerPort; + + TBusServerSessionPtr ServerSession; + TAtomic ReplyCount; + + TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort) + : TestSync(testSync) + , ServerPort(serverPort) + , ServerSession(nullptr) + , ReplyCount(0) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + for (unsigned i = 0; i < 2; ++i) { + job->Send( + new TExampleRequest(&Proto.RequestCount), + Source, + TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler), + 0, + TNetAddr("127.0.0.1", ServerPort)); + } + return &TModuleThatSendsReplyEarly::HandleReplies; + } + + void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + Y_UNUSED(mess); + Y_UNUSED(reply); + Y_VERIFY(status == MESSAGE_OK, "failed to get reply"); + if (AtomicIncrement(ReplyCount) == 1) { + TestSync->WaitForAndIncrement(1); + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + } else { + TestSync->WaitForAndIncrement(3); + } + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + job->Cancel(MESSAGE_UNKNOWN); + return nullptr; + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TExampleClientModule::CreateExtSession(queue); + TBusServerSessionConfig sessionConfig; + return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig); + } + }; + + Y_UNIT_TEST(SendReplyCalledBeforeAllRepliesReceived) { + TTestSync testSync; + + TSlowReplyServer slowReplyServer(&testSync); + + TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort()); + module.StartModule(); + + TExampleClient client; + TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort()); + client.SendMessagesWaitReplies(1, &addr); + + testSync.WaitForAndIncrement(2); + + module.Shutdown(); + } + + struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule { + unsigned ServerPort; + + TTestSync TestSync; + + TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort) + : ServerPort(serverPort) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage*) override { + TestSync.CheckAndIncrement(0); + + job->Send(new TExampleRequest(&Proto.RequestCount), Source, + TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply), + 0, TNetAddr("localhost", ServerPort)); + return &TShutdownCalledBeforeReplyReceivedModule::End; + } + + void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) { + Y_VERIFY(status == MESSAGE_SHUTDOWN, "got %s", ToCString(status)); + TestSync.CheckAndIncrement(1); + } + + TJobHandler End(TBusJob* job, TBusMessage*) { + TestSync.CheckAndIncrement(2); + job->Cancel(MESSAGE_SHUTDOWN); + return nullptr; + } + }; + + Y_UNIT_TEST(ShutdownCalledBeforeReplyReceived) { + TExampleServer server; + server.ForgetRequest = true; + + TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort()); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&module.Proto.RequestCount)); + + server.TestSync.WaitFor(1); + + module.Shutdown(); + + module.TestSync.CheckAndIncrement(3); + } +} diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp new file mode 100644 index 0000000000..88fe1dd9b6 --- /dev/null +++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp @@ -0,0 +1,119 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "count_down_latch.h" +#include "moduletest.h" + +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/example_module.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/oldmodule/module.h> + +#include <util/generic/cast.h> + +using namespace NBus; +using namespace NBus::NTest; + +Y_UNIT_TEST_SUITE(ModuleServerTests) { + Y_UNIT_TEST(TestModule) { + TObjectCountCheck objectCountCheck; + + /// create or get instance of message queue, need one per application + TBusMessageQueuePtr bus(CreateMessageQueue()); + THostInfoHandler hostHandler(bus.Get()); + TDupDetectModule module(hostHandler.GetActualListenAddr()); + bool success; + success = module.Init(bus.Get()); + UNIT_ASSERT_C(success, "failed to initialize dupdetect module"); + + success = module.StartInput(); + UNIT_ASSERT_C(success, "failed to start dupdetect module"); + + TDupDetectHandler dupHandler(module.ListenAddr, bus.Get()); + dupHandler.Work(); + + UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies); + + module.Shutdown(); + dupHandler.DupDetect->Shutdown(); + } + + struct TParallelOnMessageModule: public TExampleServerModule { + TCountDownLatch WaitTwoRequestsLatch; + + TParallelOnMessageModule() + : WaitTwoRequestsLatch(2) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + WaitTwoRequestsLatch.CountDown(); + Y_VERIFY(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops"); + + VerifyDynamicCast<TExampleRequest*>(mess); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + return nullptr; + } + }; + + Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) { + TObjectCountCheck objectCountCheck; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TParallelOnMessageModule module; + module.StartModule(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, module.ServerAddr); + + module.Shutdown(); + } + + struct TDelayReplyServer: public TExampleServerModule { + TSystemEvent MessageReceivedEvent; + TSystemEvent ClientDiedEvent; + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + + MessageReceivedEvent.Signal(); + + Y_VERIFY(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops"); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + return nullptr; + } + }; + + Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { + TObjectCountCheck objectCountCheck; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TDelayReplyServer server; + server.StartModule(); + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessages(1, server.ServerAddr); + + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + + UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight()); + + client.Destroy(); + + server.ClientDiedEvent.Signal(); + + // wait until all server message are delivered + UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight()); + + server.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h new file mode 100644 index 0000000000..d5da72c0cb --- /dev/null +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -0,0 +1,221 @@ +#pragma once + +/////////////////////////////////////////////////////////////////// +/// \file +/// \brief Example of using local session for communication. + +#include <library/cpp/messagebus/test/helper/alloc_counter.h> +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/message_handler_error.h> + +#include <library/cpp/messagebus/ybus.h> +#include <library/cpp/messagebus/oldmodule/module.h> + +namespace NBus { + namespace NTest { + using namespace std; + +#define TYPE_HOSTINFOREQUEST 100 +#define TYPE_HOSTINFORESPONSE 101 + + //////////////////////////////////////////////////////////////////// + /// \brief DupDetect protocol that common between client and server + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo request class + class THostInfoMessage: public TBusMessage { + public: + THostInfoMessage() + : TBusMessage(TYPE_HOSTINFOREQUEST) + { + } + THostInfoMessage(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoMessage() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo reply class + class THostInfoReply: public TBusMessage { + public: + THostInfoReply() + : TBusMessage(TYPE_HOSTINFORESPONSE) + { + } + THostInfoReply(ECreateUninitialized) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + { + } + + ~THostInfoReply() override { + } + }; + + //////////////////////////////////////////////////////////////////// + /// \brief HostInfo protocol that common between client and server + class THostInfoProtocol: public TBusProtocol { + public: + THostInfoProtocol() + : TBusProtocol("HOSTINFO", 0) + { + } + /// serialized protocol specific data into TBusData + void Serialize(const TBusMessage* mess, TBuffer& data) override { + Y_UNUSED(data); + Y_UNUSED(mess); + } + + /// deserialized TBusData into new instance of the message + TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { + Y_UNUSED(payload); + + if (messageType == TYPE_HOSTINFOREQUEST) { + return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); + } else if (messageType == TYPE_HOSTINFORESPONSE) { + return new THostInfoReply(MESSAGE_CREATE_UNINITIALIZED); + } else { + Y_FAIL("unknown"); + } + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief HostInfo handler (should convert it to module too) + struct THostInfoHandler: public TBusServerHandlerError { + TBusServerSessionPtr Session; + TBusServerSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + THostInfoHandler(TBusMessageQueue* queue) { + Session = TBusServerSession::Create(&HostInfoProto, this, HostInfoConfig, queue); + } + + void OnMessage(TOnMessageContext& mess) override { + usleep(10 * 1000); /// pretend we are doing something + + TAutoPtr<THostInfoReply> reply(new THostInfoReply()); + + mess.SendReplyMove(reply); + } + + TNetAddr GetActualListenAddr() { + return TNetAddr("localhost", Session->GetActualListenPort()); + } + }; + + ////////////////////////////////////////////////////////////// + /// \brief DupDetect handler (should convert it to module too) + struct TDupDetectHandler: public TBusClientHandlerError { + TNetAddr ServerAddr; + + TBusClientSessionPtr DupDetect; + TBusClientSessionConfig DupDetectConfig; + TExampleProtocol DupDetectProto; + + int NumMessages; + int NumReplies; + + TDupDetectHandler(const TNetAddr& serverAddr, TBusMessageQueuePtr queue) + : ServerAddr(serverAddr) + { + DupDetect = TBusClientSession::Create(&DupDetectProto, this, DupDetectConfig, queue); + DupDetect->RegisterService("localhost"); + } + + void Work() { + NumMessages = 10; + NumReplies = 0; + + for (int i = 0; i < NumMessages; i++) { + TExampleRequest* mess = new TExampleRequest(&DupDetectProto.RequestCount); + DupDetect->SendMessage(mess, &ServerAddr); + } + } + + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { + Y_UNUSED(mess); + Y_UNUSED(reply); + NumReplies++; + } + }; + + ///////////////////////////////////////////////////////////////// + /// \brief DupDetect module + + struct TDupDetectModule: public TBusModule { + TNetAddr HostInfoAddr; + + TBusClientSessionPtr HostInfoClientSession; + TBusClientSessionConfig HostInfoConfig; + THostInfoProtocol HostInfoProto; + + TExampleProtocol DupDetectProto; + TBusServerSessionConfig DupDetectConfig; + + TNetAddr ListenAddr; + + TDupDetectModule(const TNetAddr& hostInfoAddr) + : TBusModule("DUPDETECTMODULE") + , HostInfoAddr(hostInfoAddr) + { + } + + bool Init(TBusMessageQueue* queue) { + HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); + HostInfoClientSession->RegisterService("localhost"); + + return TBusModule::CreatePrivateSessions(queue); + } + + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { + TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); + + ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); + + return session; + } + + /// entry point into module, first function to call + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = new THostInfoMessage(); + + /// send message to imaginary hostinfo server + job->Send(hmess, HostInfoClientSession, TReplyHandler(), 0, HostInfoAddr); + + return TJobHandler(&TDupDetectModule::ProcessHostInfo); + } + + /// next handler is executed when all outstanding requests from previous handler is completed + TJobHandler ProcessHostInfo(TBusJob* job, TBusMessage* mess) { + TExampleRequest* dmess = dynamic_cast<TExampleRequest*>(mess); + Y_UNUSED(dmess); + + THostInfoMessage* hmess = job->Get<THostInfoMessage>(); + THostInfoReply* hreply = job->Get<THostInfoReply>(); + EMessageStatus hstatus = job->GetStatus<THostInfoMessage>(); + Y_ASSERT(hmess != nullptr); + Y_ASSERT(hreply != nullptr); + Y_ASSERT(hstatus == MESSAGE_OK); + + return TJobHandler(&TDupDetectModule::Finish); + } + + /// last handler sends reply and returns NULL + TJobHandler Finish(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + + TExampleResponse* reply = new TExampleResponse(&DupDetectProto.ResponseCount); + job->SendReply(reply); + + return nullptr; + } + }; + + } +} diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp new file mode 100644 index 0000000000..9c21227e2b --- /dev/null +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -0,0 +1,255 @@ +/////////////////////////////////////////////////////////////////// +/// \file +/// \brief Example of reply-less communication + +/// This example demostrates how asynchronous message passing library +/// can be used to send message and do not wait for reply back. +/// The usage of reply-less communication should be restricted to +/// low-throughput clients and high-throughput server to provide reasonable +/// utility. Removing replies from the communication removes any restriction +/// on how many message can be send to server and rougue clients may overwelm +/// server without thoughtput control. + +/// 1) To implement reply-less client \n + +/// Call NBus::TBusSession::AckMessage() +/// from within NBus::IMessageHandler::OnSent() handler when message has +/// gone into wire on client end. See example in NBus::NullClient::OnMessageSent(). +/// Discard identity for reply message. + +/// 2) To implement reply-less server \n + +/// Call NBus::TBusSession::AckMessage() from within NBus::IMessageHandler::OnMessage() +/// handler when message has been received on server end. +/// See example in NBus::NullServer::OnMessage(). +/// Discard identity for reply message. + +#include <library/cpp/messagebus/test/helper/alloc_counter.h> +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/hanging_server.h> +#include <library/cpp/messagebus/test/helper/message_handler_error.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +#include <library/cpp/messagebus/ybus.h> + +using namespace std; +using namespace NBus; +using namespace NBus::NPrivate; +using namespace NBus::NTest; + +//////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////// +/// \brief Reply-less client and handler +struct NullClient : TBusClientHandlerError { + TNetAddr ServerAddr; + + TBusMessageQueuePtr Queue; + TBusClientSessionPtr Session; + TExampleProtocol Proto; + + /// constructor creates instances of protocol and session + NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) + : ServerAddr(serverAddr) + { + UNIT_ASSERT(serverAddr.GetPort() > 0); + + /// create or get instance of message queue, need one per application + Queue = CreateMessageQueue(); + + /// register source/client session + Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); + + /// register service, announce to clients via LocatorService + Session->RegisterService("localhost"); + } + + ~NullClient() override { + Session->Shutdown(); + } + + /// dispatch of requests is done here + void Work() { + int batch = 10; + + for (int i = 0; i < batch; i++) { + TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); + mess->Data = "TADA"; + Session->SendMessageOneWay(mess, &ServerAddr); + } + } + + void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { + } +}; + +///////////////////////////////////////////////////////////////////// +/// \brief Reply-less server and handler +class NullServer: public TBusServerHandlerError { +public: + /// session object to maintian + TBusMessageQueuePtr Queue; + TBusServerSessionPtr Session; + TExampleProtocol Proto; + +public: + TAtomic NumMessages; + + NullServer() { + NumMessages = 0; + + /// create or get instance of single message queue, need one for application + Queue = CreateMessageQueue(); + + /// register destination session + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); + } + + ~NullServer() override { + Session->Shutdown(); + } + + /// when message comes do not send reply, just acknowledge + void OnMessage(TOnMessageContext& mess) override { + TExampleRequest* fmess = static_cast<TExampleRequest*>(mess.GetMessage()); + + Y_ASSERT(fmess->Data == "TADA"); + + /// tell session to forget this message and never expect any reply + mess.ForgetRequest(); + + AtomicIncrement(NumMessages); + } + + /// this handler should not be called because this server does not send replies + void OnSent(TAutoPtr<TBusMessage> mess) override { + Y_UNUSED(mess); + Y_FAIL("This server does not sent replies"); + } +}; + +Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { + Y_UNIT_TEST(Simple) { + TObjectCountCheck objectCountCheck; + + NullServer server; + NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort())); + + client.Work(); + + // wait until all client message are delivered + UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10); + + // assert correct number of messages + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10); + UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); + } + + struct TMessageTooLargeClient: public NullClient { + TSystemEvent GotTooLarge; + + TBusClientSessionConfig Config() { + TBusClientSessionConfig r; + r.MaxMessageSize = 1; + return r; + } + + TMessageTooLargeClient(unsigned port) + : NullClient(TNetAddr("localhost", port), Config()) + { + } + + ~TMessageTooLargeClient() override { + Session->Shutdown(); + } + + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status)); + + GotTooLarge.Signal(); + } + }; + + Y_UNIT_TEST(MessageTooLargeOnClient) { + TObjectCountCheck objectCountCheck; + + NullServer server; + + TMessageTooLargeClient client(server.Session->GetActualListenPort()); + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.GotTooLarge.WaitI(); + } + + struct TCheckTimeoutClient: public NullClient { + ~TCheckTimeoutClient() override { + Session->Shutdown(); + } + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; + sessionConfig.ConnectTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); + return sessionConfig; + } + + TCheckTimeoutClient(const TNetAddr& serverAddr) + : NullClient(serverAddr, SessionConfig()) + { + } + + TSystemEvent GotError; + + /// message that could not be delivered + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { + Y_UNUSED(mess); + Y_UNUSED(status); // TODO: check status + + GotError.Signal(); + } + }; + + Y_UNIT_TEST(SendTimeout_Callback_NoServer) { + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", 17)); + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_EQUAL(ok, MESSAGE_OK); + + client.GotError.WaitI(); + } + + Y_UNIT_TEST(SendTimeout_Callback_HangingServer) { + THangingServer server; + + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort())); + + bool first = true; + for (;;) { + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + if (ok == MESSAGE_BUSY) { + UNIT_ASSERT(!first); + break; + } + UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK); + first = false; + } + + // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages. + // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get + // serailized and written to the socket buffer, so the write queue gets drained and there are + // no messages to timeout when periodic timeout check happens. + + client.GotError.WaitI(); + } +} diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp new file mode 100644 index 0000000000..dd4d3aaa5e --- /dev/null +++ b/library/cpp/messagebus/test/ut/starter_ut.cpp @@ -0,0 +1,140 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <library/cpp/messagebus/test/helper/example_module.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> +#include <library/cpp/messagebus/test/helper/wait_for.h> + +using namespace NBus; +using namespace NBus::NTest; + +Y_UNIT_TEST_SUITE(TBusStarterTest) { + struct TStartJobTestModule: public TExampleModule { + using TBusModule::CreateDefaultStarter; + + TAtomic StartCount; + + TStartJobTestModule() + : StartCount(0) + { + } + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + AtomicIncrement(StartCount); + job->Sleep(10); + return &TStartJobTestModule::End; + } + + TJobHandler End(TBusJob* job, TBusMessage* mess) { + Y_UNUSED(mess); + AtomicIncrement(StartCount); + job->Cancel(MESSAGE_UNKNOWN); + return nullptr; + } + }; + + Y_UNIT_TEST(Test) { + TObjectCountCheck objectCountCheck; + + TBusMessageQueuePtr bus(CreateMessageQueue()); + + TStartJobTestModule module; + + //module.StartModule(); + module.CreatePrivateSessions(bus.Get()); + module.StartInput(); + + TBusSessionConfig config; + config.SendTimeout = 10; + + module.CreateDefaultStarter(*bus, config); + + UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3); + + module.Shutdown(); + bus->Stop(); + } + + Y_UNIT_TEST(TestModuleStartJob) { + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TStartJobTestModule module; + + TBusModuleConfig moduleConfig; + moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); + module.SetConfig(moduleConfig); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.RequestCount)); + + UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2); + + module.Shutdown(); + } + + struct TSleepModule: public TExampleServerModule { + TSystemEvent MessageReceivedEvent; + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + + MessageReceivedEvent.Signal(); + + job->Sleep(1000000000); + + return TJobHandler(&TSleepModule::Never); + } + + TJobHandler Never(TBusJob*, TBusMessage*) { + Y_FAIL("happens"); + throw 1; + } + }; + + Y_UNIT_TEST(StartJobDestroyDuringSleep) { + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TSleepModule module; + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.MessageReceivedEvent.WaitI(); + + module.Shutdown(); + } + + struct TSendReplyModule: public TExampleServerModule { + TSystemEvent MessageReceivedEvent; + + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { + Y_UNUSED(mess); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + + MessageReceivedEvent.Signal(); + + return nullptr; + } + }; + + Y_UNIT_TEST(AllowSendReplyInStarted) { + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TSendReplyModule module; + module.StartModule(); + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.MessageReceivedEvent.WaitI(); + + module.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp new file mode 100644 index 0000000000..400128193f --- /dev/null +++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp @@ -0,0 +1,69 @@ +#include <library/cpp/messagebus/test/helper/example.h> +#include <library/cpp/messagebus/test/helper/object_count_check.h> + +namespace NBus { + namespace NTest { + using namespace std; + + //////////////////////////////////////////////////////////////////// + /// \brief Client for sending synchronous message to local server + struct TSyncClient { + TNetAddr ServerAddr; + + TExampleProtocol Proto; + TBusMessageQueuePtr Bus; + TBusSyncClientSessionPtr Session; + + int NumReplies; + int NumMessages; + + /// constructor creates instances of queue, protocol and session + TSyncClient(const TNetAddr& serverAddr) + : ServerAddr(serverAddr) + { + /// create or get instance of message queue, need one per application + Bus = CreateMessageQueue(); + + NumReplies = 0; + NumMessages = 10; + + /// register source/client session + TBusClientSessionConfig sessionConfig; + Session = Bus->CreateSyncSource(&Proto, sessionConfig); + Session->RegisterService("localhost"); + } + + ~TSyncClient() { + Session->Shutdown(); + } + + /// dispatch of requests is done here + void Work() { + for (int i = 0; i < NumMessages; i++) { + THolder<TExampleRequest> mess(new TExampleRequest(&Proto.RequestCount)); + EMessageStatus status; + THolder<TBusMessage> reply(Session->SendSyncMessage(mess.Get(), status, &ServerAddr)); + if (!!reply) { + NumReplies++; + } + } + } + }; + + Y_UNIT_TEST_SUITE(SyncClientTest) { + Y_UNIT_TEST(TestSync) { + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TSyncClient client(server.GetActualListenAddr()); + client.Work(); + // assert correct number of replies + UNIT_ASSERT_EQUAL(client.NumReplies, client.NumMessages); + // assert that there is no message left in flight + UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); + } + } + + } +} diff --git a/library/cpp/messagebus/test/ut/ya.make b/library/cpp/messagebus/test/ut/ya.make new file mode 100644 index 0000000000..fe1b4961d6 --- /dev/null +++ b/library/cpp/messagebus/test/ut/ya.make @@ -0,0 +1,56 @@ +OWNER(g:messagebus) + +UNITTEST_FOR(library/cpp/messagebus) + +TIMEOUT(1200) + +SIZE(LARGE) + +TAG( + ya:not_autocheck + ya:fat +) + +FORK_SUBTESTS() + +PEERDIR( + library/cpp/testing/unittest_main + library/cpp/messagebus + library/cpp/messagebus/test/helper + library/cpp/messagebus/www +) + +SRCS( + messagebus_ut.cpp + module_client_ut.cpp + module_client_one_way_ut.cpp + module_server_ut.cpp + one_way_ut.cpp + starter_ut.cpp + sync_client_ut.cpp + locator_uniq_ut.cpp + ../../actor/actor_ut.cpp + ../../actor/ring_buffer_ut.cpp + ../../actor/tasks_ut.cpp + ../../actor/what_thread_does_guard_ut.cpp + ../../async_result_ut.cpp + ../../cc_semaphore_ut.cpp + ../../coreconn_ut.cpp + ../../duration_histogram_ut.cpp + ../../message_status_counter_ut.cpp + ../../misc/weak_ptr_ut.cpp + ../../latch_ut.cpp + ../../lfqueue_batch_ut.cpp + ../../local_flags_ut.cpp + ../../memory_ut.cpp + ../../moved_ut.cpp + ../../netaddr_ut.cpp + ../../network_ut.cpp + ../../nondestroying_holder_ut.cpp + ../../scheduler_actor_ut.cpp + ../../scheduler/scheduler_ut.cpp + ../../socket_addr_ut.cpp + ../../vector_swaps_ut.cpp +) + +END() diff --git a/library/cpp/messagebus/test/ya.make b/library/cpp/messagebus/test/ya.make new file mode 100644 index 0000000000..0dc4bd4720 --- /dev/null +++ b/library/cpp/messagebus/test/ya.make @@ -0,0 +1,7 @@ +OWNER(g:messagebus) + +RECURSE( + example + perftest + ut +) |