diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | c2a1af049e9deca890e9923abe64fe6c59060348 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/test | |
parent | 1f553f46fb4f3c5eec631352cdd900a0709016af (diff) | |
download | ydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/test')
41 files changed, 2648 insertions, 2648 deletions
diff --git a/library/cpp/messagebus/test/TestMessageBus.py b/library/cpp/messagebus/test/TestMessageBus.py index 4173c9866e..0bbaa0a313 100644 --- a/library/cpp/messagebus/test/TestMessageBus.py +++ b/library/cpp/messagebus/test/TestMessageBus.py @@ -3,6 +3,6 @@ from devtools.fleur.ytest.integration import UnitTestGroup @group @constraint('library.messagebus') -class TestMessageBus(UnitTestGroup): +class TestMessageBus(UnitTestGroup): def __init__(self, context): UnitTestGroup.__init__(self, context, 'MessageBus', 'library-messagebus-test-ut') diff --git a/library/cpp/messagebus/test/example/client/client.cpp b/library/cpp/messagebus/test/example/client/client.cpp index 3bd9a6f768..89b5f2c9be 100644 --- a/library/cpp/messagebus/test/example/client/client.cpp +++ b/library/cpp/messagebus/test/example/client/client.cpp @@ -1,81 +1,81 @@ #include <library/cpp/messagebus/test/example/common/proto.h> -#include <util/random/random.h> - -using namespace NBus; -using namespace NCalculator; - -namespace NCalculator { +#include <util/random/random.h> + +using namespace NBus; +using namespace NCalculator; + +namespace NCalculator { struct TCalculatorClient: public IBusClientHandler { - TCalculatorProtocol Proto; - TBusMessageQueuePtr MessageQueue; - TBusClientSessionPtr ClientSession; - - TCalculatorClient() { - MessageQueue = CreateMessageQueue(); - TBusClientSessionConfig config; - config.TotalTimeout = 2 * 1000; - ClientSession = TBusClientSession::Create(&Proto, this, config, MessageQueue); - } - + TCalculatorProtocol Proto; + TBusMessageQueuePtr MessageQueue; + TBusClientSessionPtr ClientSession; + + TCalculatorClient() { + MessageQueue = CreateMessageQueue(); + TBusClientSessionConfig config; + config.TotalTimeout = 2 * 1000; + ClientSession = TBusClientSession::Create(&Proto, this, config, MessageQueue); + } + ~TCalculatorClient() override { - MessageQueue->Stop(); - } - + MessageQueue->Stop(); + } + void OnReply(TAutoPtr<TBusMessage> request, TAutoPtr<TBusMessage> response0) override { Y_VERIFY(response0->GetHeader()->Type == TResponse::MessageType, "wrong response"); - TResponse* response = VerifyDynamicCast<TResponse*>(response0.Get()); - if (request->GetHeader()->Type == TRequestSum::MessageType) { - TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.Get()); - int a = requestSum->Record.GetA(); - int b = requestSum->Record.GetB(); - Cerr << a << " + " << b << " = " << response->Record.GetResult() << "\n"; - } else if (request->GetHeader()->Type == TRequestMul::MessageType) { - TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.Get()); - int a = requestMul->Record.GetA(); - int b = requestMul->Record.GetB(); - Cerr << a << " * " << b << " = " << response->Record.GetResult() << "\n"; - } else { + TResponse* response = VerifyDynamicCast<TResponse*>(response0.Get()); + if (request->GetHeader()->Type == TRequestSum::MessageType) { + TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.Get()); + int a = requestSum->Record.GetA(); + int b = requestSum->Record.GetB(); + Cerr << a << " + " << b << " = " << response->Record.GetResult() << "\n"; + } else if (request->GetHeader()->Type == TRequestMul::MessageType) { + TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.Get()); + int a = requestMul->Record.GetA(); + int b = requestMul->Record.GetB(); + Cerr << a << " * " << b << " = " << response->Record.GetResult() << "\n"; + } else { Y_FAIL("unknown request"); - } - } - + } + } + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { - Cerr << "got error " << status << "\n"; - } - }; - -} - -int main(int, char**) { - TCalculatorClient client; - - for (;;) { - TNetAddr addr(TNetAddr("127.0.0.1", TCalculatorProtocol().GetPort())); - - int a = RandomNumber<unsigned>(10); - int b = RandomNumber<unsigned>(10); - EMessageStatus ok; - if (RandomNumber<bool>()) { - TAutoPtr<TRequestSum> request(new TRequestSum); - request->Record.SetA(a); - request->Record.SetB(b); - Cerr << "sending " << a << " + " << b << "\n"; - ok = client.ClientSession->SendMessageAutoPtr(request, &addr); - } else { - TAutoPtr<TRequestMul> request(new TRequestMul); - request->Record.SetA(a); - request->Record.SetB(b); - Cerr << "sending " << a << " * " << b << "\n"; - ok = client.ClientSession->SendMessageAutoPtr(request, &addr); - } - - if (ok != MESSAGE_OK) { - Cerr << "failed to send message " << ok << "\n"; - } - - Sleep(TDuration::Seconds(1)); - } - - return 0; -} + Cerr << "got error " << status << "\n"; + } + }; + +} + +int main(int, char**) { + TCalculatorClient client; + + for (;;) { + TNetAddr addr(TNetAddr("127.0.0.1", TCalculatorProtocol().GetPort())); + + int a = RandomNumber<unsigned>(10); + int b = RandomNumber<unsigned>(10); + EMessageStatus ok; + if (RandomNumber<bool>()) { + TAutoPtr<TRequestSum> request(new TRequestSum); + request->Record.SetA(a); + request->Record.SetB(b); + Cerr << "sending " << a << " + " << b << "\n"; + ok = client.ClientSession->SendMessageAutoPtr(request, &addr); + } else { + TAutoPtr<TRequestMul> request(new TRequestMul); + request->Record.SetA(a); + request->Record.SetB(b); + Cerr << "sending " << a << " * " << b << "\n"; + ok = client.ClientSession->SendMessageAutoPtr(request, &addr); + } + + if (ok != MESSAGE_OK) { + Cerr << "failed to send message " << ok << "\n"; + } + + Sleep(TDuration::Seconds(1)); + } + + return 0; +} diff --git a/library/cpp/messagebus/test/example/client/ya.make b/library/cpp/messagebus/test/example/client/ya.make index 81713a5318..a660a01698 100644 --- a/library/cpp/messagebus/test/example/client/ya.make +++ b/library/cpp/messagebus/test/example/client/ya.make @@ -1,13 +1,13 @@ -PROGRAM(messagebus_example_client) - +PROGRAM(messagebus_example_client) + OWNER(g:messagebus) - -PEERDIR( + +PEERDIR( library/cpp/messagebus/test/example/common -) - -SRCS( - client.cpp -) - -END() +) + +SRCS( + client.cpp +) + +END() diff --git a/library/cpp/messagebus/test/example/common/messages.proto b/library/cpp/messagebus/test/example/common/messages.proto index 12cdf38fb5..16b858fc77 100644 --- a/library/cpp/messagebus/test/example/common/messages.proto +++ b/library/cpp/messagebus/test/example/common/messages.proto @@ -1,15 +1,15 @@ -package NCalculator; - -message TRequestSumRecord { - required int32 A = 1; - required int32 B = 2; -} - -message TRequestMulRecord { - required int32 A = 1; - required int32 B = 2; -} - -message TResponseRecord { - required int32 Result = 1; -} +package NCalculator; + +message TRequestSumRecord { + required int32 A = 1; + required int32 B = 2; +} + +message TRequestMulRecord { + required int32 A = 1; + required int32 B = 2; +} + +message TResponseRecord { + required int32 Result = 1; +} diff --git a/library/cpp/messagebus/test/example/common/proto.cpp b/library/cpp/messagebus/test/example/common/proto.cpp index 3531e3d06c..1d18aa77ea 100644 --- a/library/cpp/messagebus/test/example/common/proto.cpp +++ b/library/cpp/messagebus/test/example/common/proto.cpp @@ -1,12 +1,12 @@ -#include "proto.h" - -using namespace NCalculator; -using namespace NBus; - -TCalculatorProtocol::TCalculatorProtocol() - : TBusBufferProtocol("Calculator", 34567) -{ - RegisterType(new TRequestSum); - RegisterType(new TRequestMul); - RegisterType(new TResponse); -} +#include "proto.h" + +using namespace NCalculator; +using namespace NBus; + +TCalculatorProtocol::TCalculatorProtocol() + : TBusBufferProtocol("Calculator", 34567) +{ + RegisterType(new TRequestSum); + RegisterType(new TRequestMul); + RegisterType(new TResponse); +} diff --git a/library/cpp/messagebus/test/example/common/proto.h b/library/cpp/messagebus/test/example/common/proto.h index f9fbd5ce56..a151aac468 100644 --- a/library/cpp/messagebus/test/example/common/proto.h +++ b/library/cpp/messagebus/test/example/common/proto.h @@ -1,17 +1,17 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/test/example/common/messages.pb.h> - + #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/protobuf/ybusbuf.h> -namespace NCalculator { - typedef ::NBus::TBusBufferMessage<TRequestSumRecord, 1> TRequestSum; - typedef ::NBus::TBusBufferMessage<TRequestMulRecord, 2> TRequestMul; - typedef ::NBus::TBusBufferMessage<TResponseRecord, 3> TResponse; - +namespace NCalculator { + typedef ::NBus::TBusBufferMessage<TRequestSumRecord, 1> TRequestSum; + typedef ::NBus::TBusBufferMessage<TRequestMulRecord, 2> TRequestMul; + typedef ::NBus::TBusBufferMessage<TResponseRecord, 3> TResponse; + struct TCalculatorProtocol: public ::NBus::TBusBufferProtocol { - TCalculatorProtocol(); - }; - -} + TCalculatorProtocol(); + }; + +} diff --git a/library/cpp/messagebus/test/example/common/ya.make b/library/cpp/messagebus/test/example/common/ya.make index 14f48ff6c0..4da16608fc 100644 --- a/library/cpp/messagebus/test/example/common/ya.make +++ b/library/cpp/messagebus/test/example/common/ya.make @@ -1,15 +1,15 @@ -LIBRARY(messagebus_test_example_common) - +LIBRARY(messagebus_test_example_common) + OWNER(g:messagebus) - -PEERDIR( + +PEERDIR( library/cpp/messagebus library/cpp/messagebus/protobuf -) - -SRCS( - proto.cpp - messages.proto -) - -END() +) + +SRCS( + proto.cpp + messages.proto +) + +END() diff --git a/library/cpp/messagebus/test/example/server/server.cpp b/library/cpp/messagebus/test/example/server/server.cpp index a080f3548b..13e52d75f5 100644 --- a/library/cpp/messagebus/test/example/server/server.cpp +++ b/library/cpp/messagebus/test/example/server/server.cpp @@ -1,58 +1,58 @@ #include <library/cpp/messagebus/test/example/common/proto.h> - -using namespace NBus; -using namespace NCalculator; - -namespace NCalculator { + +using namespace NBus; +using namespace NCalculator; + +namespace NCalculator { struct TCalculatorServer: public IBusServerHandler { - TCalculatorProtocol Proto; - TBusMessageQueuePtr MessageQueue; - TBusServerSessionPtr ServerSession; - - TCalculatorServer() { - MessageQueue = CreateMessageQueue(); - TBusServerSessionConfig config; - ServerSession = TBusServerSession::Create(&Proto, this, config, MessageQueue); - } - + TCalculatorProtocol Proto; + TBusMessageQueuePtr MessageQueue; + TBusServerSessionPtr ServerSession; + + TCalculatorServer() { + MessageQueue = CreateMessageQueue(); + TBusServerSessionConfig config; + ServerSession = TBusServerSession::Create(&Proto, this, config, MessageQueue); + } + ~TCalculatorServer() override { - MessageQueue->Stop(); - } - + MessageQueue->Stop(); + } + void OnMessage(TOnMessageContext& request) override { - if (request.GetMessage()->GetHeader()->Type == TRequestSum::MessageType) { - TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.GetMessage()); - int a = requestSum->Record.GetA(); - int b = requestSum->Record.GetB(); - int result = a + b; - Cerr << "requested " << a << " + " << b << ", sending " << result << "\n"; - TAutoPtr<TResponse> response(new TResponse); - response->Record.SetResult(result); - request.SendReplyMove(response); - } else if (request.GetMessage()->GetHeader()->Type == TRequestMul::MessageType) { - TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.GetMessage()); - int a = requestMul->Record.GetA(); - int b = requestMul->Record.GetB(); - int result = a * b; - Cerr << "requested " << a << " * " << b << ", sending " << result << "\n"; - TAutoPtr<TResponse> response(new TResponse); - response->Record.SetResult(result); - request.SendReplyMove(response); - } else { + if (request.GetMessage()->GetHeader()->Type == TRequestSum::MessageType) { + TRequestSum* requestSum = VerifyDynamicCast<TRequestSum*>(request.GetMessage()); + int a = requestSum->Record.GetA(); + int b = requestSum->Record.GetB(); + int result = a + b; + Cerr << "requested " << a << " + " << b << ", sending " << result << "\n"; + TAutoPtr<TResponse> response(new TResponse); + response->Record.SetResult(result); + request.SendReplyMove(response); + } else if (request.GetMessage()->GetHeader()->Type == TRequestMul::MessageType) { + TRequestMul* requestMul = VerifyDynamicCast<TRequestMul*>(request.GetMessage()); + int a = requestMul->Record.GetA(); + int b = requestMul->Record.GetB(); + int result = a * b; + Cerr << "requested " << a << " * " << b << ", sending " << result << "\n"; + TAutoPtr<TResponse> response(new TResponse); + response->Record.SetResult(result); + request.SendReplyMove(response); + } else { Y_FAIL("unknown request"); - } - } - }; + } + } + }; +} + +int main(int, char**) { + TCalculatorServer server; + + Cerr << "listening on port " << server.ServerSession->GetActualListenPort() << "\n"; + + for (;;) { + Sleep(TDuration::Seconds(1)); + } + + return 0; } - -int main(int, char**) { - TCalculatorServer server; - - Cerr << "listening on port " << server.ServerSession->GetActualListenPort() << "\n"; - - for (;;) { - Sleep(TDuration::Seconds(1)); - } - - return 0; -} diff --git a/library/cpp/messagebus/test/example/server/ya.make b/library/cpp/messagebus/test/example/server/ya.make index 3bf4c31853..8cdd97cb12 100644 --- a/library/cpp/messagebus/test/example/server/ya.make +++ b/library/cpp/messagebus/test/example/server/ya.make @@ -1,13 +1,13 @@ -PROGRAM(messagebus_example_server) - +PROGRAM(messagebus_example_server) + OWNER(g:messagebus) - -PEERDIR( + +PEERDIR( library/cpp/messagebus/test/example/common -) - -SRCS( - server.cpp -) - -END() +) + +SRCS( + server.cpp +) + +END() diff --git a/library/cpp/messagebus/test/example/ya.make b/library/cpp/messagebus/test/example/ya.make index 972458d255..f275351c29 100644 --- a/library/cpp/messagebus/test/example/ya.make +++ b/library/cpp/messagebus/test/example/ya.make @@ -1,7 +1,7 @@ OWNER(g:messagebus) - -RECURSE( + +RECURSE( client common server -) +) diff --git a/library/cpp/messagebus/test/helper/alloc_counter.h b/library/cpp/messagebus/test/helper/alloc_counter.h index 88db651e69..ec9041cb15 100644 --- a/library/cpp/messagebus/test/helper/alloc_counter.h +++ b/library/cpp/messagebus/test/helper/alloc_counter.h @@ -1,21 +1,21 @@ -#pragma once - -#include <util/generic/noncopyable.h> -#include <util/system/atomic.h> -#include <util/system/yassert.h> - +#pragma once + +#include <util/generic/noncopyable.h> +#include <util/system/atomic.h> +#include <util/system/yassert.h> + class TAllocCounter : TNonCopyable { -private: - TAtomic* CountPtr; +private: + TAtomic* CountPtr; -public: +public: TAllocCounter(TAtomic* countPtr) : CountPtr(countPtr) { - AtomicIncrement(*CountPtr); - } - - ~TAllocCounter() { + AtomicIncrement(*CountPtr); + } + + ~TAllocCounter() { Y_VERIFY(AtomicDecrement(*CountPtr) >= 0, "released too many"); - } -}; + } +}; diff --git a/library/cpp/messagebus/test/helper/example.cpp b/library/cpp/messagebus/test/helper/example.cpp index a1913b58c1..7c6d704042 100644 --- a/library/cpp/messagebus/test/helper/example.cpp +++ b/library/cpp/messagebus/test/helper/example.cpp @@ -1,281 +1,281 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "example.h" -#include <util/generic/cast.h> - -using namespace NBus; -using namespace NBus::NTest; - +#include <util/generic/cast.h> + +using namespace NBus; +using namespace NBus::NTest; + static void FillWithJunk(TArrayRef<char> data) { TStringBuf junk = "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789"; - + for (size_t i = 0; i < data.size(); i += junk.size()) { memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i)); - } -} - + } +} + static TString JunkString(size_t len) { - TTempBuf temp(len); + TTempBuf temp(len); TArrayRef<char> tempArrayRef(temp.Data(), len); - FillWithJunk(tempArrayRef); - + FillWithJunk(tempArrayRef); + return TString(tempArrayRef.data(), tempArrayRef.size()); -} - -TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize) - : TBusMessage(77) - , AllocCounter(counterPtr) - , Data(JunkString(payloadSize)) -{ -} - -TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - , AllocCounter(counterPtr) +} + +TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize) + : TBusMessage(77) + , AllocCounter(counterPtr) + , Data(JunkString(payloadSize)) { } - -TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize) - : TBusMessage(79) - , AllocCounter(counterPtr) - , Data(JunkString(payloadSize)) -{ -} - -TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr) - : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) - , AllocCounter(counterPtr) + +TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + , AllocCounter(counterPtr) { } - -TExampleProtocol::TExampleProtocol(int port) - : TBusProtocol("Example", port) - , RequestCount(0) - , ResponseCount(0) - , RequestCountDeserialized(0) - , ResponseCountDeserialized(0) - , StartCount(0) + +TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize) + : TBusMessage(79) + , AllocCounter(counterPtr) + , Data(JunkString(payloadSize)) { } - -TExampleProtocol::~TExampleProtocol() { - if (UncaughtException()) { - // so it could be reported in test - return; - } + +TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr) + : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) + , AllocCounter(counterPtr) +{ +} + +TExampleProtocol::TExampleProtocol(int port) + : TBusProtocol("Example", port) + , RequestCount(0) + , ResponseCount(0) + , RequestCountDeserialized(0) + , ResponseCountDeserialized(0) + , StartCount(0) +{ +} + +TExampleProtocol::~TExampleProtocol() { + if (UncaughtException()) { + // so it could be reported in test + return; + } Y_VERIFY(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount)); Y_VERIFY(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount)); Y_VERIFY(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized)); Y_VERIFY(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized)); Y_VERIFY(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount)); -} - -void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) { - // Messages have no data, we recreate them from scratch - // instead of sending, so we don't need to serialize them. - if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) { +} + +void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) { + // Messages have no data, we recreate them from scratch + // instead of sending, so we don't need to serialize them. + if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) { buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size()); - } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) { + } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) { buffer.Append(exampleReply->Data.data(), exampleReply->Data.size()); - } else { + } else { Y_FAIL("unknown message type"); - } -} - + } +} + TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) { - // TODO: check data + // TODO: check data Y_UNUSED(payload); - - if (messageType == 77) { - TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized); - exampleMessage->Data.append(payload.data(), payload.size()); - return exampleMessage; - } else if (messageType == 79) { - TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized); - exampleReply->Data.append(payload.data(), payload.size()); - return exampleReply; - } else { + + if (messageType == 77) { + TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized); + exampleMessage->Data.append(payload.data(), payload.size()); + return exampleMessage; + } else if (messageType == 79) { + TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized); + exampleReply->Data.append(payload.data(), payload.size()); + return exampleReply; + } else { return nullptr; - } -} - -TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port) - : Proto(port) - , UseCompression(false) - , CrashOnError(false) - , DataSize(320) - , MessageCount(0) - , RepliesCount(0) - , Errors(0) - , LastError(MESSAGE_OK) -{ - Bus = CreateMessageQueue("TExampleClient"); - - Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus); - - Session->RegisterService("localhost"); -} - -TExampleClient::~TExampleClient() { -} - + } +} + +TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port) + : Proto(port) + , UseCompression(false) + , CrashOnError(false) + , DataSize(320) + , MessageCount(0) + , RepliesCount(0) + , Errors(0) + , LastError(MESSAGE_OK) +{ + Bus = CreateMessageQueue("TExampleClient"); + + Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus); + + Session->RegisterService("localhost"); +} + +TExampleClient::~TExampleClient() { +} + EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) { - TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize)); - message->SetCompressed(UseCompression); - return Session->SendMessageAutoPtr(message, addr); -} - + TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize)); + message->SetCompressed(UseCompression); + return Session->SendMessageAutoPtr(message, addr); +} + void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) { - UNIT_ASSERT(MessageCount == 0); - UNIT_ASSERT(RepliesCount == 0); - UNIT_ASSERT(Errors == 0); - - WorkDone.Reset(); - MessageCount = count; - for (ssize_t i = 0; i < MessageCount; ++i) { - EMessageStatus s = SendMessage(addr); - UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s); - } -} - -void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) { - SendMessages(count, &addr); -} - -void TExampleClient::ResetCounters() { - MessageCount = 0; - RepliesCount = 0; - Errors = 0; - LastError = MESSAGE_OK; - - WorkDone.Reset(); -} - -void TExampleClient::WaitReplies() { - WorkDone.WaitT(TDuration::Seconds(60)); - - UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount); - UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0); - UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0); - - ResetCounters(); -} - + UNIT_ASSERT(MessageCount == 0); + UNIT_ASSERT(RepliesCount == 0); + UNIT_ASSERT(Errors == 0); + + WorkDone.Reset(); + MessageCount = count; + for (ssize_t i = 0; i < MessageCount; ++i) { + EMessageStatus s = SendMessage(addr); + UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s); + } +} + +void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) { + SendMessages(count, &addr); +} + +void TExampleClient::ResetCounters() { + MessageCount = 0; + RepliesCount = 0; + Errors = 0; + LastError = MESSAGE_OK; + + WorkDone.Reset(); +} + +void TExampleClient::WaitReplies() { + WorkDone.WaitT(TDuration::Seconds(60)); + + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount); + UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0); + UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0); + + ResetCounters(); +} + EMessageStatus TExampleClient::WaitForError() { - WorkDone.WaitT(TDuration::Seconds(60)); - - UNIT_ASSERT_VALUES_EQUAL(1, MessageCount); - UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount)); - UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight()); - UNIT_ASSERT_VALUES_EQUAL(1, Errors); + WorkDone.WaitT(TDuration::Seconds(60)); + + UNIT_ASSERT_VALUES_EQUAL(1, MessageCount); + UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount)); + UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight()); + UNIT_ASSERT_VALUES_EQUAL(1, Errors); EMessageStatus result = LastError; - - ResetCounters(); + + ResetCounters(); return result; -} - +} + void TExampleClient::WaitForError(EMessageStatus status) { EMessageStatus error = WaitForError(); UNIT_ASSERT_VALUES_EQUAL(status, error); } void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) { - SendMessages(count, addr); - WaitReplies(); -} - -void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) { - SendMessagesWaitReplies(count, &addr); -} - -void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) { + SendMessages(count, addr); + WaitReplies(); +} + +void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) { + SendMessagesWaitReplies(count, &addr); +} + +void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) { Y_UNUSED(mess); Y_UNUSED(reply); - - if (AtomicIncrement(RepliesCount) == MessageCount) { - WorkDone.Signal(); - } -} - -void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) { - if (CrashOnError) { + + if (AtomicIncrement(RepliesCount) == MessageCount) { + WorkDone.Signal(); + } +} + +void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) { + if (CrashOnError) { Y_FAIL("client failed: %s", ToCString(status)); - } - + } + Y_UNUSED(mess); - - AtomicIncrement(Errors); - LastError = status; - WorkDone.Signal(); -} - -TExampleServer::TExampleServer( + + AtomicIncrement(Errors); + LastError = status; + WorkDone.Signal(); +} + +TExampleServer::TExampleServer( const char* name, const TBusServerSessionConfig& sessionConfig) - : UseCompression(false) - , AckMessageBeforeSendReply(false) - , ForgetRequest(false) -{ - Bus = CreateMessageQueue(name); - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); -} - -TExampleServer::TExampleServer(unsigned port, const char* name) - : UseCompression(false) - , AckMessageBeforeSendReply(false) - , ForgetRequest(false) -{ - Bus = CreateMessageQueue(name); - TBusServerSessionConfig sessionConfig; - sessionConfig.ListenPort = port; - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); -} - -TExampleServer::~TExampleServer() { -} - -size_t TExampleServer::GetInFlight() const { - return Session->GetInFlight(); -} - -unsigned TExampleServer::GetActualListenPort() const { - return Session->GetActualListenPort(); -} - -TNetAddr TExampleServer::GetActualListenAddr() const { - return TNetAddr("127.0.0.1", GetActualListenPort()); -} - -void TExampleServer::WaitForOnMessageCount(unsigned n) { - TestSync.WaitFor(n); -} - -void TExampleServer::OnMessage(TOnMessageContext& mess) { - TestSync.Inc(); - - TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage()); - - if (ForgetRequest) { - mess.ForgetRequest(); - return; - } - - TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size()))); - reply->SetCompressed(UseCompression); - - EMessageStatus status; - if (AckMessageBeforeSendReply) { - TBusIdentity ident; - mess.AckMessage(ident); - status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error - } else { - status = mess.SendReplyMove(reply); - } - + : UseCompression(false) + , AckMessageBeforeSendReply(false) + , ForgetRequest(false) +{ + Bus = CreateMessageQueue(name); + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); +} + +TExampleServer::TExampleServer(unsigned port, const char* name) + : UseCompression(false) + , AckMessageBeforeSendReply(false) + , ForgetRequest(false) +{ + Bus = CreateMessageQueue(name); + TBusServerSessionConfig sessionConfig; + sessionConfig.ListenPort = port; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); +} + +TExampleServer::~TExampleServer() { +} + +size_t TExampleServer::GetInFlight() const { + return Session->GetInFlight(); +} + +unsigned TExampleServer::GetActualListenPort() const { + return Session->GetActualListenPort(); +} + +TNetAddr TExampleServer::GetActualListenAddr() const { + return TNetAddr("127.0.0.1", GetActualListenPort()); +} + +void TExampleServer::WaitForOnMessageCount(unsigned n) { + TestSync.WaitFor(n); +} + +void TExampleServer::OnMessage(TOnMessageContext& mess) { + TestSync.Inc(); + + TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage()); + + if (ForgetRequest) { + mess.ForgetRequest(); + return; + } + + TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size()))); + reply->SetCompressed(UseCompression); + + EMessageStatus status; + if (AckMessageBeforeSendReply) { + TBusIdentity ident; + mess.AckMessage(ident); + status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error + } else { + status = mess.SendReplyMove(reply); + } + Y_VERIFY(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data()); -} +} diff --git a/library/cpp/messagebus/test/helper/example.h b/library/cpp/messagebus/test/helper/example.h index 1ff7d16c1a..26b7475308 100644 --- a/library/cpp/messagebus/test/helper/example.h +++ b/library/cpp/messagebus/test/helper/example.h @@ -1,9 +1,9 @@ #pragma once #include <library/cpp/testing/unittest/registar.h> - -#include "alloc_counter.h" -#include "message_handler_error.h" + +#include "alloc_counter.h" +#include "message_handler_error.h" #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/misc/test_sync.h> @@ -25,7 +25,7 @@ namespace NBus { TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320); TExampleRequest(ECreateUninitialized, TAtomic* counterPtr); }; - + class TExampleResponse: public TBusMessage { friend class TExampleProtocol; @@ -37,7 +37,7 @@ namespace NBus { TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320); TExampleResponse(ECreateUninitialized, TAtomic* counterPtr); }; - + class TExampleProtocol: public TBusProtocol { public: TAtomic RequestCount; @@ -45,7 +45,7 @@ namespace NBus { TAtomic RequestCountDeserialized; TAtomic ResponseCountDeserialized; TAtomic StartCount; - + TExampleProtocol(int port = 0); ~TExampleProtocol() override; @@ -54,28 +54,28 @@ namespace NBus { TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override; }; - + class TExampleClient: private TBusClientHandlerError { public: TExampleProtocol Proto; bool UseCompression; bool CrashOnError; size_t DataSize; - + ssize_t MessageCount; TAtomic RepliesCount; TAtomic Errors; EMessageStatus LastError; - + TSystemEvent WorkDone; - + TBusMessageQueuePtr Bus; TBusClientSessionPtr Session; - + public: TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0); ~TExampleClient() override; - + EMessageStatus SendMessage(const TNetAddr* addr = nullptr); void SendMessages(size_t count, const TNetAddr* addr = nullptr); @@ -85,15 +85,15 @@ namespace NBus { void WaitReplies(); EMessageStatus WaitForError(); void WaitForError(EMessageStatus status); - + void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr); void SendMessagesWaitReplies(size_t count, const TNetAddr& addr); - + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override; void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override; }; - + class TExampleServer: private TBusServerHandlerError { public: TExampleProtocol Proto; @@ -103,7 +103,7 @@ namespace NBus { bool ForgetRequest; TTestSync TestSync; - + TBusMessageQueuePtr Bus; TBusServerSessionPtr Session; @@ -111,7 +111,7 @@ namespace NBus { TExampleServer( const char* name = "TExampleServer", const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig()); - + TExampleServer(unsigned port, const char* name = "TExampleServer"); ~TExampleServer() override; @@ -121,9 +121,9 @@ namespace NBus { unsigned GetActualListenPort() const; // any of TNetAddr GetActualListenAddr() const; - + void WaitForOnMessageCount(unsigned n); - + protected: void OnMessage(TOnMessageContext& mess) override; }; diff --git a/library/cpp/messagebus/test/helper/example_module.cpp b/library/cpp/messagebus/test/helper/example_module.cpp index c907825924..65ecfcf73f 100644 --- a/library/cpp/messagebus/test/helper/example_module.cpp +++ b/library/cpp/messagebus/test/helper/example_module.cpp @@ -1,43 +1,43 @@ -#include "example_module.h" - -using namespace NBus; -using namespace NBus::NTest; - -TExampleModule::TExampleModule() - : TBusModule("TExampleModule") -{ - TBusQueueConfig queueConfig; - queueConfig.NumWorkers = 5; - Queue = CreateMessageQueue(queueConfig); -} - -void TExampleModule::StartModule() { - CreatePrivateSessions(Queue.Get()); - StartInput(); -} - -bool TExampleModule::Shutdown() { - TBusModule::Shutdown(); - return true; -} - -TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) { +#include "example_module.h" + +using namespace NBus; +using namespace NBus::NTest; + +TExampleModule::TExampleModule() + : TBusModule("TExampleModule") +{ + TBusQueueConfig queueConfig; + queueConfig.NumWorkers = 5; + Queue = CreateMessageQueue(queueConfig); +} + +void TExampleModule::StartModule() { + CreatePrivateSessions(Queue.Get()); + StartInput(); +} + +bool TExampleModule::Shutdown() { + TBusModule::Shutdown(); + return true; +} + +TBusServerSessionPtr TExampleModule::CreateExtSession(TBusMessageQueue&) { return nullptr; -} - -TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) { - TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig()); - ServerAddr = TNetAddr("localhost", r->GetActualListenPort()); - return r; -} - +} + +TBusServerSessionPtr TExampleServerModule::CreateExtSession(TBusMessageQueue& queue) { + TBusServerSessionPtr r = CreateDefaultDestination(queue, &Proto, TBusServerSessionConfig()); + ServerAddr = TNetAddr("localhost", r->GetActualListenPort()); + return r; +} + TExampleClientModule::TExampleClientModule() : Source() { } - -TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) { - Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); - Source->RegisterService("localhost"); + +TBusServerSessionPtr TExampleClientModule::CreateExtSession(TBusMessageQueue& queue) { + Source = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); + Source->RegisterService("localhost"); return nullptr; -} +} diff --git a/library/cpp/messagebus/test/helper/example_module.h b/library/cpp/messagebus/test/helper/example_module.h index b1b0a6dd14..a0b295f613 100644 --- a/library/cpp/messagebus/test/helper/example_module.h +++ b/library/cpp/messagebus/test/helper/example_module.h @@ -1,7 +1,7 @@ -#pragma once - -#include "example.h" - +#pragma once + +#include "example.h" + #include <library/cpp/messagebus/oldmodule/module.h> namespace NBus { @@ -9,29 +9,29 @@ namespace NBus { struct TExampleModule: public TBusModule { TExampleProtocol Proto; TBusMessageQueuePtr Queue; - + TExampleModule(); - + void StartModule(); - + bool Shutdown() override; - + // nop by default TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; }; - + struct TExampleServerModule: public TExampleModule { TNetAddr ServerAddr; TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; }; - + struct TExampleClientModule: public TExampleModule { TBusClientSessionPtr Source; - + TExampleClientModule(); - + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override; }; - + } } diff --git a/library/cpp/messagebus/test/helper/fixed_port.cpp b/library/cpp/messagebus/test/helper/fixed_port.cpp index f83ce3161a..258da0d1a5 100644 --- a/library/cpp/messagebus/test/helper/fixed_port.cpp +++ b/library/cpp/messagebus/test/helper/fixed_port.cpp @@ -1,10 +1,10 @@ #include "fixed_port.h" #include <util/system/env.h> - + #include <stdlib.h> - + bool NBus::NTest::IsFixedPortTestAllowed() { - // TODO: report skipped tests to test + // TODO: report skipped tests to test return !GetEnv("MB_TESTS_SKIP_FIXED_PORT"); -} +} diff --git a/library/cpp/messagebus/test/helper/fixed_port.h b/library/cpp/messagebus/test/helper/fixed_port.h index 39c8da4dfa..a9c61ebc63 100644 --- a/library/cpp/messagebus/test/helper/fixed_port.h +++ b/library/cpp/messagebus/test/helper/fixed_port.h @@ -1,11 +1,11 @@ -#pragma once - +#pragma once + namespace NBus { namespace NTest { bool IsFixedPortTestAllowed(); - + // Must not be in range OS uses for bind on random port. const unsigned FixedPort = 4927; - + } } diff --git a/library/cpp/messagebus/test/helper/hanging_server.cpp b/library/cpp/messagebus/test/helper/hanging_server.cpp index 3911ff10ba..a35514b00d 100644 --- a/library/cpp/messagebus/test/helper/hanging_server.cpp +++ b/library/cpp/messagebus/test/helper/hanging_server.cpp @@ -1,13 +1,13 @@ #include "hanging_server.h" -#include <util/system/yassert.h> - -using namespace NBus; - +#include <util/system/yassert.h> + +using namespace NBus; + THangingServer::THangingServer(int port) { BindResult = BindOnPort(port, false); -} - -int THangingServer::GetPort() const { - return BindResult.first; -} +} + +int THangingServer::GetPort() const { + return BindResult.first; +} diff --git a/library/cpp/messagebus/test/helper/hanging_server.h b/library/cpp/messagebus/test/helper/hanging_server.h index 384f07d7cf..cc9fb274d8 100644 --- a/library/cpp/messagebus/test/helper/hanging_server.h +++ b/library/cpp/messagebus/test/helper/hanging_server.h @@ -1,16 +1,16 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/network.h> -#include <util/network/sock.h> - -class THangingServer { -private: +#include <util/network/sock.h> + +class THangingServer { +private: std::pair<unsigned, TVector<NBus::TBindResult>> BindResult; -public: - // listen on given port, and nothing else - THangingServer(int port = 0); - // actual port - int GetPort() const; -}; +public: + // listen on given port, and nothing else + THangingServer(int port = 0); + // actual port + int GetPort() const; +}; diff --git a/library/cpp/messagebus/test/helper/message_handler_error.cpp b/library/cpp/messagebus/test/helper/message_handler_error.cpp index 40997adec8..c09811ec67 100644 --- a/library/cpp/messagebus/test/helper/message_handler_error.cpp +++ b/library/cpp/messagebus/test/helper/message_handler_error.cpp @@ -1,26 +1,26 @@ #include "message_handler_error.h" -#include <util/system/yassert.h> - -using namespace NBus; -using namespace NBus::NTest; - -void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { +#include <util/system/yassert.h> + +using namespace NBus; +using namespace NBus::NTest; + +void TBusClientHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { Y_FAIL("must not be called, status: %s", ToString(status).data()); -} - -void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) { +} + +void TBusClientHandlerError::OnReply(TAutoPtr<TBusMessage>, TAutoPtr<TBusMessage>) { Y_FAIL("must not be called"); -} - -void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) { +} + +void TBusClientHandlerError::OnMessageSentOneWay(TAutoPtr<TBusMessage>) { Y_FAIL("must not be called"); -} - -void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { +} + +void TBusServerHandlerError::OnError(TAutoPtr<TBusMessage>, EMessageStatus status) { Y_FAIL("must not be called, status: %s", ToString(status).data()); -} - -void TBusServerHandlerError::OnMessage(TOnMessageContext&) { +} + +void TBusServerHandlerError::OnMessage(TOnMessageContext&) { Y_FAIL("must not be called"); -} +} diff --git a/library/cpp/messagebus/test/helper/message_handler_error.h b/library/cpp/messagebus/test/helper/message_handler_error.h index bba0007a44..a314b10761 100644 --- a/library/cpp/messagebus/test/helper/message_handler_error.h +++ b/library/cpp/messagebus/test/helper/message_handler_error.h @@ -1,7 +1,7 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/ybus.h> - + namespace NBus { namespace NTest { struct TBusClientHandlerError: public IBusClientHandler { @@ -9,11 +9,11 @@ namespace NBus { void OnMessageSentOneWay(TAutoPtr<TBusMessage> pMessage) override; void OnReply(TAutoPtr<TBusMessage> pMessage, TAutoPtr<TBusMessage> pReply) override; }; - + struct TBusServerHandlerError: public IBusServerHandler { void OnError(TAutoPtr<TBusMessage> pMessage, EMessageStatus status) override; void OnMessage(TOnMessageContext& pMessage) override; }; - + } } diff --git a/library/cpp/messagebus/test/helper/object_count_check.h b/library/cpp/messagebus/test/helper/object_count_check.h index d844469fb9..1c4756e58c 100644 --- a/library/cpp/messagebus/test/helper/object_count_check.h +++ b/library/cpp/messagebus/test/helper/object_count_check.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/messagebus/remote_client_connection.h> @@ -9,66 +9,66 @@ #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/oldmodule/module.h> #include <library/cpp/messagebus/scheduler/scheduler.h> - + #include <util/generic/object_counter.h> #include <util/system/type_name.h> #include <util/stream/output.h> - + #include <typeinfo> -struct TObjectCountCheck { - bool Enabled; - - template <typename T> - struct TReset { - TObjectCountCheck* const Thiz; - +struct TObjectCountCheck { + bool Enabled; + + template <typename T> + struct TReset { + TObjectCountCheck* const Thiz; + TReset(TObjectCountCheck* thiz) : Thiz(thiz) { } - + void operator()() { long oldValue = TObjectCounter<T>::ResetObjectCount(); - if (oldValue != 0) { + if (oldValue != 0) { Cerr << "warning: previous counter: " << oldValue << " for " << TypeName<T>() << Endl; - Cerr << "won't check in this test" << Endl; - Thiz->Enabled = false; - } - } - }; - - TObjectCountCheck() { - Enabled = true; - DoForAllCounters<TReset>(); - } - - template <typename T> - struct TCheckZero { + Cerr << "won't check in this test" << Endl; + Thiz->Enabled = false; + } + } + }; + + TObjectCountCheck() { + Enabled = true; + DoForAllCounters<TReset>(); + } + + template <typename T> + struct TCheckZero { TCheckZero(TObjectCountCheck*) { } - + void operator()() { UNIT_ASSERT_VALUES_EQUAL_C(0L, TObjectCounter<T>::ObjectCount(), TypeName<T>()); - } - }; - - ~TObjectCountCheck() { - if (Enabled) { - DoForAllCounters<TCheckZero>(); - } - } - - template <template <typename> class TOp> - void DoForAllCounters() { - TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)(); - TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)(); - TOp< ::NBus::NPrivate::TRemoteClientSession>(this)(); - TOp< ::NBus::NPrivate::TRemoteServerSession>(this)(); - TOp< ::NBus::NPrivate::TScheduler>(this)(); - TOp< ::NEventLoop::TEventLoop>(this)(); - TOp< ::NEventLoop::TChannel>(this)(); - TOp< ::NBus::TBusModule>(this)(); - TOp< ::NBus::TBusJob>(this)(); - } -}; + } + }; + + ~TObjectCountCheck() { + if (Enabled) { + DoForAllCounters<TCheckZero>(); + } + } + + template <template <typename> class TOp> + void DoForAllCounters() { + TOp< ::NBus::NPrivate::TRemoteClientConnection>(this)(); + TOp< ::NBus::NPrivate::TRemoteServerConnection>(this)(); + TOp< ::NBus::NPrivate::TRemoteClientSession>(this)(); + TOp< ::NBus::NPrivate::TRemoteServerSession>(this)(); + TOp< ::NBus::NPrivate::TScheduler>(this)(); + TOp< ::NEventLoop::TEventLoop>(this)(); + TOp< ::NEventLoop::TChannel>(this)(); + TOp< ::NBus::TBusModule>(this)(); + TOp< ::NBus::TBusJob>(this)(); + } +}; diff --git a/library/cpp/messagebus/test/helper/wait_for.h b/library/cpp/messagebus/test/helper/wait_for.h index ebd0bfd6e2..f09958d4c0 100644 --- a/library/cpp/messagebus/test/helper/wait_for.h +++ b/library/cpp/messagebus/test/helper/wait_for.h @@ -1,14 +1,14 @@ -#pragma once - -#include <util/datetime/base.h> -#include <util/system/yassert.h> - +#pragma once + +#include <util/datetime/base.h> +#include <util/system/yassert.h> + #define UNIT_WAIT_FOR(condition) \ do { \ - TInstant start(TInstant::Now()); \ - while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \ + TInstant start(TInstant::Now()); \ + while (!(condition) && (TInstant::Now() - start < TDuration::Seconds(10))) { \ Sleep(TDuration::MilliSeconds(1)); \ - } \ - /* TODO: use UNIT_ASSERT if in unittest thread */ \ + } \ + /* TODO: use UNIT_ASSERT if in unittest thread */ \ Y_VERIFY(condition, "condition failed after 10 seconds wait"); \ - } while (0) + } while (0) diff --git a/library/cpp/messagebus/test/helper/ya.make b/library/cpp/messagebus/test/helper/ya.make index 703f6b6953..97bd45f573 100644 --- a/library/cpp/messagebus/test/helper/ya.make +++ b/library/cpp/messagebus/test/helper/ya.make @@ -1,17 +1,17 @@ -LIBRARY(messagebus_test_helper) - +LIBRARY(messagebus_test_helper) + OWNER(g:messagebus) - -SRCS( - example.cpp - example_module.cpp - fixed_port.cpp - message_handler_error.cpp - hanging_server.cpp -) - -PEERDIR( + +SRCS( + example.cpp + example_module.cpp + fixed_port.cpp + message_handler_error.cpp + hanging_server.cpp +) + +PEERDIR( library/cpp/messagebus/oldmodule -) - -END() +) + +END() diff --git a/library/cpp/messagebus/test/perftest/messages.proto b/library/cpp/messagebus/test/perftest/messages.proto index a48bb2f480..8919034e7a 100644 --- a/library/cpp/messagebus/test/perftest/messages.proto +++ b/library/cpp/messagebus/test/perftest/messages.proto @@ -1,7 +1,7 @@ -message TPerftestRequestRecord { - required string Data = 1; -} - -message TPerftestResponseRecord { - required string Data = 1; -} +message TPerftestRequestRecord { + required string Data = 1; +} + +message TPerftestResponseRecord { + required string Data = 1; +} diff --git a/library/cpp/messagebus/test/perftest/perftest.cpp b/library/cpp/messagebus/test/perftest/perftest.cpp index 44fb4d837d..8489319278 100644 --- a/library/cpp/messagebus/test/perftest/perftest.cpp +++ b/library/cpp/messagebus/test/perftest/perftest.cpp @@ -15,10 +15,10 @@ #include <library/cpp/lwtrace/start.h> #include <library/cpp/sighandler/async_signals_handler.h> #include <library/cpp/threading/future/legacy_future.h> - + #include <util/generic/ptr.h> #include <util/generic/string.h> -#include <util/generic/vector.h> +#include <util/generic/vector.h> #include <util/generic/yexception.h> #include <util/random/random.h> #include <util/stream/file.h> @@ -29,18 +29,18 @@ #include <util/system/sysstat.h> #include <util/system/thread.h> #include <util/thread/lfqueue.h> - + #include <signal.h> #include <stdlib.h> - -using namespace NBus; - -/////////////////////////////////////////////////////// -/// \brief Configuration parameters of the test - -const int DEFAULT_PORT = 55666; - -struct TPerftestConfig { + +using namespace NBus; + +/////////////////////////////////////////////////////// +/// \brief Configuration parameters of the test + +const int DEFAULT_PORT = 55666; + +struct TPerftestConfig { TString Nodes; ///< node1:port1,node2:port2 int ClientCount; int MessageSize; ///< size of message to send @@ -53,144 +53,144 @@ struct TPerftestConfig { bool ExecuteOnReplyInWorkerPool; bool UseCompression; bool Profile; - unsigned WwwPort; - - TPerftestConfig(); - - void Print() { - fprintf(stderr, "ClientCount=%d\n", ClientCount); - fprintf(stderr, "ServerPort=%d\n", ServerPort); - fprintf(stderr, "Delay=%d usecs\n", Delay); + unsigned WwwPort; + + TPerftestConfig(); + + void Print() { + fprintf(stderr, "ClientCount=%d\n", ClientCount); + fprintf(stderr, "ServerPort=%d\n", ServerPort); + fprintf(stderr, "Delay=%d usecs\n", Delay); fprintf(stderr, "MessageSize=%d bytes\n", MessageSize); fprintf(stderr, "Failure=%.3f%%\n", Failure * 100.0); - fprintf(stderr, "Runtime=%d seconds\n", Run); - fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false"); - fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false"); - fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false"); - fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false"); - fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false"); - fprintf(stderr, "WwwPort=%u\n", WwwPort); - } -}; - + fprintf(stderr, "Runtime=%d seconds\n", Run); + fprintf(stderr, "ServerUseModules=%s\n", ServerUseModules ? "true" : "false"); + fprintf(stderr, "ExecuteOnMessageInWorkerPool=%s\n", ExecuteOnMessageInWorkerPool ? "true" : "false"); + fprintf(stderr, "ExecuteOnReplyInWorkerPool=%s\n", ExecuteOnReplyInWorkerPool ? "true" : "false"); + fprintf(stderr, "UseCompression=%s\n", UseCompression ? "true" : "false"); + fprintf(stderr, "Profile=%s\n", Profile ? "true" : "false"); + fprintf(stderr, "WwwPort=%u\n", WwwPort); + } +}; + extern TPerftestConfig* TheConfig; -extern bool TheExit; - +extern bool TheExit; + TVector<TNetAddr> ServerAddresses; - -struct TConfig { - TBusQueueConfig ServerQueueConfig; - TBusQueueConfig ClientQueueConfig; - TBusServerSessionConfig ServerSessionConfig; - TBusClientSessionConfig ClientSessionConfig; - bool SimpleProtocol; - -private: - void ConfigureDefaults(TBusQueueConfig& config) { - config.NumWorkers = 4; - } - - void ConfigureDefaults(TBusSessionConfig& config) { - config.MaxInFlight = 10000; - config.SendTimeout = TDuration::Seconds(20).MilliSeconds(); - config.TotalTimeout = TDuration::Seconds(60).MilliSeconds(); - } - -public: - TConfig() - : SimpleProtocol(false) - { - ConfigureDefaults(ServerQueueConfig); - ConfigureDefaults(ClientQueueConfig); - ConfigureDefaults(ServerSessionConfig); - ConfigureDefaults(ClientSessionConfig); - } - - void Print() { - // TODO: do not print server if only client and vice verse - Cerr << "server queue config:\n"; - Cerr << IndentText(ServerQueueConfig.PrintToString()); - Cerr << "server session config:" << Endl; - Cerr << IndentText(ServerSessionConfig.PrintToString()); - Cerr << "client queue config:\n"; - Cerr << IndentText(ClientQueueConfig.PrintToString()); - Cerr << "client session config:" << Endl; - Cerr << IndentText(ClientSessionConfig.PrintToString()); - Cerr << "simple protocol: " << SimpleProtocol << "\n"; - } -}; - -TConfig Config; - -//////////////////////////////////////////////////////////////// -/// \brief Fast message - + +struct TConfig { + TBusQueueConfig ServerQueueConfig; + TBusQueueConfig ClientQueueConfig; + TBusServerSessionConfig ServerSessionConfig; + TBusClientSessionConfig ClientSessionConfig; + bool SimpleProtocol; + +private: + void ConfigureDefaults(TBusQueueConfig& config) { + config.NumWorkers = 4; + } + + void ConfigureDefaults(TBusSessionConfig& config) { + config.MaxInFlight = 10000; + config.SendTimeout = TDuration::Seconds(20).MilliSeconds(); + config.TotalTimeout = TDuration::Seconds(60).MilliSeconds(); + } + +public: + TConfig() + : SimpleProtocol(false) + { + ConfigureDefaults(ServerQueueConfig); + ConfigureDefaults(ClientQueueConfig); + ConfigureDefaults(ServerSessionConfig); + ConfigureDefaults(ClientSessionConfig); + } + + void Print() { + // TODO: do not print server if only client and vice verse + Cerr << "server queue config:\n"; + Cerr << IndentText(ServerQueueConfig.PrintToString()); + Cerr << "server session config:" << Endl; + Cerr << IndentText(ServerSessionConfig.PrintToString()); + Cerr << "client queue config:\n"; + Cerr << IndentText(ClientQueueConfig.PrintToString()); + Cerr << "client session config:" << Endl; + Cerr << IndentText(ClientSessionConfig.PrintToString()); + Cerr << "simple protocol: " << SimpleProtocol << "\n"; + } +}; + +TConfig Config; + +//////////////////////////////////////////////////////////////// +/// \brief Fast message + using TPerftestRequest = TBusBufferMessage<TPerftestRequestRecord, 77>; using TPerftestResponse = TBusBufferMessage<TPerftestResponseRecord, 79>; - -static size_t RequestSize() { - return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1); -} - -TAutoPtr<TBusMessage> NewRequest() { - if (Config.SimpleProtocol) { - TAutoPtr<TSimpleMessage> r(new TSimpleMessage); - r->SetCompressed(TheConfig->UseCompression); - r->Payload = 10; - return r.Release(); - } else { - TAutoPtr<TPerftestRequest> r(new TPerftestRequest); - r->SetCompressed(TheConfig->UseCompression); - // TODO: use random content for better compression test + +static size_t RequestSize() { + return RandomNumber<size_t>(TheConfig->MessageSize * 2 + 1); +} + +TAutoPtr<TBusMessage> NewRequest() { + if (Config.SimpleProtocol) { + TAutoPtr<TSimpleMessage> r(new TSimpleMessage); + r->SetCompressed(TheConfig->UseCompression); + r->Payload = 10; + return r.Release(); + } else { + TAutoPtr<TPerftestRequest> r(new TPerftestRequest); + r->SetCompressed(TheConfig->UseCompression); + // TODO: use random content for better compression test r->Record.SetData(TString(RequestSize(), '?')); - return r.Release(); - } -} - -void CheckRequest(TPerftestRequest* request) { + return r.Release(); + } +} + +void CheckRequest(TPerftestRequest* request) { const TString& data = request->Record.GetData(); - for (size_t i = 0; i != data.size(); ++i) { + for (size_t i = 0; i != data.size(); ++i) { Y_VERIFY(data.at(i) == '?', "must be question mark"); - } -} - -TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) { - TAutoPtr<TPerftestResponse> r(new TPerftestResponse); - r->SetCompressed(TheConfig->UseCompression); + } +} + +TAutoPtr<TPerftestResponse> NewResponse(TPerftestRequest* request) { + TAutoPtr<TPerftestResponse> r(new TPerftestResponse); + r->SetCompressed(TheConfig->UseCompression); r->Record.SetData(TString(request->Record.GetData().size(), '.')); - return r; -} - -void CheckResponse(TPerftestResponse* response) { + return r; +} + +void CheckResponse(TPerftestResponse* response) { const TString& data = response->Record.GetData(); - for (size_t i = 0; i != data.size(); ++i) { + for (size_t i = 0; i != data.size(); ++i) { Y_VERIFY(data.at(i) == '.', "must be dot"); - } -} - -//////////////////////////////////////////////////////////////////// -/// \brief Fast protocol that common between client and server -class TPerftestProtocol: public TBusBufferProtocol { -public: - TPerftestProtocol() - : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort) - { - RegisterType(new TPerftestRequest); - RegisterType(new TPerftestResponse); - } -}; - -class TPerftestServer; -class TPerftestUsingModule; -class TPerftestClient; - -struct TTestStats { - TInstant Start; - - TAtomic Messages; - TAtomic Errors; - TAtomic Replies; - + } +} + +//////////////////////////////////////////////////////////////////// +/// \brief Fast protocol that common between client and server +class TPerftestProtocol: public TBusBufferProtocol { +public: + TPerftestProtocol() + : TBusBufferProtocol("TPerftestProtocol", TheConfig->ServerPort) + { + RegisterType(new TPerftestRequest); + RegisterType(new TPerftestResponse); + } +}; + +class TPerftestServer; +class TPerftestUsingModule; +class TPerftestClient; + +struct TTestStats { + TInstant Start; + + TAtomic Messages; + TAtomic Errors; + TAtomic Replies; + void IncMessage() { AtomicIncrement(Messages); } @@ -211,265 +211,265 @@ struct TTestStats { int NumReplies() { return AtomicGet(Replies); } - + double GetThroughput() { - return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds(); - } - -public: - TTestStats() - : Start(TInstant::Now()) - , Messages(0) - , Errors(0) - , Replies(0) - { - } - - void PeriodicallyPrint(); -}; - -TTestStats Stats; - -//////////////////////////////////////////////////////////////////// -/// \brief Fast of the client session + return NumReplies() * 1000000.0 / (TInstant::Now() - Start).MicroSeconds(); + } + +public: + TTestStats() + : Start(TInstant::Now()) + , Messages(0) + , Errors(0) + , Replies(0) + { + } + + void PeriodicallyPrint(); +}; + +TTestStats Stats; + +//////////////////////////////////////////////////////////////////// +/// \brief Fast of the client session class TPerftestClient : IBusClientHandler { -public: - TBusClientSessionPtr Session; - THolder<TBusProtocol> Proto; - TBusMessageQueuePtr Bus; +public: + TBusClientSessionPtr Session; + THolder<TBusProtocol> Proto; + TBusMessageQueuePtr Bus; TVector<TBusClientConnectionPtr> Connections; - -public: - /// constructor creates instances of protocol and session - TPerftestClient() { - /// create or get instance of message queue, need one per application - Bus = CreateMessageQueue(Config.ClientQueueConfig, "client"); - - if (Config.SimpleProtocol) { - Proto.Reset(new TSimpleProtocol); - } else { - Proto.Reset(new TPerftestProtocol); - } - - Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus); - - for (unsigned i = 0; i < ServerAddresses.size(); ++i) { - Connections.push_back(Session->GetConnection(ServerAddresses[i])); - } - } - - /// dispatch of requests is done here - void Work() { + +public: + /// constructor creates instances of protocol and session + TPerftestClient() { + /// create or get instance of message queue, need one per application + Bus = CreateMessageQueue(Config.ClientQueueConfig, "client"); + + if (Config.SimpleProtocol) { + Proto.Reset(new TSimpleProtocol); + } else { + Proto.Reset(new TPerftestProtocol); + } + + Session = TBusClientSession::Create(Proto.Get(), this, Config.ClientSessionConfig, Bus); + + for (unsigned i = 0; i < ServerAddresses.size(); ++i) { + Connections.push_back(Session->GetConnection(ServerAddresses[i])); + } + } + + /// dispatch of requests is done here + void Work() { SetCurrentThreadName("FastClient::Work"); - - while (!TheExit) { - TBusClientConnection* connection; - if (Connections.size() == 1) { - connection = Connections.front().Get(); - } else { - connection = Connections.at(RandomNumber<size_t>()).Get(); - } - + + while (!TheExit) { + TBusClientConnection* connection; + if (Connections.size() == 1) { + connection = Connections.front().Get(); + } else { + connection = Connections.at(RandomNumber<size_t>()).Get(); + } + TBusMessage* message = NewRequest().Release(); - int ret = connection->SendMessage(message, true); - - if (ret == MESSAGE_OK) { - Stats.IncMessage(); - } else if (ret == MESSAGE_BUSY) { - //delete message; - //Sleep(TDuration::MilliSeconds(1)); - //continue; + int ret = connection->SendMessage(message, true); + + if (ret == MESSAGE_OK) { + Stats.IncMessage(); + } else if (ret == MESSAGE_BUSY) { + //delete message; + //Sleep(TDuration::MilliSeconds(1)); + //continue; Y_FAIL("unreachable"); - } else if (ret == MESSAGE_SHUTDOWN) { - delete message; - } else { - delete message; - Stats.IncErrors(); - } - } - } - - void Stop() { - Session->Shutdown(); - } - - /// actual work is being done here + } else if (ret == MESSAGE_SHUTDOWN) { + delete message; + } else { + delete message; + Stats.IncErrors(); + } + } + } + + void Stop() { + Session->Shutdown(); + } + + /// actual work is being done here void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { Y_UNUSED(mess); - - if (Config.SimpleProtocol) { - VerifyDynamicCast<TSimpleMessage*>(reply.Get()); - } else { - TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get()); - - CheckResponse(typed); - } - - Stats.IncReplies(); - } - - /// message that could not be delivered + + if (Config.SimpleProtocol) { + VerifyDynamicCast<TSimpleMessage*>(reply.Get()); + } else { + TPerftestResponse* typed = VerifyDynamicCast<TPerftestResponse*>(reply.Get()); + + CheckResponse(typed); + } + + Stats.IncReplies(); + } + + /// message that could not be delivered void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); Y_UNUSED(status); - - if (TheExit) { - return; - } - - Stats.IncErrors(); - + + if (TheExit) { + return; + } + + Stats.IncErrors(); + // Y_ASSERT(TheConfig->Failure > 0.0); - } -}; - -class TPerftestServerCommon { -public: - THolder<TBusProtocol> Proto; - - TBusMessageQueuePtr Bus; - - TBusServerSessionPtr Session; - -protected: - TPerftestServerCommon(const char* name) - : Session() - { - if (Config.SimpleProtocol) { - Proto.Reset(new TSimpleProtocol); - } else { - Proto.Reset(new TPerftestProtocol); - } - - /// create or get instance of single message queue, need one for application - Bus = CreateMessageQueue(Config.ServerQueueConfig, name); - } - -public: - void Stop() { - Session->Shutdown(); - } -}; - -struct TAsyncRequest { - TBusMessage* Request; - TInstant ReceivedTime; -}; - -///////////////////////////////////////////////////////////////////// -/// \brief Fast of the server session -class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler { -public: - TLockFreeQueue<TAsyncRequest> AsyncRequests; - -public: - TPerftestServer() - : TPerftestServerCommon("server") - { - /// register destination session - Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus); + } +}; + +class TPerftestServerCommon { +public: + THolder<TBusProtocol> Proto; + + TBusMessageQueuePtr Bus; + + TBusServerSessionPtr Session; + +protected: + TPerftestServerCommon(const char* name) + : Session() + { + if (Config.SimpleProtocol) { + Proto.Reset(new TSimpleProtocol); + } else { + Proto.Reset(new TPerftestProtocol); + } + + /// create or get instance of single message queue, need one for application + Bus = CreateMessageQueue(Config.ServerQueueConfig, name); + } + +public: + void Stop() { + Session->Shutdown(); + } +}; + +struct TAsyncRequest { + TBusMessage* Request; + TInstant ReceivedTime; +}; + +///////////////////////////////////////////////////////////////////// +/// \brief Fast of the server session +class TPerftestServer: public TPerftestServerCommon, public IBusServerHandler { +public: + TLockFreeQueue<TAsyncRequest> AsyncRequests; + +public: + TPerftestServer() + : TPerftestServerCommon("server") + { + /// register destination session + Session = TBusServerSession::Create(Proto.Get(), this, Config.ServerSessionConfig, Bus); Y_ASSERT(Session && "probably somebody is listening on the same port"); - } - - /// when message comes, send reply + } + + /// when message comes, send reply void OnMessage(TOnMessageContext& mess) override { - if (Config.SimpleProtocol) { - TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage()); - TAutoPtr<TSimpleMessage> response(new TSimpleMessage); - response->Payload = typed->Payload; - mess.SendReplyMove(response); - return; - } - - TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage()); - - CheckRequest(typed); - - /// forget replies for few messages, see what happends + if (Config.SimpleProtocol) { + TSimpleMessage* typed = VerifyDynamicCast<TSimpleMessage*>(mess.GetMessage()); + TAutoPtr<TSimpleMessage> response(new TSimpleMessage); + response->Payload = typed->Payload; + mess.SendReplyMove(response); + return; + } + + TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess.GetMessage()); + + CheckRequest(typed); + + /// forget replies for few messages, see what happends if (TheConfig->Failure > RandomNumber<double>()) { - return; - } - - /// sleep requested time - if (TheConfig->Delay) { - TAsyncRequest request; - request.Request = mess.ReleaseMessage(); - request.ReceivedTime = TInstant::Now(); - AsyncRequests.Enqueue(request); - return; - } - - TAutoPtr<TPerftestResponse> reply(NewResponse(typed)); - /// sent empty reply for each message - mess.SendReplyMove(reply); - // TODO: count results - } - - void Stop() { - TPerftestServerCommon::Stop(); - } -}; - -class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule { -public: - TPerftestUsingModule() - : TPerftestServerCommon("server") - , TBusModule("fast") - { + return; + } + + /// sleep requested time + if (TheConfig->Delay) { + TAsyncRequest request; + request.Request = mess.ReleaseMessage(); + request.ReceivedTime = TInstant::Now(); + AsyncRequests.Enqueue(request); + return; + } + + TAutoPtr<TPerftestResponse> reply(NewResponse(typed)); + /// sent empty reply for each message + mess.SendReplyMove(reply); + // TODO: count results + } + + void Stop() { + TPerftestServerCommon::Stop(); + } +}; + +class TPerftestUsingModule: public TPerftestServerCommon, public TBusModule { +public: + TPerftestUsingModule() + : TPerftestServerCommon("server") + , TBusModule("fast") + { Y_VERIFY(CreatePrivateSessions(Bus.Get()), "failed to initialize dupdetect module"); Y_VERIFY(StartInput(), "failed to start input"); - } - + } + ~TPerftestUsingModule() override { - Shutdown(); - } - -private: + Shutdown(); + } + +private: TJobHandler Start(TBusJob* job, TBusMessage* mess) override { - TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess); - CheckRequest(typed); - - /// sleep requested time - if (TheConfig->Delay) { - usleep(TheConfig->Delay); - } - - /// forget replies for few messages, see what happends + TPerftestRequest* typed = VerifyDynamicCast<TPerftestRequest*>(mess); + CheckRequest(typed); + + /// sleep requested time + if (TheConfig->Delay) { + usleep(TheConfig->Delay); + } + + /// forget replies for few messages, see what happends if (TheConfig->Failure > RandomNumber<double>()) { return nullptr; - } - - job->SendReply(NewResponse(typed).Release()); + } + + job->SendReply(NewResponse(typed).Release()); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig); - } -}; - + return Session = CreateDefaultDestination(queue, Proto.Get(), Config.ServerSessionConfig); + } +}; + // ./perftest/perftest -s 11456 -c localhost:11456 -r 60 -n 4 -i 5000 using namespace std; using namespace NBus; -static TNetworkAddress ParseNetworkAddress(const char* string) { +static TNetworkAddress ParseNetworkAddress(const char* string) { TString Name; int Port; const char* port = strchr(string, ':'); if (port != nullptr) { - Name.append(string, port - string); + Name.append(string, port - string); Port = atoi(port + 1); } else { - Name.append(string); - Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; + Name.append(string); + Port = TheConfig->ServerPort != 0 ? TheConfig->ServerPort : DEFAULT_PORT; } - return TNetworkAddress(Name, Port); -} - + return TNetworkAddress(Name, Port); +} + TVector<TNetAddr> ParseNodes(const TString nodes) { TVector<TNetAddr> r; @@ -480,234 +480,234 @@ TVector<TNetAddr> ParseNodes(const TString nodes) { for (int i = 0; i < int(numh); i++) { const TNetworkAddress& networkAddress = ParseNetworkAddress(hosts[i].data()); Y_VERIFY(networkAddress.Begin() != networkAddress.End(), "no addresses"); - r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin())); + r.push_back(TNetAddr(networkAddress, &*networkAddress.Begin())); } - return r; + return r; } -TPerftestConfig::TPerftestConfig() { - TBusSessionConfig defaultConfig; - - ServerPort = DEFAULT_PORT; +TPerftestConfig::TPerftestConfig() { + TBusSessionConfig defaultConfig; + + ServerPort = DEFAULT_PORT; Delay = 0; // artificial delay inside server OnMessage() MessageSize = 200; Failure = 0.00; Run = 60; // in seconds Nodes = "localhost"; - ServerUseModules = false; - ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool; - ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool; - UseCompression = false; - Profile = false; - WwwPort = 0; + ServerUseModules = false; + ExecuteOnMessageInWorkerPool = defaultConfig.ExecuteOnMessageInWorkerPool; + ExecuteOnReplyInWorkerPool = defaultConfig.ExecuteOnReplyInWorkerPool; + UseCompression = false; + Profile = false; + WwwPort = 0; } TPerftestConfig* TheConfig = new TPerftestConfig(); bool TheExit = false; - + TSystemEvent StopEvent; TSimpleSharedPtr<TPerftestServer> Server; TSimpleSharedPtr<TPerftestUsingModule> ServerUsingModule; - + TVector<TSimpleSharedPtr<TPerftestClient>> Clients; -TMutex ClientsLock; - +TMutex ClientsLock; + void stopsignal(int /*sig*/) { fprintf(stderr, "\n-------------------- exiting ------------------\n"); TheExit = true; - StopEvent.Signal(); + StopEvent.Signal(); } // -s <num> - start server on port <num> // -c <node:port,node:port> - start client -void TTestStats::PeriodicallyPrint() { +void TTestStats::PeriodicallyPrint() { SetCurrentThreadName("print-stats"); - - for (;;) { - StopEvent.WaitT(TDuration::Seconds(1)); - if (TheExit) - break; - + + for (;;) { + StopEvent.WaitT(TDuration::Seconds(1)); + if (TheExit) + break; + TVector<TSimpleSharedPtr<TPerftestClient>> clients; - { - TGuard<TMutex> guard(ClientsLock); - clients = Clients; - } - - fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n", + { + TGuard<TMutex> guard(ClientsLock); + clients = Clients; + } + + fprintf(stderr, "replies=%d errors=%d throughput=%.3f mess/sec\n", NumReplies(), NumErrors(), GetThroughput()); - if (!!Server) { - fprintf(stderr, "server: q: %u %s\n", + if (!!Server) { + fprintf(stderr, "server: q: %u %s\n", (unsigned)Server->Bus->GetExecutor()->GetWorkQueueSize(), Server->Session->GetStatusSingleLine().data()); - } - if (!!ServerUsingModule) { - fprintf(stderr, "server: q: %u %s\n", + } + if (!!ServerUsingModule) { + fprintf(stderr, "server: q: %u %s\n", (unsigned)ServerUsingModule->Bus->GetExecutor()->GetWorkQueueSize(), ServerUsingModule->Session->GetStatusSingleLine().data()); - } + } for (const auto& client : clients) { - fprintf(stderr, "client: q: %u %s\n", + fprintf(stderr, "client: q: %u %s\n", (unsigned)client->Bus->GetExecutor()->GetWorkQueueSize(), client->Session->GetStatusSingleLine().data()); - } - - TStringStream stats; - - bool first = true; - if (!!Server) { - if (!first) { - stats << "\n"; - } - first = false; - stats << "server:\n"; - stats << IndentText(Server->Bus->GetStatus()); - } - if (!!ServerUsingModule) { - if (!first) { - stats << "\n"; - } - first = false; - stats << "server using modules:\n"; - stats << IndentText(ServerUsingModule->Bus->GetStatus()); - } + } + + TStringStream stats; + + bool first = true; + if (!!Server) { + if (!first) { + stats << "\n"; + } + first = false; + stats << "server:\n"; + stats << IndentText(Server->Bus->GetStatus()); + } + if (!!ServerUsingModule) { + if (!first) { + stats << "\n"; + } + first = false; + stats << "server using modules:\n"; + stats << IndentText(ServerUsingModule->Bus->GetStatus()); + } for (const auto& client : clients) { - if (!first) { - stats << "\n"; - } - first = false; - stats << "client:\n"; + if (!first) { + stats << "\n"; + } + first = false; + stats << "client:\n"; stats << IndentText(client->Bus->GetStatus()); - } - + } + TUnbufferedFileOutput("stats").Write(stats.Str()); - } -} - + } +} + int main(int argc, char* argv[]) { - NLWTrace::StartLwtraceFromEnv(); + NLWTrace::StartLwtraceFromEnv(); /* unix foo */ setvbuf(stdout, nullptr, _IONBF, 0); setvbuf(stderr, nullptr, _IONBF, 0); Umask(0); SetAsyncSignalHandler(SIGINT, stopsignal); - SetAsyncSignalHandler(SIGTERM, stopsignal); + SetAsyncSignalHandler(SIGTERM, stopsignal); #ifndef _win_ - SetAsyncSignalHandler(SIGUSR1, stopsignal); + SetAsyncSignalHandler(SIGUSR1, stopsignal); #endif signal(SIGPIPE, SIG_IGN); - NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); - opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); - opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); - opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes); + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + opts.AddLongOption('s', "server-port", "server port").RequiredArgument("port").StoreResult(&TheConfig->ServerPort); + opts.AddCharOption('m', "average message size").RequiredArgument("size").StoreResult(&TheConfig->MessageSize); + opts.AddLongOption('c', "server-host", "server hosts").RequiredArgument("host[,host]...").StoreResult(&TheConfig->Nodes); opts.AddCharOption('f', "failure rate (rational number between 0 and 1)").RequiredArgument("rate").StoreResult(&TheConfig->Failure); opts.AddCharOption('w', "delay before reply").RequiredArgument("microseconds").StoreResult(&TheConfig->Delay); opts.AddCharOption('r', "run duration").RequiredArgument("seconds").StoreResult(&TheConfig->Run); opts.AddLongOption("client-count", "amount of clients").RequiredArgument("count").StoreResult(&TheConfig->ClientCount).DefaultValue("1"); - opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); - opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") + opts.AddLongOption("server-use-modules").StoreResult(&TheConfig->ServerUseModules, true); + opts.AddLongOption("on-message-in-pool", "execute OnMessage callback in worker pool") .RequiredArgument("BOOL") .StoreResult(&TheConfig->ExecuteOnMessageInWorkerPool); - opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool") + opts.AddLongOption("on-reply-in-pool", "execute OnReply callback in worker pool") .RequiredArgument("BOOL") .StoreResult(&TheConfig->ExecuteOnReplyInWorkerPool); - opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression); - opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol); - opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); - opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort); - opts.AddHelpOption(); - - Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-"); - Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-"); - Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-"); - Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-"); - - opts.SetFreeArgsMax(0); - - NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); - + opts.AddLongOption("compression", "use compression").RequiredArgument("BOOL").StoreResult(&TheConfig->UseCompression); + opts.AddLongOption("simple-proto").SetFlag(&Config.SimpleProtocol); + opts.AddLongOption("profile").SetFlag(&TheConfig->Profile); + opts.AddLongOption("www-port").RequiredArgument("PORT").StoreResult(&TheConfig->WwwPort); + opts.AddHelpOption(); + + Config.ServerQueueConfig.ConfigureLastGetopt(opts, "server-"); + Config.ServerSessionConfig.ConfigureLastGetopt(opts, "server-"); + Config.ClientQueueConfig.ConfigureLastGetopt(opts, "client-"); + Config.ClientSessionConfig.ConfigureLastGetopt(opts, "client-"); + + opts.SetFreeArgsMax(0); + + NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); + TheConfig->Print(); - Config.Print(); + Config.Print(); - if (TheConfig->Profile) { - BeginProfiling(); - } - - TIntrusivePtr<TBusWww> www(new TBusWww); - - ServerAddresses = ParseNodes(TheConfig->Nodes); + if (TheConfig->Profile) { + BeginProfiling(); + } + + TIntrusivePtr<TBusWww> www(new TBusWww); + + ServerAddresses = ParseNodes(TheConfig->Nodes); if (TheConfig->ServerPort) { - if (TheConfig->ServerUseModules) { - ServerUsingModule = new TPerftestUsingModule(); - www->RegisterModule(ServerUsingModule.Get()); - } else { - Server = new TPerftestServer(); - www->RegisterServerSession(Server->Session); - } + if (TheConfig->ServerUseModules) { + ServerUsingModule = new TPerftestUsingModule(); + www->RegisterModule(ServerUsingModule.Get()); + } else { + Server = new TPerftestServer(); + www->RegisterServerSession(Server->Session); + } } TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<void, false>>> futures; - - if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) { - for (int i = 0; i < TheConfig->ClientCount; ++i) { - TGuard<TMutex> guard(ClientsLock); - Clients.push_back(new TPerftestClient); + + if (ServerAddresses.size() > 0 && TheConfig->ClientCount > 0) { + for (int i = 0; i < TheConfig->ClientCount; ++i) { + TGuard<TMutex> guard(ClientsLock); + Clients.push_back(new TPerftestClient); futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TPerftestClient::Work, Clients.back()))); - www->RegisterClientSession(Clients.back()->Session); - } + www->RegisterClientSession(Clients.back()->Session); + } } futures.push_back(new NThreading::TLegacyFuture<void, false>(std::bind(&TTestStats::PeriodicallyPrint, std::ref(Stats)))); - - THolder<TBusWwwHttpServer> wwwServer; - if (TheConfig->WwwPort != 0) { - wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort)); - } - - /* sit here until signal terminate our process */ - StopEvent.WaitT(TDuration::Seconds(TheConfig->Run)); - TheExit = true; - StopEvent.Signal(); - - if (!!Server) { - Cerr << "Stopping server\n"; - Server->Stop(); - } - if (!!ServerUsingModule) { - Cerr << "Stopping server (using modules)\n"; - ServerUsingModule->Stop(); - } - + + THolder<TBusWwwHttpServer> wwwServer; + if (TheConfig->WwwPort != 0) { + wwwServer.Reset(new TBusWwwHttpServer(www, TheConfig->WwwPort)); + } + + /* sit here until signal terminate our process */ + StopEvent.WaitT(TDuration::Seconds(TheConfig->Run)); + TheExit = true; + StopEvent.Signal(); + + if (!!Server) { + Cerr << "Stopping server\n"; + Server->Stop(); + } + if (!!ServerUsingModule) { + Cerr << "Stopping server (using modules)\n"; + ServerUsingModule->Stop(); + } + TVector<TSimpleSharedPtr<TPerftestClient>> clients; - { - TGuard<TMutex> guard(ClientsLock); - clients = Clients; - } - - if (!clients.empty()) { - Cerr << "Stopping clients\n"; - + { + TGuard<TMutex> guard(ClientsLock); + clients = Clients; + } + + if (!clients.empty()) { + Cerr << "Stopping clients\n"; + for (auto& client : clients) { client->Stop(); - } - } - - wwwServer.Destroy(); - + } + } + + wwwServer.Destroy(); + for (const auto& future : futures) { future->Get(); - } - - if (TheConfig->Profile) { - EndProfiling(); - } - - Cerr << "***SUCCESS***\n"; + } + + if (TheConfig->Profile) { + EndProfiling(); + } + + Cerr << "***SUCCESS***\n"; return 0; } diff --git a/library/cpp/messagebus/test/perftest/simple_proto.cpp b/library/cpp/messagebus/test/perftest/simple_proto.cpp index 7fab33be6b..19d6c15b9d 100644 --- a/library/cpp/messagebus/test/perftest/simple_proto.cpp +++ b/library/cpp/messagebus/test/perftest/simple_proto.cpp @@ -1,22 +1,22 @@ #include "simple_proto.h" - -#include <util/generic/cast.h> - + +#include <util/generic/cast.h> + #include <typeinfo> - -using namespace NBus; - + +using namespace NBus; + void TSimpleProtocol::Serialize(const TBusMessage* mess, TBuffer& data) { Y_VERIFY(typeid(TSimpleMessage) == typeid(*mess)); - const TSimpleMessage* typed = static_cast<const TSimpleMessage*>(mess); + const TSimpleMessage* typed = static_cast<const TSimpleMessage*>(mess); data.Append((const char*)&typed->Payload, 4); -} - +} + TAutoPtr<TBusMessage> TSimpleProtocol::Deserialize(ui16, TArrayRef<const char> payload) { - if (payload.size() != 4) { + if (payload.size() != 4) { return nullptr; - } - TAutoPtr<TSimpleMessage> r(new TSimpleMessage); - memcpy(&r->Payload, payload.data(), 4); - return r.Release(); -} + } + TAutoPtr<TSimpleMessage> r(new TSimpleMessage); + memcpy(&r->Payload, payload.data(), 4); + return r.Release(); +} diff --git a/library/cpp/messagebus/test/perftest/simple_proto.h b/library/cpp/messagebus/test/perftest/simple_proto.h index 8b0275cf51..4a0cc08db3 100644 --- a/library/cpp/messagebus/test/perftest/simple_proto.h +++ b/library/cpp/messagebus/test/perftest/simple_proto.h @@ -1,29 +1,29 @@ -#pragma once - +#pragma once + #include <library/cpp/messagebus/ybus.h> - + struct TSimpleMessage: public NBus::TBusMessage { - ui32 Payload; - - TSimpleMessage() + ui32 Payload; + + TSimpleMessage() : TBusMessage(1) , Payload(0) { } - - TSimpleMessage(NBus::ECreateUninitialized) - : TBusMessage(NBus::ECreateUninitialized()) + + TSimpleMessage(NBus::ECreateUninitialized) + : TBusMessage(NBus::ECreateUninitialized()) { } -}; - -struct TSimpleProtocol: public NBus::TBusProtocol { +}; + +struct TSimpleProtocol: public NBus::TBusProtocol { TSimpleProtocol() : NBus::TBusProtocol("simple", 55666) { } - + void Serialize(const NBus::TBusMessage* mess, TBuffer& data) override; - + TAutoPtr<NBus::TBusMessage> Deserialize(ui16 ty, TArrayRef<const char> payload) override; -}; +}; diff --git a/library/cpp/messagebus/test/perftest/stackcollect.diff b/library/cpp/messagebus/test/perftest/stackcollect.diff index a454de3a5d..658f0141b3 100644 --- a/library/cpp/messagebus/test/perftest/stackcollect.diff +++ b/library/cpp/messagebus/test/perftest/stackcollect.diff @@ -1,13 +1,13 @@ -Index: test/perftest/CMakeLists.txt -=================================================================== ---- test/perftest/CMakeLists.txt (revision 1088840) -+++ test/perftest/CMakeLists.txt (working copy) -@@ -3,7 +3,7 @@ PROGRAM(messagebus_perftest) - OWNER(nga) - - PEERDIR( +Index: test/perftest/CMakeLists.txt +=================================================================== +--- test/perftest/CMakeLists.txt (revision 1088840) ++++ test/perftest/CMakeLists.txt (working copy) +@@ -3,7 +3,7 @@ PROGRAM(messagebus_perftest) + OWNER(nga) + + PEERDIR( - library/cpp/execprofile -+ junk/davenger/stackcollect ++ junk/davenger/stackcollect library/cpp/messagebus library/cpp/messagebus/protobuf library/cpp/sighandler diff --git a/library/cpp/messagebus/test/perftest/ya.make b/library/cpp/messagebus/test/perftest/ya.make index 37038ed2a5..24c2848ed5 100644 --- a/library/cpp/messagebus/test/perftest/ya.make +++ b/library/cpp/messagebus/test/perftest/ya.make @@ -1,7 +1,7 @@ -PROGRAM(messagebus_perftest) +PROGRAM(messagebus_perftest) OWNER(g:messagebus) - + PEERDIR( library/cpp/deprecated/threadable library/cpp/execprofile @@ -16,9 +16,9 @@ PEERDIR( ) SRCS( - messages.proto + messages.proto perftest.cpp - simple_proto.cpp + simple_proto.cpp ) END() diff --git a/library/cpp/messagebus/test/ut/count_down_latch.h b/library/cpp/messagebus/test/ut/count_down_latch.h index fb6374e773..5117db5731 100644 --- a/library/cpp/messagebus/test/ut/count_down_latch.h +++ b/library/cpp/messagebus/test/ut/count_down_latch.h @@ -1,30 +1,30 @@ -#pragma once - -#include <util/system/atomic.h> -#include <util/system/event.h> - -class TCountDownLatch { -private: - TAtomic Current; +#pragma once + +#include <util/system/atomic.h> +#include <util/system/event.h> + +class TCountDownLatch { +private: + TAtomic Current; TSystemEvent EventObject; -public: - TCountDownLatch(unsigned initial) - : Current(initial) +public: + TCountDownLatch(unsigned initial) + : Current(initial) { } - - void CountDown() { - if (AtomicDecrement(Current) == 0) { - EventObject.Signal(); - } - } - - void Await() { - EventObject.Wait(); - } - - bool Await(TDuration timeout) { - return EventObject.WaitT(timeout); - } -}; + + void CountDown() { + if (AtomicDecrement(Current) == 0) { + EventObject.Signal(); + } + } + + void Await() { + EventObject.Wait(); + } + + bool Await(TDuration timeout) { + return EventObject.WaitT(timeout); + } +}; diff --git a/library/cpp/messagebus/test/ut/messagebus_ut.cpp b/library/cpp/messagebus/test/ut/messagebus_ut.cpp index 42d4a1e9b2..040f9b7702 100644 --- a/library/cpp/messagebus/test/ut/messagebus_ut.cpp +++ b/library/cpp/messagebus/test/ut/messagebus_ut.cpp @@ -8,104 +8,104 @@ #include <library/cpp/messagebus/misc/test_sync.h> -#include <util/network/sock.h> - +#include <util/network/sock.h> + #include <utility> using namespace NBus; using namespace NBus::NTest; -namespace { - struct TExampleClientSlowOnMessageSent: public TExampleClient { - TAtomic SentCompleted; - +namespace { + struct TExampleClientSlowOnMessageSent: public TExampleClient { + TAtomic SentCompleted; + TSystemEvent ReplyReceived; - - TExampleClientSlowOnMessageSent() - : SentCompleted(0) + + TExampleClientSlowOnMessageSent() + : SentCompleted(0) { } - + ~TExampleClientSlowOnMessageSent() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override { Y_VERIFY(AtomicGet(SentCompleted), "must be completed"); - - TExampleClient::OnReply(mess, reply); - - ReplyReceived.Signal(); - } - + + TExampleClient::OnReply(mess, reply); + + ReplyReceived.Signal(); + } + void OnMessageSent(TBusMessage*) override { - Sleep(TDuration::MilliSeconds(100)); - AtomicSet(SentCompleted, 1); - } - }; - -} - + Sleep(TDuration::MilliSeconds(100)); + AtomicSet(SentCompleted, 1); + } + }; + +} + Y_UNIT_TEST_SUITE(TMessageBusTests) { - void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply, + void TestDestinationTemplate(bool useCompression, bool ackMessageBeforeReply, const TBusServerSessionConfig& sessionConfig) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TExampleClient client(sessionConfig); - client.CrashOnError = true; - - server.UseCompression = useCompression; - client.UseCompression = useCompression; - - server.AckMessageBeforeSendReply = ackMessageBeforeReply; - - client.SendMessagesWaitReplies(100, server.GetActualListenAddr()); - UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); - UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleClient client(sessionConfig); + client.CrashOnError = true; + + server.UseCompression = useCompression; + client.UseCompression = useCompression; + + server.AckMessageBeforeSendReply = ackMessageBeforeReply; + + client.SendMessagesWaitReplies(100, server.GetActualListenAddr()); + UNIT_ASSERT_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_EQUAL(client.Session->GetInFlight(), 0); + } + Y_UNIT_TEST(TestDestination) { - TestDestinationTemplate(false, false, TBusServerSessionConfig()); - } - + TestDestinationTemplate(false, false, TBusServerSessionConfig()); + } + Y_UNIT_TEST(TestDestinationUsingAck) { - TestDestinationTemplate(false, true, TBusServerSessionConfig()); - } - + TestDestinationTemplate(false, true, TBusServerSessionConfig()); + } + Y_UNIT_TEST(TestDestinationWithCompression) { - TestDestinationTemplate(true, false, TBusServerSessionConfig()); - } - + TestDestinationTemplate(true, false, TBusServerSessionConfig()); + } + Y_UNIT_TEST(TestCork) { - TBusServerSessionConfig config; - config.SendThreshold = 1000000000000; - config.Cork = TDuration::MilliSeconds(10); - TestDestinationTemplate(false, false, config); - // TODO: test for cork hanging - } - + TBusServerSessionConfig config; + config.SendThreshold = 1000000000000; + config.Cork = TDuration::MilliSeconds(10); + TestDestinationTemplate(false, false, config); + // TODO: test for cork hanging + } + Y_UNIT_TEST(TestReconnect) { - if (!IsFixedPortTestAllowed()) { - return; - } - - TObjectCountCheck objectCountCheck; - - unsigned port = FixedPort; - TNetAddr serverAddr("localhost", port); - THolder<TExampleServer> server; - - TBusClientSessionConfig clientConfig; + if (!IsFixedPortTestAllowed()) { + return; + } + + TObjectCountCheck objectCountCheck; + + unsigned port = FixedPort; + TNetAddr serverAddr("localhost", port); + THolder<TExampleServer> server; + + TBusClientSessionConfig clientConfig; clientConfig.RetryInterval = 0; - TExampleClient client(clientConfig); - - server.Reset(new TExampleServer(port, "TExampleServer 1")); - - client.SendMessagesWaitReplies(17, serverAddr); - - server.Destroy(); + TExampleClient client(clientConfig); + + server.Reset(new TExampleServer(port, "TExampleServer 1")); + + client.SendMessagesWaitReplies(17, serverAddr); + + server.Destroy(); // Making the client to detect disconnection. client.SendMessages(1, serverAddr); @@ -116,11 +116,11 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } UNIT_ASSERT_VALUES_EQUAL(MESSAGE_CONNECT_FAILED, error); - server.Reset(new TExampleServer(port, "TExampleServer 2")); - - client.SendMessagesWaitReplies(19, serverAddr); - } - + server.Reset(new TExampleServer(port, "TExampleServer 2")); + + client.SendMessagesWaitReplies(19, serverAddr); + } + struct TestNoServerImplClient: public TExampleClient { TTestSync TestSync; int failures = 0; @@ -145,8 +145,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { }; void TestNoServerImpl(unsigned port, bool oneWay) { - TNetAddr noServerAddr("localhost", port); - + TNetAddr noServerAddr("localhost", port); + TestNoServerImplClient client; int count = 0; @@ -174,167 +174,167 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { void HangingServerImpl(unsigned port) { TNetAddr noServerAddr("localhost", port); - TExampleClient client; - - int count = 0; - for (;; ++count) { - TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); - EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr); - if (status == MESSAGE_BUSY) { - break; - } - UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status)); - - if (count == 0) { - // lame way to wait until it is connected - Sleep(TDuration::MilliSeconds(10)); - } - } - - UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count); - } - + TExampleClient client; + + int count = 0; + for (;; ++count) { + TAutoPtr<TBusMessage> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus status = client.Session->SendMessageAutoPtr(message, &noServerAddr); + if (status == MESSAGE_BUSY) { + break; + } + UNIT_ASSERT_VALUES_EQUAL(int(MESSAGE_OK), int(status)); + + if (count == 0) { + // lame way to wait until it is connected + Sleep(TDuration::MilliSeconds(10)); + } + } + + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetConfig()->MaxInFlight, count); + } + Y_UNIT_TEST(TestHangindServer) { - TObjectCountCheck objectCountCheck; - - THangingServer server(0); - + TObjectCountCheck objectCountCheck; + + THangingServer server(0); + HangingServerImpl(server.GetPort()); - } - + } + Y_UNIT_TEST(TestNoServer) { - TObjectCountCheck objectCountCheck; - + TObjectCountCheck objectCountCheck; + TestNoServerImpl(17, false); - } - + } + Y_UNIT_TEST(PauseInput) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - server.Session->PauseInput(true); - - TBusClientSessionConfig clientConfig; - clientConfig.MaxInFlight = 1000; - TExampleClient client(clientConfig); - - client.SendMessages(100, server.GetActualListenAddr()); - - server.TestSync.Check(0); - - server.Session->PauseInput(false); - - server.TestSync.WaitFor(100); - - client.WaitReplies(); - - server.Session->PauseInput(true); - - client.SendMessages(200, server.GetActualListenAddr()); - - server.TestSync.Check(100); - - server.Session->PauseInput(false); - - server.TestSync.WaitFor(300); - - client.WaitReplies(); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + server.Session->PauseInput(true); + + TBusClientSessionConfig clientConfig; + clientConfig.MaxInFlight = 1000; + TExampleClient client(clientConfig); + + client.SendMessages(100, server.GetActualListenAddr()); + + server.TestSync.Check(0); + + server.Session->PauseInput(false); + + server.TestSync.WaitFor(100); + + client.WaitReplies(); + + server.Session->PauseInput(true); + + client.SendMessages(200, server.GetActualListenAddr()); + + server.TestSync.Check(100); + + server.Session->PauseInput(false); + + server.TestSync.WaitFor(300); + + client.WaitReplies(); + } + struct TSendTimeoutCheckerExampleClient: public TExampleClient { - static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) { - TBusClientSessionConfig sessionConfig; - if (periodLessThanConnectTimeout) { - sessionConfig.SendTimeout = 1; - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50); - } else { - sessionConfig.SendTimeout = 50; - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); - } - return sessionConfig; - } - - TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout) - : TExampleClient(SessionConfig(periodLessThanConnectTimeout)) + static TBusClientSessionConfig SessionConfig(bool periodLessThanConnectTimeout) { + TBusClientSessionConfig sessionConfig; + if (periodLessThanConnectTimeout) { + sessionConfig.SendTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(50); + } else { + sessionConfig.SendTimeout = 50; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + } + return sessionConfig; + } + + TSendTimeoutCheckerExampleClient(bool periodLessThanConnectTimeout) + : TExampleClient(SessionConfig(periodLessThanConnectTimeout)) { } - + ~TSendTimeoutCheckerExampleClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + TSystemEvent ErrorHappened; - + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got status: %s", ToString(status).data()); - ErrorHappened.Signal(); - } - }; - - void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) { - TObjectCountCheck objectCountCheck; - - TNetAddr serverAddr("localhost", 17); - - TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout); - - client.SendMessages(1, serverAddr); - - client.ErrorHappened.WaitI(); - } - + ErrorHappened.Signal(); + } + }; + + void NoServer_SendTimeout_Callback_Impl(bool periodLessThanConnectTimeout) { + TObjectCountCheck objectCountCheck; + + TNetAddr serverAddr("localhost", 17); + + TSendTimeoutCheckerExampleClient client(periodLessThanConnectTimeout); + + client.SendMessages(1, serverAddr); + + client.ErrorHappened.WaitI(); + } + Y_UNIT_TEST(NoServer_SendTimeout_Callback_PeriodLess) { - NoServer_SendTimeout_Callback_Impl(true); - } - + NoServer_SendTimeout_Callback_Impl(true); + } + Y_UNIT_TEST(NoServer_SendTimeout_Callback_TimeoutLess) { - NoServer_SendTimeout_Callback_Impl(false); - } - + NoServer_SendTimeout_Callback_Impl(false); + } + Y_UNIT_TEST(TestOnReplyCalledAfterOnMessageSent) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - TNetAddr serverAddr = server.GetActualListenAddr(); - TExampleClientSlowOnMessageSent client; - - TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount)); - EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr); - UNIT_ASSERT_EQUAL(s, MESSAGE_OK); - - UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5))); - } - - struct TDelayReplyServer: public TBusServerHandlerError { - TBusMessageQueuePtr Bus; - TExampleProtocol Proto; + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + TExampleClientSlowOnMessageSent client; + + TAutoPtr<TExampleRequest> message(new TExampleRequest(&client.Proto.RequestCount)); + EMessageStatus s = client.Session->SendMessageAutoPtr(message, &serverAddr); + UNIT_ASSERT_EQUAL(s, MESSAGE_OK); + + UNIT_ASSERT(client.ReplyReceived.WaitT(TDuration::Seconds(5))); + } + + struct TDelayReplyServer: public TBusServerHandlerError { + TBusMessageQueuePtr Bus; + TExampleProtocol Proto; TSystemEvent MessageReceivedEvent; // 1 wait for 1 message - TBusServerSessionPtr Session; + TBusServerSessionPtr Session; TMutex Lock_; TDeque<TAutoPtr<TOnMessageContext>> DelayedMessages; - + TDelayReplyServer() : MessageReceivedEvent(TEventResetType::rAuto) { - Bus = CreateMessageQueue("TDelayReplyServer"); - TBusServerSessionConfig sessionConfig; - sessionConfig.SendTimeout = 1000; - sessionConfig.TotalTimeout = 2001; - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); - if (!Session) { - ythrow yexception() << "Failed to create destination session"; - } - } - + Bus = CreateMessageQueue("TDelayReplyServer"); + TBusServerSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1000; + sessionConfig.TotalTimeout = 2001; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + if (!Session) { + ythrow yexception() << "Failed to create destination session"; + } + } + void OnMessage(TOnMessageContext& mess) override { Y_VERIFY(mess.IsConnectionAlive(), "connection should be alive here"); TAutoPtr<TOnMessageContext> delayedMsg(new TOnMessageContext); delayedMsg->Swap(mess); auto g(Guard(Lock_)); DelayedMessages.push_back(delayedMsg); - MessageReceivedEvent.Signal(); + MessageReceivedEvent.Signal(); } - + bool CheckClientIsAlive() { auto g(Guard(Lock_)); for (auto& delayedMessage : DelayedMessages) { @@ -370,252 +370,252 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { msg.SendReplyMove(reply); } } - + size_t GetDelayedMessageCount() const { auto g(Guard(Lock_)); return DelayedMessages.size(); - } - + } + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed, got %s", ToString(status).data()); - } - }; - + } + }; + Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { - TObjectCountCheck objectCountCheck; - - TDelayReplyServer server; - - THolder<TExampleClient> client(new TExampleClient); - - client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort())); - - UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); - - UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight()); - - client.Destroy(); - + TObjectCountCheck objectCountCheck; + + TDelayReplyServer server; + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessages(1, TNetAddr("localhost", server.Session->GetActualListenPort())); + + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + + UNIT_ASSERT_VALUES_EQUAL(1, server.Session->GetInFlight()); + + client.Destroy(); + UNIT_WAIT_FOR(server.CheckClientIsDead()); - + server.ReplyToDelayedMessages(); - // wait until all server message are delivered - UNIT_WAIT_FOR(0 == server.Session->GetInFlight()); - } - - struct TPackUnpackServer: public TBusServerHandlerError { - TBusMessageQueuePtr Bus; - TExampleProtocol Proto; + // wait until all server message are delivered + UNIT_WAIT_FOR(0 == server.Session->GetInFlight()); + } + + struct TPackUnpackServer: public TBusServerHandlerError { + TBusMessageQueuePtr Bus; + TExampleProtocol Proto; TSystemEvent MessageReceivedEvent; TSystemEvent ClientDiedEvent; - TBusServerSessionPtr Session; - - TPackUnpackServer() { - Bus = CreateMessageQueue("TPackUnpackServer"); - TBusServerSessionConfig sessionConfig; - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); - } - + TBusServerSessionPtr Session; + + TPackUnpackServer() { + Bus = CreateMessageQueue("TPackUnpackServer"); + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + } + void OnMessage(TOnMessageContext& mess) override { - TBusIdentity ident; - mess.AckMessage(ident); - - char packed[BUS_IDENTITY_PACKED_SIZE]; - ident.Pack(packed); - TBusIdentity resurrected; - resurrected.Unpack(packed); - - mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount)); - } - + TBusIdentity ident; + mess.AckMessage(ident); + + char packed[BUS_IDENTITY_PACKED_SIZE]; + ident.Pack(packed); + TBusIdentity resurrected; + resurrected.Unpack(packed); + + mess.GetSession()->SendReply(resurrected, new TExampleResponse(&Proto.ResponseCount)); + } + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); Y_VERIFY(status == MESSAGE_SHUTDOWN, "only shutdown allowed"); - } - }; - + } + }; + Y_UNIT_TEST(PackUnpack) { - TObjectCountCheck objectCountCheck; - - TPackUnpackServer server; - - THolder<TExampleClient> client(new TExampleClient); - - client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort())); - } - + TObjectCountCheck objectCountCheck; + + TPackUnpackServer server; + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessagesWaitReplies(1, TNetAddr("localhost", server.Session->GetActualListenPort())); + } + Y_UNIT_TEST(ClientRequestTooLarge) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TBusClientSessionConfig clientConfig; - clientConfig.MaxMessageSize = 100; - TExampleClient client(clientConfig); - - client.DataSize = 10; - client.SendMessagesWaitReplies(1, server.GetActualListenAddr()); - - client.DataSize = 1000; - client.SendMessages(1, server.GetActualListenAddr()); - client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); - - client.DataSize = 20; - client.SendMessagesWaitReplies(10, server.GetActualListenAddr()); - - client.DataSize = 10000; - client.SendMessages(1, server.GetActualListenAddr()); - client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TBusClientSessionConfig clientConfig; + clientConfig.MaxMessageSize = 100; + TExampleClient client(clientConfig); + + client.DataSize = 10; + client.SendMessagesWaitReplies(1, server.GetActualListenAddr()); + + client.DataSize = 1000; + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); + + client.DataSize = 20; + client.SendMessagesWaitReplies(10, server.GetActualListenAddr()); + + client.DataSize = 10000; + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_MESSAGE_TOO_LARGE); + } + struct TServerForResponseTooLarge: public TExampleServer { - TTestSync TestSync; - - static TBusServerSessionConfig Config() { - TBusServerSessionConfig config; - config.MaxMessageSize = 100; - return config; - } - - TServerForResponseTooLarge() - : TExampleServer("TServerForResponseTooLarge", Config()) + TTestSync TestSync; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + config.MaxMessageSize = 100; + return config; + } + + TServerForResponseTooLarge() + : TExampleServer("TServerForResponseTooLarge", Config()) { } - + ~TServerForResponseTooLarge() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessage(TOnMessageContext& mess) override { - TAutoPtr<TBusMessage> response; - - if (TestSync.Get() == 0) { - TestSync.CheckAndIncrement(0); - response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000)); - } else { - TestSync.WaitForAndIncrement(3); - response.Reset(new TExampleResponse(&Proto.ResponseCount, 10)); - } - - mess.SendReplyMove(response); - } - + TAutoPtr<TBusMessage> response; + + if (TestSync.Get() == 0) { + TestSync.CheckAndIncrement(0); + response.Reset(new TExampleResponse(&Proto.ResponseCount, 1000)); + } else { + TestSync.WaitForAndIncrement(3); + response.Reset(new TExampleResponse(&Proto.ResponseCount, 10)); + } + + mess.SendReplyMove(response); + } + void OnError(TAutoPtr<TBusMessage>, EMessageStatus status) override { - TestSync.WaitForAndIncrement(1); - + TestSync.WaitForAndIncrement(1); + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "status"); - } - }; - + } + }; + Y_UNIT_TEST(ServerResponseTooLarge) { - TObjectCountCheck objectCountCheck; - - TServerForResponseTooLarge server; - - TExampleClient client; - client.DataSize = 10; - - client.SendMessages(1, server.GetActualListenAddr()); - server.TestSync.WaitForAndIncrement(2); - client.ResetCounters(); - - client.SendMessages(1, server.GetActualListenAddr()); - - client.WorkDone.WaitI(); - - server.TestSync.CheckAndIncrement(4); - - UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight()); - } - + TObjectCountCheck objectCountCheck; + + TServerForResponseTooLarge server; + + TExampleClient client; + client.DataSize = 10; + + client.SendMessages(1, server.GetActualListenAddr()); + server.TestSync.WaitForAndIncrement(2); + client.ResetCounters(); + + client.SendMessages(1, server.GetActualListenAddr()); + + client.WorkDone.WaitI(); + + server.TestSync.CheckAndIncrement(4); + + UNIT_ASSERT_VALUES_EQUAL(1, client.Session->GetInFlight()); + } + struct TServerForRequestTooLarge: public TExampleServer { - TTestSync TestSync; - - static TBusServerSessionConfig Config() { - TBusServerSessionConfig config; - config.MaxMessageSize = 100; - return config; - } - - TServerForRequestTooLarge() - : TExampleServer("TServerForRequestTooLarge", Config()) + TTestSync TestSync; + + static TBusServerSessionConfig Config() { + TBusServerSessionConfig config; + config.MaxMessageSize = 100; + return config; + } + + TServerForRequestTooLarge() + : TExampleServer("TServerForRequestTooLarge", Config()) { } - + ~TServerForRequestTooLarge() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessage(TOnMessageContext& req) override { - unsigned n = TestSync.Get(); - if (n < 2) { - TestSync.CheckAndIncrement(n); - TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10)); - req.SendReplyMove(resp); - } else { + unsigned n = TestSync.Get(); + if (n < 2) { + TestSync.CheckAndIncrement(n); + TAutoPtr<TExampleResponse> resp(new TExampleResponse(&Proto.ResponseCount, 10)); + req.SendReplyMove(resp); + } else { Y_FAIL("wrong"); - } - } - }; - + } + } + }; + Y_UNIT_TEST(ServerRequestTooLarge) { - TObjectCountCheck objectCountCheck; - - TServerForRequestTooLarge server; - - TExampleClient client; - client.DataSize = 10; - - client.SendMessagesWaitReplies(2, server.GetActualListenAddr()); - - server.TestSync.CheckAndIncrement(2); - - client.DataSize = 200; - client.SendMessages(1, server.GetActualListenAddr()); - // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client - client.WaitForError(MESSAGE_DELIVERY_FAILED); - } - + TObjectCountCheck objectCountCheck; + + TServerForRequestTooLarge server; + + TExampleClient client; + client.DataSize = 10; + + client.SendMessagesWaitReplies(2, server.GetActualListenAddr()); + + server.TestSync.CheckAndIncrement(2); + + client.DataSize = 200; + client.SendMessages(1, server.GetActualListenAddr()); + // server closes connection, so MESSAGE_DELIVERY_FAILED is returned to client + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + Y_UNIT_TEST(ClientResponseTooLarge) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - server.DataSize = 10; - - TBusClientSessionConfig clientSessionConfig; - clientSessionConfig.MaxMessageSize = 100; - TExampleClient client(clientSessionConfig); - client.DataSize = 10; - - client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); - - server.DataSize = 1000; - - client.SendMessages(1, server.GetActualListenAddr()); - client.WaitForError(MESSAGE_DELIVERY_FAILED); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + server.DataSize = 10; + + TBusClientSessionConfig clientSessionConfig; + clientSessionConfig.MaxMessageSize = 100; + TExampleClient client(clientSessionConfig); + client.DataSize = 10; + + client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); + + server.DataSize = 1000; + + client.SendMessages(1, server.GetActualListenAddr()); + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + Y_UNIT_TEST(ServerUnknownMessage) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - TNetAddr serverAddr = server.GetActualListenAddr(); - - TExampleClient client; - - client.SendMessagesWaitReplies(2, serverAddr); - - TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); - req->GetHeader()->Type = 11; - client.Session->SendMessageAutoPtr(req, &serverAddr); - client.MessageCount = 1; - - client.WaitForError(MESSAGE_DELIVERY_FAILED); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, serverAddr); + + TAutoPtr<TBusMessage> req(new TExampleRequest(&client.Proto.RequestCount)); + req->GetHeader()->Type = 11; + client.Session->SendMessageAutoPtr(req, &serverAddr); + client.MessageCount = 1; + + client.WaitForError(MESSAGE_DELIVERY_FAILED); + } + Y_UNIT_TEST(ServerMessageReservedIds) { TObjectCountCheck objectCountCheck; @@ -642,18 +642,18 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } Y_UNIT_TEST(TestGetInFlightForDestination) { - TObjectCountCheck objectCountCheck; - - TDelayReplyServer server; - - TExampleClient client; - - TNetAddr addr("localhost", server.Session->GetActualListenPort()); - - UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr)); - - client.SendMessages(2, &addr); - + TObjectCountCheck objectCountCheck; + + TDelayReplyServer server; + + TExampleClient client; + + TNetAddr addr("localhost", server.Session->GetActualListenPort()); + + UNIT_ASSERT_VALUES_EQUAL(size_t(0), client.Session->GetInFlight(addr)); + + client.SendMessages(2, &addr); + for (size_t i = 0; i < 5; ++i) { // One MessageReceivedEvent indicates one message, we need to wait for two UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); @@ -662,98 +662,98 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { } } UNIT_ASSERT_VALUES_EQUAL(server.GetDelayedMessageCount(), 2); - - size_t inFlight = client.Session->GetInFlight(addr); - // 4 is for messagebus1 that adds inFlight counter twice for some reason - UNIT_ASSERT(inFlight == 2 || inFlight == 4); - + + size_t inFlight = client.Session->GetInFlight(addr); + // 4 is for messagebus1 that adds inFlight counter twice for some reason + UNIT_ASSERT(inFlight == 2 || inFlight == 4); + UNIT_ASSERT(server.CheckClientIsAlive()); - + server.ReplyToDelayedMessages(); - client.WaitReplies(); - } - + client.WaitReplies(); + } + struct TResetAfterSendOneWayErrorInCallbackClient: public TExampleClient { - TTestSync TestSync; - - static TBusClientSessionConfig SessionConfig() { - TBusClientSessionConfig config; - // 1 ms is not enough when test is running under valgrind - config.ConnectTimeout = 10; - config.SendTimeout = 10; - config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); - return config; - } - - TResetAfterSendOneWayErrorInCallbackClient() - : TExampleClient(SessionConfig()) - { - } - + TTestSync TestSync; + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig config; + // 1 ms is not enough when test is running under valgrind + config.ConnectTimeout = 10; + config.SendTimeout = 10; + config.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + return config; + } + + TResetAfterSendOneWayErrorInCallbackClient() + : TExampleClient(SessionConfig()) + { + } + ~TResetAfterSendOneWayErrorInCallbackClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { - TestSync.WaitForAndIncrement(0); + TestSync.WaitForAndIncrement(0); Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "must be connection failed, got %s", ToString(status).data()); - mess.Destroy(); - TestSync.CheckAndIncrement(1); - } - }; - + mess.Destroy(); + TestSync.CheckAndIncrement(1); + } + }; + Y_UNIT_TEST(ResetAfterSendOneWayErrorInCallback) { - TObjectCountCheck objectCountCheck; - - TNetAddr noServerAddr("localhost", 17); - - TResetAfterSendOneWayErrorInCallbackClient client; - - EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - - client.TestSync.WaitForAndIncrement(2); - } - + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + + TResetAfterSendOneWayErrorInCallbackClient client; + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &noServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.TestSync.WaitForAndIncrement(2); + } + struct TResetAfterSendMessageOneWayDuringShutdown: public TExampleClient { - TTestSync TestSync; - + TTestSync TestSync; + ~TResetAfterSendMessageOneWayDuringShutdown() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnError(TAutoPtr<TBusMessage> message, EMessageStatus status) override { - TestSync.CheckAndIncrement(0); - + TestSync.CheckAndIncrement(0); + Y_VERIFY(status == MESSAGE_CONNECT_FAILED, "must be MESSAGE_CONNECT_FAILED, got %s", ToString(status).data()); - - // check reset is possible here - message->Reset(); - + + // check reset is possible here + message->Reset(); + // intentionally don't destroy the message // we will try to resend it Y_UNUSED(message.Release()); - TestSync.CheckAndIncrement(1); - } - }; - + TestSync.CheckAndIncrement(1); + } + }; + Y_UNIT_TEST(ResetAfterSendMessageOneWayDuringShutdown) { - TObjectCountCheck objectCountCheck; - - TNetAddr noServerAddr("localhost", 17); - - TResetAfterSendMessageOneWayDuringShutdown client; - + TObjectCountCheck objectCountCheck; + + TNetAddr noServerAddr("localhost", 17); + + TResetAfterSendMessageOneWayDuringShutdown client; + TExampleRequest* message = new TExampleRequest(&client.Proto.RequestCount); EMessageStatus ok = client.Session->SendMessageOneWay(message, &noServerAddr); - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + client.TestSync.WaitForAndIncrement(2); - client.Session->Shutdown(); - + client.Session->Shutdown(); + ok = client.Session->SendMessageOneWay(message); Y_VERIFY(ok == MESSAGE_SHUTDOWN, "must be shutdown when sending during shutdown, got %s", ToString(ok).data()); @@ -762,148 +762,148 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { client.TestSync.CheckAndIncrement(3); delete message; - } - + } + Y_UNIT_TEST(ResetAfterSendOneWayErrorInReturn) { - TObjectCountCheck objectCountCheck; - + TObjectCountCheck objectCountCheck; + TestNoServerImpl(17, true); - } - + } + struct TResetAfterSendOneWaySuccessClient: public TExampleClient { - TTestSync TestSync; - + TTestSync TestSync; + ~TResetAfterSendOneWaySuccessClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnMessageSentOneWay(TAutoPtr<TBusMessage> sent) override { - TestSync.WaitForAndIncrement(0); - sent->Reset(); - TestSync.CheckAndIncrement(1); - } - }; - + TestSync.WaitForAndIncrement(0); + sent->Reset(); + TestSync.CheckAndIncrement(1); + } + }; + Y_UNIT_TEST(ResetAfterSendOneWaySuccess) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - TNetAddr serverAddr = server.GetActualListenAddr(); - - TResetAfterSendOneWaySuccessClient client; - - EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr); - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - // otherwize message might go to OnError(MESSAGE_SHUTDOWN) - server.WaitForOnMessageCount(1); - - client.TestSync.WaitForAndIncrement(2); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + TNetAddr serverAddr = server.GetActualListenAddr(); + + TResetAfterSendOneWaySuccessClient client; + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &serverAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + // otherwize message might go to OnError(MESSAGE_SHUTDOWN) + server.WaitForOnMessageCount(1); + + client.TestSync.WaitForAndIncrement(2); + } + Y_UNIT_TEST(GetStatus) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TExampleClient client; - // make sure connected - client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); - - server.Bus->GetStatus(); - server.Bus->GetStatus(); - server.Bus->GetStatus(); - - client.Bus->GetStatus(); - client.Bus->GetStatus(); - client.Bus->GetStatus(); - } - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleClient client; + // make sure connected + client.SendMessagesWaitReplies(3, server.GetActualListenAddr()); + + server.Bus->GetStatus(); + server.Bus->GetStatus(); + server.Bus->GetStatus(); + + client.Bus->GetStatus(); + client.Bus->GetStatus(); + client.Bus->GetStatus(); + } + Y_UNIT_TEST(BindOnRandomPort) { - TObjectCountCheck objectCountCheck; - - TBusServerSessionConfig serverConfig; - TExampleServer server; - - TExampleClient client; - TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort())); - client.SendMessagesWaitReplies(3, &addr); - } - + TObjectCountCheck objectCountCheck; + + TBusServerSessionConfig serverConfig; + TExampleServer server; + + TExampleClient client; + TNetAddr addr(TNetAddr("127.0.0.1", server.Session->GetActualListenPort())); + client.SendMessagesWaitReplies(3, &addr); + } + Y_UNIT_TEST(UnbindOnShutdown) { - TBusMessageQueuePtr queue(CreateMessageQueue()); - - TExampleProtocol proto; - TBusServerHandlerError handler; - TBusServerSessionPtr session = TBusServerSession::Create( + TBusMessageQueuePtr queue(CreateMessageQueue()); + + TExampleProtocol proto; + TBusServerHandlerError handler; + TBusServerSessionPtr session = TBusServerSession::Create( &proto, &handler, TBusServerSessionConfig(), queue); - - unsigned port = session->GetActualListenPort(); - UNIT_ASSERT(port > 0); - - session->Shutdown(); - - // fails is Shutdown() didn't unbind - THangingServer hangingServer(port); - } - + + unsigned port = session->GetActualListenPort(); + UNIT_ASSERT(port > 0); + + session->Shutdown(); + + // fails is Shutdown() didn't unbind + THangingServer hangingServer(port); + } + Y_UNIT_TEST(VersionNegotiation) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort()); - - TInetStreamSocket socket; - int r1 = socket.Connect(&addr); - UNIT_ASSERT(r1 >= 0); - - TStreamSocketOutput output(&socket); - - TBusHeader request; - Zero(request); - request.Size = sizeof(request); - request.SetVersionInternal(0xF); // max - output.Write(&request, sizeof(request)); - + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TSockAddrInet addr(IpFromString("127.0.0.1"), server.Session->GetActualListenPort()); + + TInetStreamSocket socket; + int r1 = socket.Connect(&addr); + UNIT_ASSERT(r1 >= 0); + + TStreamSocketOutput output(&socket); + + TBusHeader request; + Zero(request); + request.Size = sizeof(request); + request.SetVersionInternal(0xF); // max + output.Write(&request, sizeof(request)); + UNIT_ASSERT_VALUES_EQUAL(IsVersionNegotiation(request), true); - TStreamSocketInput input(&socket); - - TBusHeader response; - size_t pos = 0; - - while (pos < sizeof(response)) { + TStreamSocketInput input(&socket); + + TBusHeader response; + size_t pos = 0; + + while (pos < sizeof(response)) { size_t count = input.Read(((char*)&response) + pos, sizeof(response) - pos); - pos += count; - } - - UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos); - - UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal()); - } - + pos += count; + } + + UNIT_ASSERT_VALUES_EQUAL(sizeof(response), pos); + + UNIT_ASSERT_VALUES_EQUAL(YBUS_VERSION, response.GetVersionInternal()); + } + struct TOnConnectionEventClient: public TExampleClient { - TTestSync Sync; - + TTestSync Sync; + ~TOnConnectionEventClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnClientConnectionEvent(const TClientConnectionEvent& event) override { - if (Sync.Get() > 2) { - // Test OnClientConnectionEvent_Disconnect is broken. - // Sometimes reconnect happens during server shutdown - // when acceptor connections is still alive, and - // server connection is already closed - return; - } - - if (event.GetType() == TClientConnectionEvent::CONNECTED) { - Sync.WaitForAndIncrement(0); - } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) { - Sync.WaitForAndIncrement(2); - } - } + if (Sync.Get() > 2) { + // Test OnClientConnectionEvent_Disconnect is broken. + // Sometimes reconnect happens during server shutdown + // when acceptor connections is still alive, and + // server connection is already closed + return; + } + + if (event.GetType() == TClientConnectionEvent::CONNECTED) { + Sync.WaitForAndIncrement(0); + } else if (event.GetType() == TClientConnectionEvent::DISCONNECTED) { + Sync.WaitForAndIncrement(2); + } + } void OnError(TAutoPtr<TBusMessage>, EMessageStatus) override { // We do not check for message errors in this test. @@ -911,8 +911,8 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { void OnMessageSentOneWay(TAutoPtr<TBusMessage>) override { } - }; - + }; + struct TOnConnectionEventServer: public TExampleServer { TOnConnectionEventServer() : TExampleServer("TOnConnectionEventServer") @@ -929,39 +929,39 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { }; Y_UNIT_TEST(OnClientConnectionEvent_Shutdown) { - TObjectCountCheck objectCountCheck; - + TObjectCountCheck objectCountCheck; + TOnConnectionEventServer server; - - TOnConnectionEventClient client; - - TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort()); - + + TOnConnectionEventClient client; + + TNetAddr addr("127.0.0.1", server.Session->GetActualListenPort()); + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); - - client.Sync.WaitForAndIncrement(1); - - client.Session->Shutdown(); - - client.Sync.WaitForAndIncrement(3); - } - + + client.Sync.WaitForAndIncrement(1); + + client.Session->Shutdown(); + + client.Sync.WaitForAndIncrement(3); + } + Y_UNIT_TEST(OnClientConnectionEvent_Disconnect) { - TObjectCountCheck objectCountCheck; - + TObjectCountCheck objectCountCheck; + THolder<TOnConnectionEventServer> server(new TOnConnectionEventServer); - - TOnConnectionEventClient client; - TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort()); - + + TOnConnectionEventClient client; + TNetAddr addr("127.0.0.1", server->Session->GetActualListenPort()); + client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &addr); - - client.Sync.WaitForAndIncrement(1); - - server.Destroy(); - - client.Sync.WaitForAndIncrement(3); - } + + client.Sync.WaitForAndIncrement(1); + + server.Destroy(); + + client.Sync.WaitForAndIncrement(3); + } struct TServerForQuotaWake: public TExampleServer { TSystemEvent GoOn; @@ -1042,7 +1042,7 @@ Y_UNIT_TEST_SUITE(TMessageBusTests) { start = now; // TODO: properly check that server is blocked - } else if (start + TDuration::MilliSeconds(100) < now) { + } else if (start + TDuration::MilliSeconds(100) < now) { break; } } diff --git a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp index 9c1224ada9..4083cf3b7b 100644 --- a/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_client_one_way_ut.cpp @@ -1,143 +1,143 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/message_handler_error.h> - + #include <library/cpp/messagebus/misc/test_sync.h> #include <library/cpp/messagebus/oldmodule/module.h> -using namespace NBus; -using namespace NBus::NTest; - +using namespace NBus; +using namespace NBus::NTest; + Y_UNIT_TEST_SUITE(ModuleClientOneWay) { - struct TTestServer: public TBusServerHandlerError { - TExampleProtocol Proto; - - TTestSync* const TestSync; - - TBusMessageQueuePtr Queue; - TBusServerSessionPtr ServerSession; - - TTestServer(TTestSync* testSync) - : TestSync(testSync) - { - Queue = CreateMessageQueue(); - ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue); - } - + struct TTestServer: public TBusServerHandlerError { + TExampleProtocol Proto; + + TTestSync* const TestSync; + + TBusMessageQueuePtr Queue; + TBusServerSessionPtr ServerSession; + + TTestServer(TTestSync* testSync) + : TestSync(testSync) + { + Queue = CreateMessageQueue(); + ServerSession = TBusServerSession::Create(&Proto, this, TBusServerSessionConfig(), Queue); + } + void OnMessage(TOnMessageContext& context) override { - TestSync->WaitForAndIncrement(1); - context.ForgetRequest(); - } - }; - - struct TClientModule: public TBusModule { - TExampleProtocol Proto; - - TTestSync* const TestSync; - unsigned const Port; - - TBusClientSessionPtr ClientSession; - - TClientModule(TTestSync* testSync, unsigned port) - : TBusModule("m") - , TestSync(testSync) - , Port(port) + TestSync->WaitForAndIncrement(1); + context.ForgetRequest(); + } + }; + + struct TClientModule: public TBusModule { + TExampleProtocol Proto; + + TTestSync* const TestSync; + unsigned const Port; + + TBusClientSessionPtr ClientSession; + + TClientModule(TTestSync* testSync, unsigned port) + : TBusModule("m") + , TestSync(testSync) + , Port(port) { } - + TJobHandler Start(TBusJob* job, TBusMessage*) override { - TestSync->WaitForAndIncrement(0); - - job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port)); - - return &TClientModule::Sent; - } - - TJobHandler Sent(TBusJob* job, TBusMessage*) { - TestSync->WaitForAndIncrement(2); - job->Cancel(MESSAGE_DONT_ASK); + TestSync->WaitForAndIncrement(0); + + job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", Port)); + + return &TClientModule::Sent; + } + + TJobHandler Sent(TBusJob* job, TBusMessage*) { + TestSync->WaitForAndIncrement(2); + job->Cancel(MESSAGE_DONT_ASK); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); + ClientSession = CreateDefaultSource(queue, &Proto, TBusServerSessionConfig()); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Simple) { - TTestSync testSync; - - TTestServer server(&testSync); - - TBusMessageQueuePtr queue = CreateMessageQueue(); - TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort()); - - clientModule.CreatePrivateSessions(queue.Get()); - clientModule.StartInput(); - - clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); - - testSync.WaitForAndIncrement(3); - - clientModule.Shutdown(); - } - - struct TSendErrorModule: public TBusModule { - TExampleProtocol Proto; - - TTestSync* const TestSync; - - TBusClientSessionPtr ClientSession; - - TSendErrorModule(TTestSync* testSync) - : TBusModule("m") - , TestSync(testSync) + TTestSync testSync; + + TTestServer server(&testSync); + + TBusMessageQueuePtr queue = CreateMessageQueue(); + TClientModule clientModule(&testSync, server.ServerSession->GetActualListenPort()); + + clientModule.CreatePrivateSessions(queue.Get()); + clientModule.StartInput(); + + clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); + + testSync.WaitForAndIncrement(3); + + clientModule.Shutdown(); + } + + struct TSendErrorModule: public TBusModule { + TExampleProtocol Proto; + + TTestSync* const TestSync; + + TBusClientSessionPtr ClientSession; + + TSendErrorModule(TTestSync* testSync) + : TBusModule("m") + , TestSync(testSync) { } - + TJobHandler Start(TBusJob* job, TBusMessage*) override { - TestSync->WaitForAndIncrement(0); - - job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1)); - - return &TSendErrorModule::Sent; - } - - TJobHandler Sent(TBusJob* job, TBusMessage*) { - TestSync->WaitForAndIncrement(1); - job->Cancel(MESSAGE_DONT_ASK); + TestSync->WaitForAndIncrement(0); + + job->SendOneWayTo(new TExampleRequest(&Proto.RequestCount), ClientSession.Get(), TNetAddr("localhost", 1)); + + return &TSendErrorModule::Sent; + } + + TJobHandler Sent(TBusJob* job, TBusMessage*) { + TestSync->WaitForAndIncrement(1); + job->Cancel(MESSAGE_DONT_ASK); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TBusServerSessionConfig sessionConfig; - sessionConfig.ConnectTimeout = 1; - sessionConfig.SendTimeout = 1; - sessionConfig.TotalTimeout = 1; - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); - ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig); + TBusServerSessionConfig sessionConfig; + sessionConfig.ConnectTimeout = 1; + sessionConfig.SendTimeout = 1; + sessionConfig.TotalTimeout = 1; + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(1); + ClientSession = CreateDefaultSource(queue, &Proto, sessionConfig); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(SendError) { - TTestSync testSync; - - TBusQueueConfig queueConfig; - queueConfig.NumWorkers = 5; - - TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig); - TSendErrorModule clientModule(&testSync); - - clientModule.CreatePrivateSessions(queue.Get()); - clientModule.StartInput(); - - clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); - - testSync.WaitForAndIncrement(2); - - clientModule.Shutdown(); - } -} + TTestSync testSync; + + TBusQueueConfig queueConfig; + queueConfig.NumWorkers = 5; + + TBusMessageQueuePtr queue = CreateMessageQueue(queueConfig); + TSendErrorModule clientModule(&testSync); + + clientModule.CreatePrivateSessions(queue.Get()); + clientModule.StartInput(); + + clientModule.StartJob(new TExampleRequest(&clientModule.Proto.StartCount)); + + testSync.WaitForAndIncrement(2); + + clientModule.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/module_client_ut.cpp b/library/cpp/messagebus/test/ut/module_client_ut.cpp index faffdbb625..ebfe185cc6 100644 --- a/library/cpp/messagebus/test/ut/module_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_client_ut.cpp @@ -1,368 +1,368 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "count_down_latch.h" -#include "moduletest.h" - +#include "moduletest.h" + #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/example_module.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> #include <library/cpp/messagebus/test/helper/wait_for.h> - + #include <library/cpp/messagebus/misc/test_sync.h> #include <library/cpp/messagebus/oldmodule/module.h> #include <util/generic/cast.h> #include <util/system/event.h> -using namespace NBus; -using namespace NBus::NTest; - -// helper class that cleans TBusJob instance, so job's destructor can -// be completed without assertion fail. -struct TJobGuard { +using namespace NBus; +using namespace NBus::NTest; + +// helper class that cleans TBusJob instance, so job's destructor can +// be completed without assertion fail. +struct TJobGuard { public: TJobGuard(NBus::TBusJob* job) : Job(job) { } - + ~TJobGuard() { Job->ClearAllMessageStates(); } - + private: NBus::TBusJob* Job; -}; - +}; + class TMessageOk: public NBus::TBusMessage { public: TMessageOk() : NBus::TBusMessage(1) { } -}; - +}; + class TMessageError: public NBus::TBusMessage { public: TMessageError() : NBus::TBusMessage(2) { } -}; - +}; + Y_UNIT_TEST_SUITE(BusJobTest) { -#if 0 +#if 0 Y_UNIT_TEST(TestPending) { - TObjectCountCheck objectCountCheck; - - TDupDetectModule module; - TBusJob job(&module, new TBusMessage(0)); - // Guard will clear the job if unit-assertion fails. - TJobGuard g(&job); - - NBus::TBusMessage* msg = new NBus::TBusMessage(1); - job.Send(msg, NULL); - NBus::TJobStateVec pending; - job.GetPending(&pending); - - UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u); - UNIT_ASSERT_EQUAL(msg, pending[0].Message); - } - + TObjectCountCheck objectCountCheck; + + TDupDetectModule module; + TBusJob job(&module, new TBusMessage(0)); + // Guard will clear the job if unit-assertion fails. + TJobGuard g(&job); + + NBus::TBusMessage* msg = new NBus::TBusMessage(1); + job.Send(msg, NULL); + NBus::TJobStateVec pending; + job.GetPending(&pending); + + UNIT_ASSERT_VALUES_EQUAL(pending.size(), 1u); + UNIT_ASSERT_EQUAL(msg, pending[0].Message); + } + Y_UNIT_TEST(TestCallReplyHandler) { - TObjectCountCheck objectCountCheck; - - TDupDetectModule module; - NBus::TBusJob job(&module, new NBus::TBusMessage(0)); - // Guard will clear the job if unit-assertion fails. - TJobGuard g(&job); - - NBus::TBusMessage* msgOk = new TMessageOk; - NBus::TBusMessage* msgError = new TMessageError; - job.Send(msgOk, NULL); - job.Send(msgError, NULL); - - UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL); - UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL); - - NBus::TBusMessage* reply = new NBus::TBusMessage(0); - job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply); - job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL); - - UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL); - UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL); - - UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT); - UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT); - - UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK); - UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply); - } -#endif - - struct TParallelOnReplyModule : TExampleClientModule { - TNetAddr ServerAddr; - - TCountDownLatch RepliesLatch; - - TParallelOnReplyModule(const TNetAddr& serverAddr) - : ServerAddr(serverAddr) - , RepliesLatch(2) + TObjectCountCheck objectCountCheck; + + TDupDetectModule module; + NBus::TBusJob job(&module, new NBus::TBusMessage(0)); + // Guard will clear the job if unit-assertion fails. + TJobGuard g(&job); + + NBus::TBusMessage* msgOk = new TMessageOk; + NBus::TBusMessage* msgError = new TMessageError; + job.Send(msgOk, NULL); + job.Send(msgError, NULL); + + UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>(), NULL); + UNIT_ASSERT_EQUAL(job.GetState<TMessageError>(), NULL); + + NBus::TBusMessage* reply = new NBus::TBusMessage(0); + job.CallReplyHandler(NBus::MESSAGE_OK, msgOk, reply); + job.CallReplyHandler(NBus::MESSAGE_TIMEOUT, msgError, NULL); + + UNIT_ASSERT_UNEQUAL(job.GetState<TMessageOk>(), NULL); + UNIT_ASSERT_UNEQUAL(job.GetState<TMessageError>(), NULL); + + UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageError>(), NBus::MESSAGE_TIMEOUT); + UNIT_ASSERT_EQUAL(job.GetState<TMessageError>()->Status, NBus::MESSAGE_TIMEOUT); + + UNIT_ASSERT_VALUES_EQUAL(job.GetStatus<TMessageOk>(), NBus::MESSAGE_OK); + UNIT_ASSERT_EQUAL(job.GetState<TMessageOk>()->Reply, reply); + } +#endif + + struct TParallelOnReplyModule : TExampleClientModule { + TNetAddr ServerAddr; + + TCountDownLatch RepliesLatch; + + TParallelOnReplyModule(const TNetAddr& serverAddr) + : ServerAddr(serverAddr) + , RepliesLatch(2) { } - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr); - return &TParallelOnReplyModule::HandleReplies; - } - - void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TParallelOnReplyModule::ReplyHandler), 0, ServerAddr); + return &TParallelOnReplyModule::HandleReplies; + } + + void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { Y_UNUSED(mess); Y_UNUSED(reply); Y_VERIFY(status == MESSAGE_OK, "failed to get reply: %s", ToCString(status)); - } - - TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - RepliesLatch.CountDown(); + RepliesLatch.CountDown(); Y_VERIFY(RepliesLatch.Await(TDuration::Seconds(10)), "failed to get answers"); - job->Cancel(MESSAGE_UNKNOWN); + job->Cancel(MESSAGE_UNKNOWN); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(TestReplyHandlerCalledInParallel) { - TObjectCountCheck objectCountCheck; - - TExampleServer server; - - TExampleProtocol proto; - - TBusQueueConfig config; - config.NumWorkers = 5; - - TParallelOnReplyModule module(server.GetActualListenAddr()); - module.StartModule(); - - module.StartJob(new TExampleRequest(&proto.StartCount)); - module.StartJob(new TExampleRequest(&proto.StartCount)); - - UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10))); - - module.Shutdown(); - } - - struct TErrorHandlerCheckerModule : TExampleModule { - TNetAddr ServerAddr; - - TBusClientSessionPtr Source; - - TCountDownLatch GotReplyLatch; - - TBusMessage* SentMessage; - - TErrorHandlerCheckerModule() - : ServerAddr("localhost", 17) - , GotReplyLatch(2) - , SentMessage() + TObjectCountCheck objectCountCheck; + + TExampleServer server; + + TExampleProtocol proto; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TParallelOnReplyModule module(server.GetActualListenAddr()); + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + module.StartJob(new TExampleRequest(&proto.StartCount)); + + UNIT_ASSERT(module.RepliesLatch.Await(TDuration::Seconds(10))); + + module.Shutdown(); + } + + struct TErrorHandlerCheckerModule : TExampleModule { + TNetAddr ServerAddr; + + TBusClientSessionPtr Source; + + TCountDownLatch GotReplyLatch; + + TBusMessage* SentMessage; + + TErrorHandlerCheckerModule() + : ServerAddr("localhost", 17) + , GotReplyLatch(2) + , SentMessage() { } - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - TExampleRequest* message = new TExampleRequest(&Proto.RequestCount); - job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr); - SentMessage = message; - return &TErrorHandlerCheckerModule::HandleReplies; - } - - void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) { + TExampleRequest* message = new TExampleRequest(&Proto.RequestCount); + job->Send(message, Source, TReplyHandler(&TErrorHandlerCheckerModule::ReplyHandler), 0, ServerAddr); + SentMessage = message; + return &TErrorHandlerCheckerModule::HandleReplies; + } + + void ReplyHandler(TBusJob*, EMessageStatus status, TBusMessage* req, TBusMessage* resp) { Y_VERIFY(status == MESSAGE_CONNECT_FAILED || status == MESSAGE_TIMEOUT, "got wrong status: %s", ToString(status).data()); Y_VERIFY(req == SentMessage, "checking request"); Y_VERIFY(resp == nullptr, "checking response"); - GotReplyLatch.CountDown(); - } - - TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + GotReplyLatch.CountDown(); + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - job->Cancel(MESSAGE_UNKNOWN); - GotReplyLatch.CountDown(); + job->Cancel(MESSAGE_UNKNOWN); + GotReplyLatch.CountDown(); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TBusClientSessionConfig sessionConfig; - sessionConfig.SendTimeout = 1; // TODO: allow 0 - sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); - Source = CreateDefaultSource(queue, &Proto, sessionConfig); - Source->RegisterService("localhost"); + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; // TODO: allow 0 + sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); + Source = CreateDefaultSource(queue, &Proto, sessionConfig); + Source->RegisterService("localhost"); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(ErrorHandler) { - TExampleProtocol proto; - - TBusQueueConfig config; - config.NumWorkers = 5; - - TErrorHandlerCheckerModule module; - - TBusModuleConfig moduleConfig; - moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); - module.SetConfig(moduleConfig); - - module.StartModule(); - - module.StartJob(new TExampleRequest(&proto.StartCount)); - - module.GotReplyLatch.Await(); - - module.Shutdown(); - } - + TExampleProtocol proto; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TErrorHandlerCheckerModule module; + + TBusModuleConfig moduleConfig; + moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); + module.SetConfig(moduleConfig); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.GotReplyLatch.Await(); + + module.Shutdown(); + } + struct TSlowReplyServer: public TBusServerHandlerError { - TTestSync* const TestSync; - TBusMessageQueuePtr Bus; - TBusServerSessionPtr ServerSession; - TExampleProtocol Proto; - - TAtomic OnMessageCount; - - TSlowReplyServer(TTestSync* testSync) - : TestSync(testSync) - , OnMessageCount(0) - { - Bus = CreateMessageQueue("TSlowReplyServer"); - TBusServerSessionConfig sessionConfig; - ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); - } - + TTestSync* const TestSync; + TBusMessageQueuePtr Bus; + TBusServerSessionPtr ServerSession; + TExampleProtocol Proto; + + TAtomic OnMessageCount; + + TSlowReplyServer(TTestSync* testSync) + : TestSync(testSync) + , OnMessageCount(0) + { + Bus = CreateMessageQueue("TSlowReplyServer"); + TBusServerSessionConfig sessionConfig; + ServerSession = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); + } + void OnMessage(TOnMessageContext& req) override { - if (AtomicIncrement(OnMessageCount) == 1) { - TestSync->WaitForAndIncrement(0); - } - TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount)); - req.SendReplyMove(response); - } - }; - + if (AtomicIncrement(OnMessageCount) == 1) { + TestSync->WaitForAndIncrement(0); + } + TAutoPtr<TBusMessage> response(new TExampleResponse(&Proto.ResponseCount)); + req.SendReplyMove(response); + } + }; + struct TModuleThatSendsReplyEarly: public TExampleClientModule { - TTestSync* const TestSync; - const unsigned ServerPort; - - TBusServerSessionPtr ServerSession; - TAtomic ReplyCount; - - TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort) - : TestSync(testSync) - , ServerPort(serverPort) + TTestSync* const TestSync; + const unsigned ServerPort; + + TBusServerSessionPtr ServerSession; + TAtomic ReplyCount; + + TModuleThatSendsReplyEarly(TTestSync* testSync, unsigned serverPort) + : TestSync(testSync) + , ServerPort(serverPort) , ServerSession(nullptr) - , ReplyCount(0) + , ReplyCount(0) { } - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - for (unsigned i = 0; i < 2; ++i) { - job->Send( - new TExampleRequest(&Proto.RequestCount), - Source, - TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler), - 0, - TNetAddr("127.0.0.1", ServerPort)); - } - return &TModuleThatSendsReplyEarly::HandleReplies; - } - - void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { + for (unsigned i = 0; i < 2; ++i) { + job->Send( + new TExampleRequest(&Proto.RequestCount), + Source, + TReplyHandler(&TModuleThatSendsReplyEarly::ReplyHandler), + 0, + TNetAddr("127.0.0.1", ServerPort)); + } + return &TModuleThatSendsReplyEarly::HandleReplies; + } + + void ReplyHandler(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply) { Y_UNUSED(mess); Y_UNUSED(reply); Y_VERIFY(status == MESSAGE_OK, "failed to get reply"); - if (AtomicIncrement(ReplyCount) == 1) { - TestSync->WaitForAndIncrement(1); - job->SendReply(new TExampleResponse(&Proto.ResponseCount)); - } else { - TestSync->WaitForAndIncrement(3); - } - } - - TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { + if (AtomicIncrement(ReplyCount) == 1) { + TestSync->WaitForAndIncrement(1); + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + } else { + TestSync->WaitForAndIncrement(3); + } + } + + TJobHandler HandleReplies(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - job->Cancel(MESSAGE_UNKNOWN); + job->Cancel(MESSAGE_UNKNOWN); return nullptr; - } - + } + TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) override { - TExampleClientModule::CreateExtSession(queue); - TBusServerSessionConfig sessionConfig; - return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig); - } - }; - + TExampleClientModule::CreateExtSession(queue); + TBusServerSessionConfig sessionConfig; + return ServerSession = CreateDefaultDestination(queue, &Proto, sessionConfig); + } + }; + Y_UNIT_TEST(SendReplyCalledBeforeAllRepliesReceived) { - TTestSync testSync; - - TSlowReplyServer slowReplyServer(&testSync); - - TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort()); - module.StartModule(); - - TExampleClient client; - TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort()); - client.SendMessagesWaitReplies(1, &addr); - - testSync.WaitForAndIncrement(2); - - module.Shutdown(); - } - + TTestSync testSync; + + TSlowReplyServer slowReplyServer(&testSync); + + TModuleThatSendsReplyEarly module(&testSync, slowReplyServer.ServerSession->GetActualListenPort()); + module.StartModule(); + + TExampleClient client; + TNetAddr addr("127.0.0.1", module.ServerSession->GetActualListenPort()); + client.SendMessagesWaitReplies(1, &addr); + + testSync.WaitForAndIncrement(2); + + module.Shutdown(); + } + struct TShutdownCalledBeforeReplyReceivedModule: public TExampleClientModule { - unsigned ServerPort; - - TTestSync TestSync; - - TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort) - : ServerPort(serverPort) + unsigned ServerPort; + + TTestSync TestSync; + + TShutdownCalledBeforeReplyReceivedModule(unsigned serverPort) + : ServerPort(serverPort) { } - + TJobHandler Start(TBusJob* job, TBusMessage*) override { - TestSync.CheckAndIncrement(0); - - job->Send(new TExampleRequest(&Proto.RequestCount), Source, + TestSync.CheckAndIncrement(0); + + job->Send(new TExampleRequest(&Proto.RequestCount), Source, TReplyHandler(&TShutdownCalledBeforeReplyReceivedModule::HandleReply), 0, TNetAddr("localhost", ServerPort)); - return &TShutdownCalledBeforeReplyReceivedModule::End; - } - - void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) { + return &TShutdownCalledBeforeReplyReceivedModule::End; + } + + void HandleReply(TBusJob*, EMessageStatus status, TBusMessage*, TBusMessage*) { Y_VERIFY(status == MESSAGE_SHUTDOWN, "got %s", ToCString(status)); - TestSync.CheckAndIncrement(1); - } - - TJobHandler End(TBusJob* job, TBusMessage*) { - TestSync.CheckAndIncrement(2); - job->Cancel(MESSAGE_SHUTDOWN); + TestSync.CheckAndIncrement(1); + } + + TJobHandler End(TBusJob* job, TBusMessage*) { + TestSync.CheckAndIncrement(2); + job->Cancel(MESSAGE_SHUTDOWN); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(ShutdownCalledBeforeReplyReceived) { - TExampleServer server; - server.ForgetRequest = true; - - TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort()); - - module.StartModule(); - - module.StartJob(new TExampleRequest(&module.Proto.RequestCount)); - - server.TestSync.WaitFor(1); - - module.Shutdown(); - - module.TestSync.CheckAndIncrement(3); - } -} + TExampleServer server; + server.ForgetRequest = true; + + TShutdownCalledBeforeReplyReceivedModule module(server.GetActualListenPort()); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&module.Proto.RequestCount)); + + server.TestSync.WaitFor(1); + + module.Shutdown(); + + module.TestSync.CheckAndIncrement(3); + } +} diff --git a/library/cpp/messagebus/test/ut/module_server_ut.cpp b/library/cpp/messagebus/test/ut/module_server_ut.cpp index 38f3fcc4ed..88fe1dd9b6 100644 --- a/library/cpp/messagebus/test/ut/module_server_ut.cpp +++ b/library/cpp/messagebus/test/ut/module_server_ut.cpp @@ -1,8 +1,8 @@ #include <library/cpp/testing/unittest/registar.h> - + #include "count_down_latch.h" -#include "moduletest.h" - +#include "moduletest.h" + #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/example_module.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> @@ -12,108 +12,108 @@ #include <util/generic/cast.h> -using namespace NBus; -using namespace NBus::NTest; - +using namespace NBus; +using namespace NBus::NTest; + Y_UNIT_TEST_SUITE(ModuleServerTests) { Y_UNIT_TEST(TestModule) { - TObjectCountCheck objectCountCheck; - - /// create or get instance of message queue, need one per application - TBusMessageQueuePtr bus(CreateMessageQueue()); + TObjectCountCheck objectCountCheck; + + /// create or get instance of message queue, need one per application + TBusMessageQueuePtr bus(CreateMessageQueue()); THostInfoHandler hostHandler(bus.Get()); - TDupDetectModule module(hostHandler.GetActualListenAddr()); - bool success; - success = module.Init(bus.Get()); - UNIT_ASSERT_C(success, "failed to initialize dupdetect module"); - - success = module.StartInput(); - UNIT_ASSERT_C(success, "failed to start dupdetect module"); - - TDupDetectHandler dupHandler(module.ListenAddr, bus.Get()); - dupHandler.Work(); - - UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies); - - module.Shutdown(); - dupHandler.DupDetect->Shutdown(); - } - + TDupDetectModule module(hostHandler.GetActualListenAddr()); + bool success; + success = module.Init(bus.Get()); + UNIT_ASSERT_C(success, "failed to initialize dupdetect module"); + + success = module.StartInput(); + UNIT_ASSERT_C(success, "failed to start dupdetect module"); + + TDupDetectHandler dupHandler(module.ListenAddr, bus.Get()); + dupHandler.Work(); + + UNIT_WAIT_FOR(dupHandler.NumMessages == dupHandler.NumReplies); + + module.Shutdown(); + dupHandler.DupDetect->Shutdown(); + } + struct TParallelOnMessageModule: public TExampleServerModule { - TCountDownLatch WaitTwoRequestsLatch; - - TParallelOnMessageModule() - : WaitTwoRequestsLatch(2) + TCountDownLatch WaitTwoRequestsLatch; + + TParallelOnMessageModule() + : WaitTwoRequestsLatch(2) { } - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { - WaitTwoRequestsLatch.CountDown(); + WaitTwoRequestsLatch.CountDown(); Y_VERIFY(WaitTwoRequestsLatch.Await(TDuration::Seconds(5)), "oops"); - - VerifyDynamicCast<TExampleRequest*>(mess); - - job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + + VerifyDynamicCast<TExampleRequest*>(mess); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(TestOnMessageHandlerCalledInParallel) { - TObjectCountCheck objectCountCheck; - - TBusQueueConfig config; - config.NumWorkers = 5; - - TParallelOnMessageModule module; - module.StartModule(); - - TExampleClient client; - - client.SendMessagesWaitReplies(2, module.ServerAddr); - - module.Shutdown(); - } - - struct TDelayReplyServer: public TExampleServerModule { + TObjectCountCheck objectCountCheck; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TParallelOnMessageModule module; + module.StartModule(); + + TExampleClient client; + + client.SendMessagesWaitReplies(2, module.ServerAddr); + + module.Shutdown(); + } + + struct TDelayReplyServer: public TExampleServerModule { TSystemEvent MessageReceivedEvent; TSystemEvent ClientDiedEvent; - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - - MessageReceivedEvent.Signal(); - + + MessageReceivedEvent.Signal(); + Y_VERIFY(ClientDiedEvent.WaitT(TDuration::Seconds(5)), "oops"); - - job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(TestReplyCalledAfterClientDisconnected) { - TObjectCountCheck objectCountCheck; - - TBusQueueConfig config; - config.NumWorkers = 5; - - TDelayReplyServer server; - server.StartModule(); - - THolder<TExampleClient> client(new TExampleClient); - - client->SendMessages(1, server.ServerAddr); - - UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); - - UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight()); - - client.Destroy(); - - server.ClientDiedEvent.Signal(); - - // wait until all server message are delivered - UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight()); - - server.Shutdown(); - } -} + TObjectCountCheck objectCountCheck; + + TBusQueueConfig config; + config.NumWorkers = 5; + + TDelayReplyServer server; + server.StartModule(); + + THolder<TExampleClient> client(new TExampleClient); + + client->SendMessages(1, server.ServerAddr); + + UNIT_ASSERT(server.MessageReceivedEvent.WaitT(TDuration::Seconds(5))); + + UNIT_ASSERT_VALUES_EQUAL(1, server.GetModuleSessionInFlight()); + + client.Destroy(); + + server.ClientDiedEvent.Signal(); + + // wait until all server message are delivered + UNIT_WAIT_FOR(0 == server.GetModuleSessionInFlight()); + + server.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/moduletest.h b/library/cpp/messagebus/test/ut/moduletest.h index 0f9834d9ff..d5da72c0cb 100644 --- a/library/cpp/messagebus/test/ut/moduletest.h +++ b/library/cpp/messagebus/test/ut/moduletest.h @@ -7,10 +7,10 @@ #include <library/cpp/messagebus/test/helper/alloc_counter.h> #include <library/cpp/messagebus/test/helper/example.h> #include <library/cpp/messagebus/test/helper/message_handler_error.h> - + #include <library/cpp/messagebus/ybus.h> #include <library/cpp/messagebus/oldmodule/module.h> - + namespace NBus { namespace NTest { using namespace std; @@ -71,7 +71,7 @@ namespace NBus { /// deserialized TBusData into new instance of the message TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override { Y_UNUSED(payload); - + if (messageType == TYPE_HOSTINFOREQUEST) { return new THostInfoMessage(MESSAGE_CREATE_UNINITIALIZED); } else if (messageType == TYPE_HOSTINFORESPONSE) { @@ -100,7 +100,7 @@ namespace NBus { mess.SendReplyMove(reply); } - + TNetAddr GetActualListenAddr() { return TNetAddr("localhost", Session->GetActualListenPort()); } @@ -110,7 +110,7 @@ namespace NBus { /// \brief DupDetect handler (should convert it to module too) struct TDupDetectHandler: public TBusClientHandlerError { TNetAddr ServerAddr; - + TBusClientSessionPtr DupDetect; TBusClientSessionConfig DupDetectConfig; TExampleProtocol DupDetectProto; @@ -147,7 +147,7 @@ namespace NBus { struct TDupDetectModule: public TBusModule { TNetAddr HostInfoAddr; - + TBusClientSessionPtr HostInfoClientSession; TBusClientSessionConfig HostInfoConfig; THostInfoProtocol HostInfoProto; @@ -162,7 +162,7 @@ namespace NBus { , HostInfoAddr(hostInfoAddr) { } - + bool Init(TBusMessageQueue* queue) { HostInfoClientSession = CreateDefaultSource(*queue, &HostInfoProto, HostInfoConfig); HostInfoClientSession->RegisterService("localhost"); @@ -174,7 +174,7 @@ namespace NBus { TBusServerSessionPtr session = CreateDefaultDestination(queue, &DupDetectProto, DupDetectConfig); ListenAddr = TNetAddr("localhost", session->GetActualListenPort()); - + return session; } diff --git a/library/cpp/messagebus/test/ut/one_way_ut.cpp b/library/cpp/messagebus/test/ut/one_way_ut.cpp index 7a907cc620..9c21227e2b 100644 --- a/library/cpp/messagebus/test/ut/one_way_ut.cpp +++ b/library/cpp/messagebus/test/ut/one_way_ut.cpp @@ -32,33 +32,33 @@ #include <library/cpp/messagebus/test/helper/wait_for.h> #include <library/cpp/messagebus/ybus.h> - + using namespace std; -using namespace NBus; -using namespace NBus::NPrivate; -using namespace NBus::NTest; +using namespace NBus; +using namespace NBus::NPrivate; +using namespace NBus::NTest; //////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////// /// \brief Reply-less client and handler struct NullClient : TBusClientHandlerError { - TNetAddr ServerAddr; - - TBusMessageQueuePtr Queue; - TBusClientSessionPtr Session; - TExampleProtocol Proto; + TNetAddr ServerAddr; + + TBusMessageQueuePtr Queue; + TBusClientSessionPtr Session; + TExampleProtocol Proto; /// constructor creates instances of protocol and session - NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) - : ServerAddr(serverAddr) - { - UNIT_ASSERT(serverAddr.GetPort() > 0); + NullClient(const TNetAddr& serverAddr, const TBusClientSessionConfig& sessionConfig = TBusClientSessionConfig()) + : ServerAddr(serverAddr) + { + UNIT_ASSERT(serverAddr.GetPort() > 0); /// create or get instance of message queue, need one per application Queue = CreateMessageQueue(); /// register source/client session - Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); + Session = TBusClientSession::Create(&Proto, this, sessionConfig, Queue); /// register service, announce to clients via LocatorService Session->RegisterService("localhost"); @@ -74,8 +74,8 @@ struct NullClient : TBusClientHandlerError { for (int i = 0; i < batch; i++) { TExampleRequest* mess = new TExampleRequest(&Proto.RequestCount); - mess->Data = "TADA"; - Session->SendMessageOneWay(mess, &ServerAddr); + mess->Data = "TADA"; + Session->SendMessageOneWay(mess, &ServerAddr); } } @@ -85,12 +85,12 @@ struct NullClient : TBusClientHandlerError { ///////////////////////////////////////////////////////////////////// /// \brief Reply-less server and handler -class NullServer: public TBusServerHandlerError { +class NullServer: public TBusServerHandlerError { public: /// session object to maintian - TBusMessageQueuePtr Queue; - TBusServerSessionPtr Session; - TExampleProtocol Proto; + TBusMessageQueuePtr Queue; + TBusServerSessionPtr Session; + TExampleProtocol Proto; public: TAtomic NumMessages; @@ -102,8 +102,8 @@ public: Queue = CreateMessageQueue(); /// register destination session - TBusServerSessionConfig sessionConfig; - Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); + TBusServerSessionConfig sessionConfig; + Session = TBusServerSession::Create(&Proto, this, sessionConfig, Queue); } ~NullServer() override { @@ -117,7 +117,7 @@ public: Y_ASSERT(fmess->Data == "TADA"); /// tell session to forget this message and never expect any reply - mess.ForgetRequest(); + mess.ForgetRequest(); AtomicIncrement(NumMessages); } @@ -131,125 +131,125 @@ public: Y_UNIT_TEST_SUITE(TMessageBusTests_OneWay) { Y_UNIT_TEST(Simple) { - TObjectCountCheck objectCountCheck; - - NullServer server; - NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort())); - - client.Work(); - - // wait until all client message are delivered + TObjectCountCheck objectCountCheck; + + NullServer server; + NullClient client(TNetAddr("localhost", server.Session->GetActualListenPort())); + + client.Work(); + + // wait until all client message are delivered UNIT_WAIT_FOR(AtomicGet(server.NumMessages) == 10); - - // assert correct number of messages + + // assert correct number of messages UNIT_ASSERT_VALUES_EQUAL(AtomicGet(server.NumMessages), 10); - UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); - UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); - } - - struct TMessageTooLargeClient: public NullClient { + UNIT_ASSERT_VALUES_EQUAL(server.Session->GetInFlight(), 0); + UNIT_ASSERT_VALUES_EQUAL(client.Session->GetInFlight(), 0); + } + + struct TMessageTooLargeClient: public NullClient { TSystemEvent GotTooLarge; - - TBusClientSessionConfig Config() { - TBusClientSessionConfig r; - r.MaxMessageSize = 1; - return r; - } - - TMessageTooLargeClient(unsigned port) - : NullClient(TNetAddr("localhost", port), Config()) + + TBusClientSessionConfig Config() { + TBusClientSessionConfig r; + r.MaxMessageSize = 1; + return r; + } + + TMessageTooLargeClient(unsigned port) + : NullClient(TNetAddr("localhost", port), Config()) { } - + ~TMessageTooLargeClient() override { - Session->Shutdown(); - } - + Session->Shutdown(); + } + void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); - + Y_VERIFY(status == MESSAGE_MESSAGE_TOO_LARGE, "wrong status: %s", ToCString(status)); - - GotTooLarge.Signal(); - } - }; - + + GotTooLarge.Signal(); + } + }; + Y_UNIT_TEST(MessageTooLargeOnClient) { - TObjectCountCheck objectCountCheck; - - NullServer server; - - TMessageTooLargeClient client(server.Session->GetActualListenPort()); - - EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); - UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); - - client.GotTooLarge.WaitI(); - } - + TObjectCountCheck objectCountCheck; + + NullServer server; + + TMessageTooLargeClient client(server.Session->GetActualListenPort()); + + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_VALUES_EQUAL(MESSAGE_OK, ok); + + client.GotTooLarge.WaitI(); + } + struct TCheckTimeoutClient: public NullClient { ~TCheckTimeoutClient() override { - Session->Shutdown(); - } - - static TBusClientSessionConfig SessionConfig() { - TBusClientSessionConfig sessionConfig; - sessionConfig.SendTimeout = 1; - sessionConfig.ConnectTimeout = 1; + Session->Shutdown(); + } + + static TBusClientSessionConfig SessionConfig() { + TBusClientSessionConfig sessionConfig; + sessionConfig.SendTimeout = 1; + sessionConfig.ConnectTimeout = 1; sessionConfig.Secret.TimeoutPeriod = TDuration::MilliSeconds(10); - return sessionConfig; - } - + return sessionConfig; + } + TCheckTimeoutClient(const TNetAddr& serverAddr) : NullClient(serverAddr, SessionConfig()) { } - + TSystemEvent GotError; - - /// message that could not be delivered + + /// message that could not be delivered void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) override { Y_UNUSED(mess); Y_UNUSED(status); // TODO: check status - - GotError.Signal(); - } - }; - + + GotError.Signal(); + } + }; + Y_UNIT_TEST(SendTimeout_Callback_NoServer) { - TObjectCountCheck objectCountCheck; - - TCheckTimeoutClient client(TNetAddr("localhost", 17)); - - EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); - UNIT_ASSERT_EQUAL(ok, MESSAGE_OK); - - client.GotError.WaitI(); - } - + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", 17)); + + EMessageStatus ok = client.Session->SendMessageOneWay(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + UNIT_ASSERT_EQUAL(ok, MESSAGE_OK); + + client.GotError.WaitI(); + } + Y_UNIT_TEST(SendTimeout_Callback_HangingServer) { - THangingServer server; - - TObjectCountCheck objectCountCheck; - - TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort())); - - bool first = true; - for (;;) { - EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); - if (ok == MESSAGE_BUSY) { - UNIT_ASSERT(!first); - break; - } - UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK); - first = false; - } - + THangingServer server; + + TObjectCountCheck objectCountCheck; + + TCheckTimeoutClient client(TNetAddr("localhost", server.GetPort())); + + bool first = true; + for (;;) { + EMessageStatus ok = client.Session->SendMessageOneWayMove(new TExampleRequest(&client.Proto.RequestCount), &client.ServerAddr); + if (ok == MESSAGE_BUSY) { + UNIT_ASSERT(!first); + break; + } + UNIT_ASSERT_VALUES_EQUAL(ok, MESSAGE_OK); + first = false; + } + // BUGBUG: The test is buggy: the client might not get any error when sending one-way messages. // All the messages that the client has sent before he gets first MESSAGE_BUSY error might get // serailized and written to the socket buffer, so the write queue gets drained and there are // no messages to timeout when periodic timeout check happens. - client.GotError.WaitI(); - } -} + client.GotError.WaitI(); + } +} diff --git a/library/cpp/messagebus/test/ut/starter_ut.cpp b/library/cpp/messagebus/test/ut/starter_ut.cpp index ebb628ab28..dd4d3aaa5e 100644 --- a/library/cpp/messagebus/test/ut/starter_ut.cpp +++ b/library/cpp/messagebus/test/ut/starter_ut.cpp @@ -1,140 +1,140 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <library/cpp/messagebus/test/helper/example_module.h> #include <library/cpp/messagebus/test/helper/object_count_check.h> #include <library/cpp/messagebus/test/helper/wait_for.h> - -using namespace NBus; -using namespace NBus::NTest; - + +using namespace NBus; +using namespace NBus::NTest; + Y_UNIT_TEST_SUITE(TBusStarterTest) { struct TStartJobTestModule: public TExampleModule { - using TBusModule::CreateDefaultStarter; - - TAtomic StartCount; - - TStartJobTestModule() - : StartCount(0) - { - } - + using TBusModule::CreateDefaultStarter; + + TAtomic StartCount; + + TStartJobTestModule() + : StartCount(0) + { + } + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - AtomicIncrement(StartCount); - job->Sleep(10); - return &TStartJobTestModule::End; - } - - TJobHandler End(TBusJob* job, TBusMessage* mess) { + AtomicIncrement(StartCount); + job->Sleep(10); + return &TStartJobTestModule::End; + } + + TJobHandler End(TBusJob* job, TBusMessage* mess) { Y_UNUSED(mess); - AtomicIncrement(StartCount); - job->Cancel(MESSAGE_UNKNOWN); + AtomicIncrement(StartCount); + job->Cancel(MESSAGE_UNKNOWN); return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(Test) { - TObjectCountCheck objectCountCheck; - - TBusMessageQueuePtr bus(CreateMessageQueue()); - - TStartJobTestModule module; - - //module.StartModule(); - module.CreatePrivateSessions(bus.Get()); - module.StartInput(); - - TBusSessionConfig config; - config.SendTimeout = 10; - - module.CreateDefaultStarter(*bus, config); - - UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3); - - module.Shutdown(); - bus->Stop(); - } - + TObjectCountCheck objectCountCheck; + + TBusMessageQueuePtr bus(CreateMessageQueue()); + + TStartJobTestModule module; + + //module.StartModule(); + module.CreatePrivateSessions(bus.Get()); + module.StartInput(); + + TBusSessionConfig config; + config.SendTimeout = 10; + + module.CreateDefaultStarter(*bus, config); + + UNIT_WAIT_FOR(AtomicGet(module.StartCount) >= 3); + + module.Shutdown(); + bus->Stop(); + } + Y_UNIT_TEST(TestModuleStartJob) { - TObjectCountCheck objectCountCheck; - - TExampleProtocol proto; - - TStartJobTestModule module; - - TBusModuleConfig moduleConfig; - moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); - module.SetConfig(moduleConfig); - - module.StartModule(); - - module.StartJob(new TExampleRequest(&proto.RequestCount)); - - UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2); - - module.Shutdown(); - } - + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TStartJobTestModule module; + + TBusModuleConfig moduleConfig; + moduleConfig.Secret.SchedulePeriod = TDuration::MilliSeconds(10); + module.SetConfig(moduleConfig); + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.RequestCount)); + + UNIT_WAIT_FOR(AtomicGet(module.StartCount) != 2); + + module.Shutdown(); + } + struct TSleepModule: public TExampleServerModule { TSystemEvent MessageReceivedEvent; - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - - MessageReceivedEvent.Signal(); - - job->Sleep(1000000000); - - return TJobHandler(&TSleepModule::Never); - } - - TJobHandler Never(TBusJob*, TBusMessage*) { + + MessageReceivedEvent.Signal(); + + job->Sleep(1000000000); + + return TJobHandler(&TSleepModule::Never); + } + + TJobHandler Never(TBusJob*, TBusMessage*) { Y_FAIL("happens"); - throw 1; - } - }; - + throw 1; + } + }; + Y_UNIT_TEST(StartJobDestroyDuringSleep) { - TObjectCountCheck objectCountCheck; - - TExampleProtocol proto; - - TSleepModule module; - - module.StartModule(); - - module.StartJob(new TExampleRequest(&proto.StartCount)); - - module.MessageReceivedEvent.WaitI(); - - module.Shutdown(); - } - + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TSleepModule module; + + module.StartModule(); + + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.MessageReceivedEvent.WaitI(); + + module.Shutdown(); + } + struct TSendReplyModule: public TExampleServerModule { TSystemEvent MessageReceivedEvent; - + TJobHandler Start(TBusJob* job, TBusMessage* mess) override { Y_UNUSED(mess); - - job->SendReply(new TExampleResponse(&Proto.ResponseCount)); - - MessageReceivedEvent.Signal(); - + + job->SendReply(new TExampleResponse(&Proto.ResponseCount)); + + MessageReceivedEvent.Signal(); + return nullptr; - } - }; - + } + }; + Y_UNIT_TEST(AllowSendReplyInStarted) { - TObjectCountCheck objectCountCheck; - - TExampleProtocol proto; - - TSendReplyModule module; - module.StartModule(); - module.StartJob(new TExampleRequest(&proto.StartCount)); - - module.MessageReceivedEvent.WaitI(); - - module.Shutdown(); - } -} + TObjectCountCheck objectCountCheck; + + TExampleProtocol proto; + + TSendReplyModule module; + module.StartModule(); + module.StartJob(new TExampleRequest(&proto.StartCount)); + + module.MessageReceivedEvent.WaitI(); + + module.Shutdown(); + } +} diff --git a/library/cpp/messagebus/test/ut/sync_client_ut.cpp b/library/cpp/messagebus/test/ut/sync_client_ut.cpp index 848a9d3457..400128193f 100644 --- a/library/cpp/messagebus/test/ut/sync_client_ut.cpp +++ b/library/cpp/messagebus/test/ut/sync_client_ut.cpp @@ -4,7 +4,7 @@ namespace NBus { namespace NTest { using namespace std; - + //////////////////////////////////////////////////////////////////// /// \brief Client for sending synchronous message to local server struct TSyncClient { @@ -13,7 +13,7 @@ namespace NBus { TExampleProtocol Proto; TBusMessageQueuePtr Bus; TBusSyncClientSessionPtr Session; - + int NumReplies; int NumMessages; @@ -53,7 +53,7 @@ namespace NBus { Y_UNIT_TEST_SUITE(SyncClientTest) { Y_UNIT_TEST(TestSync) { TObjectCountCheck objectCountCheck; - + TExampleServer server; TSyncClient client(server.GetActualListenAddr()); client.Work(); @@ -65,5 +65,5 @@ namespace NBus { } } - } -} + } +} diff --git a/library/cpp/messagebus/test/ut/ya.make b/library/cpp/messagebus/test/ut/ya.make index 5af102e0ba..fe1b4961d6 100644 --- a/library/cpp/messagebus/test/ut/ya.make +++ b/library/cpp/messagebus/test/ut/ya.make @@ -1,7 +1,7 @@ OWNER(g:messagebus) - + UNITTEST_FOR(library/cpp/messagebus) - + TIMEOUT(1200) SIZE(LARGE) diff --git a/library/cpp/messagebus/test/ya.make b/library/cpp/messagebus/test/ya.make index 1c1f8bbd9c..0dc4bd4720 100644 --- a/library/cpp/messagebus/test/ya.make +++ b/library/cpp/messagebus/test/ya.make @@ -1,5 +1,5 @@ OWNER(g:messagebus) - + RECURSE( example perftest |